In [1]:
import os
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModel, AdamW, get_linear_schedule_with_warmup
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Running on device:", device)

Running on device: cpu


In [2]:
from sklearn.model_selection import train_test_split
import pandas as pd

class DataPreparator:
    def __init__(self, model_name: str, max_length: int = 256):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.max_length = max_length

    def split_data(self, data, label_col='label', text_col='text', test_size=0.2, random_state=42):
        """
        Splits data into train and validation sets. Avoids stratification if class counts are too low.
        """
        df = pd.DataFrame(data)
        stratify_col = None

        # Check if stratification is possible
        if df[label_col].value_counts().min() > 1:
            stratify_col = df[label_col]

        train_df, val_df = train_test_split(
            df, 
            test_size=test_size, 
            random_state=random_state, 
            stratify=stratify_col
        )
        return train_df, val_df
    def tokenize(self, texts):
        """
        Applies the BERT tokenizer to a list of texts.
        Returns a dictionary with 'input_ids' and 'attention_mask'.
        """
        return self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

In [3]:
test_data = [
    {"text": "Doc 1 about technology", "label": "Technology"},
    {"text": "Doc 2 about finance", "label": "Financial"},
    {"text": "Doc 3 about technology again", "label": "Technology"},
    {"text": "Doc 4 about marketing", "label": "Marketing"},
    {"text": "Doc 5 about marketing again", "label": "Marketing"}  # Additional sample
]

prep = DataPreparator(model_name="nlpaueb/legal-bert-base-uncased", max_length=32)
train_df, val_df = prep.split_data(test_data, label_col='label')

print("Train DataFrame:")
print(train_df)
print("\nValidation DataFrame:")
print(val_df)

tokenized = prep.tokenize(train_df['text'].tolist())
print("\nKeys in tokenized output:", tokenized.keys())
print("Shape of input_ids:", tokenized['input_ids'].shape)

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.02k [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/222k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Train DataFrame:
                           text       label
4   Doc 5 about marketing again   Marketing
2  Doc 3 about technology again  Technology
0        Doc 1 about technology  Technology
3         Doc 4 about marketing   Marketing

Validation DataFrame:
                  text      label
1  Doc 2 about finance  Financial

Keys in tokenized output: dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])
Shape of input_ids: torch.Size([4, 7])


