In [1]:
# Cell 1: Import Libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import RobertaTokenizer, RobertaForSequenceClassification
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
from torch.optim import Adam


Load and Preprocess Data

In [2]:
# Cell 2: Load Data
df = pd.read_excel('/Users/PremGanesh/Developer/AI/CyVidia/Input_Data/Training Dataset.xlsx')

# Preprocess Data
df['Requirement Description'] = df['Requirement Description'].astype(str).apply(lambda x: x.strip())

# Encode Labels
area_encoder = LabelEncoder()
bucket_encoder = LabelEncoder()

df['Area_Encoded'] = area_encoder.fit_transform(df['Requirement Area (NIST)'])
df['Bucket_Encoded'] = bucket_encoder.fit_transform(df['Requirement Bucket(NIST)'])

# Split Data
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['Requirement Description'], df[['Area_Encoded', 'Bucket_Encoded']], 
    test_size=0.2
)


Tokenize and Prepare Dataset

In [3]:
# Cell 3: Tokenize and Prepare Dataset
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

class RequirementDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        if idx >= len(self.texts):
            raise IndexError("Index out of bounds in the dataset.")
        text = str(self.texts.iloc[idx])
        labels = self.labels.iloc[idx].tolist()

        encoding = self.tokenizer.encode_plus(
          text,
          add_special_tokens=True,
          max_length=self.max_len,
          return_token_type_ids=False,
          padding='max_length',
          return_attention_mask=True,
          return_tensors='pt',
          truncation=True
        )

        return {
          'text': text,
          'input_ids': encoding['input_ids'].flatten(),
          'attention_mask': encoding['attention_mask'].flatten(),
          'labels': torch.tensor(labels, dtype=torch.long)
        }

train_dataset = RequirementDataset(train_texts, train_labels, tokenizer)
val_dataset = RequirementDataset(val_texts, val_labels, tokenizer)

assert len(train_dataset) == len(train_texts)
assert len(val_dataset) == len(val_texts)


Define the Model

In [4]:
# # Cell 4: Define the Model
# class RequirementModel(nn.Module):
#     def __init__(self, num_area_labels, num_bucket_labels):
#         super(RequirementModel, self).__init__()
#         self.roberta = RobertaForSequenceClassification.from_pretrained('roberta-base',return_dict=True,  num_labels=num_area_labels)
#         self.area_classifier = nn.Linear(self.roberta.config.hidden_size, num_area_labels)
#         self.bucket_classifier = nn.Linear(self.roberta.config.hidden_size, num_bucket_labels)

#     def forward(self, input_ids, attention_mask):
#         outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
#         pooled_output = outputs[1]
#         sequence_output = outputs.last_hidden_state[:, 0, :]
#         area_logits = self.area_classifier(sequence_output)
#         bucket_logits = self.bucket_classifier(sequence_output)
#         return area_logits, bucket_logits

# model = RequirementModel(len(area_encoder.classes_), len(bucket_encoder.classes_))
# model.to(device)

class RequirementModel(nn.Module):
    def __init__(self, num_area_labels, num_bucket_labels):
        super(RequirementModel, self).__init__()
        self.roberta = RobertaForSequenceClassification.from_pretrained('roberta-base')
        self.area_classifier = nn.Linear(self.roberta.config.hidden_size, num_area_labels)
        self.bucket_classifier = nn.Linear(self.roberta.config.hidden_size, num_bucket_labels)

    def forward(self, input_ids, attention_mask):
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        # pooled_output = outputs.pooler_output  # Use pooler_output for classification tasks
        area_logits = outputs.logits
        # area_logits = self.area_classifier(pooled_output)
        # Extract the last hidden state for bucket classification
        last_hidden_state = outputs.hidden_states[-1][:, 0]
        bucket_logits = self.bucket_classifier(last_hidden_state)
        # bucket_logits = self.bucket_classifier(pooled_output)
        last_hidden_state = outputs.hidden_states[-1][:, 0]

        return area_logits, bucket_logits

model = RequirementModel(len(area_encoder.classes_), len(bucket_encoder.classes_))
# model.to(device)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.weight', 'classifier.dense.bias', 'classifier.out_proj.weight', 'classifier.out_proj.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [5]:
# Cell 5: Setup for Training and Evaluation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

train_data_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_data_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

optimizer = Adam(model.parameters(), lr=2e-5)


In [6]:
# Cell 9: Implement train_epoch Function
def train_epoch(model, data_loader, optimizer, device, n_examples):
    model.train()
    losses = []
    correct_predictions = 0

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        labels = d["labels"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        area_logits, bucket_logits = outputs

        area_loss = nn.CrossEntropyLoss()(area_logits, labels[:, 0])
        bucket_loss = nn.CrossEntropyLoss()(bucket_logits, labels[:, 1])
        total_loss = area_loss + bucket_loss
        losses.append(total_loss.item())

        _, area_preds = torch.max(area_logits, dim=1)
        _, bucket_preds = torch.max(bucket_logits, dim=1)

        correct_predictions += torch.sum(area_preds == labels[:, 0]) + torch.sum(bucket_preds == labels[:, 1])

        total_loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        optimizer.zero_grad()

    return correct_predictions.double() / (2 * n_examples), np.mean(losses)

# Cell 10: Implement eval_model Function
def eval_model(model, data_loader, device, n_examples):
    model.eval()
    losses = []
    correct_predictions = 0

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            labels = d["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            area_logits, bucket_logits = outputs

            area_loss = nn.CrossEntropyLoss()(area_logits, labels[:, 0])
            bucket_loss = nn.CrossEntropyLoss()(bucket_logits, labels[:, 1])
            total_loss = area_loss + bucket_loss
            losses.append(total_loss.item())

            _, area_preds = torch.max(area_logits, dim=1)
            _, bucket_preds = torch.max(bucket_logits, dim=1)

            correct_predictions += torch.sum(area_preds == labels[:, 0]) + torch.sum(bucket_preds == labels[:, 1])

    return correct_predictions.double() / (2 * n_examples), np.mean(losses)


In [7]:
# Cell 6: Training Loop


num_epochs = 3  # Number of epochs

for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(
        model,
        train_data_loader, 
        optimizer, 
        device, 
        len(train_dataset)
    )
    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(
        model,
        val_data_loader, 
        device, 
        len(val_dataset)
    )
    print(f'Validation loss {val_loss} accuracy {val_acc}')


Epoch 1/3
----------


TypeError: 'NoneType' object is not subscriptable

In [None]:
# Cell 7: Training Loop
num_epochs = 3  # Number of epochs

for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    print('-' * 10)

    train_acc, train_loss = train_epoch(
        model,
        train_data_loader, 
        optimizer, 
        device, 
        len(train_dataset)
    )
    print(f'Train loss {train_loss} accuracy {train_acc}')

    val_acc, val_loss = eval_model(
        model,
        val_data_loader, 
        device, 
        len(val_dataset)
    )
    print(f'Validation loss {val_loss} accuracy {val_acc}')