In [4]:
class BertClassifier(nn.Module):
    """
    A simple classification model on top of a BERT base.
    """
    def __init__(self, model_name: str, num_labels: int):
        """
        :param model_name: The name of the pretrained model (FinBERT, LegalBERT, etc.).
        :param num_labels: The number of possible labels/classes.
        """
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(p=0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask=None, labels=None):
        """
        Forward pass of the model.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output  # shape: (batch_size, hidden_size)
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        output_dict = {"logits": logits}

        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
            output_dict["loss"] = loss

        return output_dict

In [5]:
# We test a simple forward pass to ensure it works
model = BertClassifier(model_name="nlpaueb/legal-bert-base-uncased", num_labels=3)

# Dummy input
batch_size = 2
seq_length = 8
dummy_input_ids = torch.randint(0, 1000, (batch_size, seq_length))
dummy_attention_mask = torch.ones((batch_size, seq_length))
dummy_labels = torch.tensor([0, 1])  # some label IDs

outputs = model(dummy_input_ids, attention_mask=dummy_attention_mask, labels=dummy_labels)
print("Output keys:", outputs.keys())
print("Logits shape:", outputs["logits"].shape)
print("Loss shape:", outputs["loss"].shape)

pytorch_model.bin:   0%|          | 0.00/440M [00:00<?, ?B/s]

Output keys: dict_keys(['logits', 'loss'])
Logits shape: torch.Size([2, 3])
Loss shape: torch.Size([])


In [6]:
class DocumentDataset(Dataset):
    """
    PyTorch Dataset for our documents.
    Expects 'input_ids', 'attention_mask', and integer 'labels'.
    """
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        return len(self.labels)

def train_model(model, train_dataset, val_dataset, epochs=2, batch_size=4, lr=1e-5):
    """
    Custom training loop for the classification model.
    """
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    optimizer = AdamW(model.parameters(), lr=lr)
    total_steps = len(train_loader) * epochs
    scheduler = get_linear_schedule_with_warmup(optimizer, 
                                                num_warmup_steps=0,
                                                num_training_steps=total_steps)

    model.to(device)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask, labels=labels)
            loss = outputs["loss"]
            loss.backward()
            optimizer.step()
            scheduler.step()

            total_loss += loss.item()

        avg_train_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}, Training loss: {avg_train_loss:.4f}")

        # Validation
        model.eval()
        val_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device)

                outputs = model(input_ids, attention_mask, labels=labels)
                val_loss += outputs["loss"].item()

                logits = outputs["logits"]
                preds = torch.argmax(logits, dim=1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)

        avg_val_loss = val_loss / len(val_loader)
        accuracy = correct / total
        print(f"Validation loss: {avg_val_loss:.4f}, Validation Accuracy: {accuracy:.4f}")

    return model

In [9]:
import os
from collections import defaultdict

# List of folders to iterate through
folders = [
    "data/Loans",
    "data/Non-Disclosure Agreements (NDA)",
    "data/Employment",
    "data/Partnerships"
]

# The label is derived from the folder names
file_to_label = {}
for folder in folders:
    label = os.path.basename(folder)  # Use folder name as label
    for filename in os.listdir(folder):
        file_to_label[os.path.join(folder, filename)] = label

# List to store data entries
data_entries = []

# Process each file in the mapped file_to_label
for file_path, label in file_to_label.items():
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            text_content = f.read()
        data_entries.append({"text": text_content, "label": label})
    except Exception as e:
        print(f"Error reading {file_path}: {e}")

# ----------------------------------------------------
# Now we have data_entries = [{ "text": "...", "label": "..." }, ...]
# Build a label-to-ID map from the unique labels
unique_labels = sorted(list(set(file_to_label.values())))
label2id = {label: idx for idx, label in enumerate(unique_labels)}
id2label = {v: k for k, v in label2id.items()}

print("Unique labels found:", unique_labels)
print("Label2ID map:", label2id)

# ----------------------------------------------------
# Prepare data, split, tokenize, create Datasets
model_name = "nlpaueb/legal-bert-base-uncased"
prep = DataPreparator(model_name=model_name, max_length=128)

train_df, val_df = prep.split_data(data_entries, label_col="label", test_size=0.2)
train_labels = train_df["label"].map(label2id).tolist()
val_labels = val_df["label"].map(label2id).tolist()

train_encodings = prep.tokenize(train_df["text"].tolist())
val_encodings = prep.tokenize(val_df["text"].tolist())

train_dataset = DocumentDataset(train_encodings, train_labels)
val_dataset = DocumentDataset(val_encodings, val_labels)

# ----------------------------------------------------
# Create and train the model
num_labels = len(unique_labels)
model = BertClassifier(model_name, num_labels=num_labels)

trained_model = train_model(
    model, 
    train_dataset, 
    val_dataset, 
    epochs=2, 
    batch_size=2, 
    lr=2e-5
)

# Save the fine-tuned model (optional)
torch.save(trained_model.state_dict(), "finbert_classifier.pt")
print("Model training complete and saved.")

Error reading data/Non-Disclosure Agreements (NDA)/.DS_Store: 'utf-8' codec can't decode byte 0x80 in position 3131: invalid start byte
Error reading data/Partnerships/.DS_Store: 'utf-8' codec can't decode byte 0x80 in position 3131: invalid start byte
Unique labels found: ['Employment', 'Loans', 'Non-Disclosure Agreements (NDA)', 'Partnerships']
Label2ID map: {'Employment': 0, 'Loans': 1, 'Non-Disclosure Agreements (NDA)': 2, 'Partnerships': 3}


Epoch 1/2: 100%|████████████████████████████████████| 5/5 [00:02<00:00,  1.70it/s]


Epoch 1, Training loss: 1.5561
Validation loss: 1.2840, Validation Accuracy: 0.6667


Epoch 2/2: 100%|████████████████████████████████████| 5/5 [00:02<00:00,  1.71it/s]


Epoch 2, Training loss: 1.2940
Validation loss: 1.2620, Validation Accuracy: 0.6667
Model training complete and saved.


In [10]:
def predict_topic(trained_model, text, tokenizer, label2id_dict):
    """
    Predict the topic for a single text using the trained model.
    Returns the predicted label string.
    """
    trained_model.eval()
    inputs = tokenizer([text], padding=True, truncation=True, max_length=128, return_tensors='pt')
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    with torch.no_grad():
        outputs = trained_model(input_ids, attention_mask=attention_mask)
        logits = outputs['logits']
        preds = torch.argmax(logits, dim=1).cpu().item()

    # Reverse lookup: find label string from predicted ID
    id2label_local = {v: k for k, v in label2id_dict.items()}
    return id2label_local[preds]


# Example usage with some unlabeled text file
unlabeled_file = "data/k-Non-Project-AC-1st-party-LOAN-AGREEMENT-LL-CL-DCL_Jan24_cleaned.txt"
with open(unlabeled_file, "r", encoding="utf-8") as f:
    text_content = f.read()

predicted_label = predict_topic(trained_model, text_content, prep.tokenizer, label2id)
print(f"Predicted label for '{unlabeled_file}': {predicted_label}")

Predicted label for 'data/k-Non-Project-AC-1st-party-LOAN-AGREEMENT-LL-CL-DCL_Jan24_cleaned.txt': Employment


In [11]:
predicted = predict_topic(trained_model, text_content, prep.tokenizer, label2id)
print("Predicted topic:", predicted)
# Compare 'predicted' with your known label

Predicted topic: Employment
