In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device:",device)

from tqdm.notebook import tqdm

Device: cuda


In [2]:
!pip install torchsummaryX

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchsummaryX
  Downloading torchsummaryX-1.3.0-py3-none-any.whl (3.6 kB)
Installing collected packages: torchsummaryX
Successfully installed torchsummaryX-1.3.0


In [3]:
from torchsummaryX import summary

In [55]:
class AdomainDataset(Dataset):
    def __init__(self, filename, padding_token='X', missing_token='-'):
        sequences, labels = self.load_file(filename)
        self.sequences = sequences
        self.labels = labels
        self.padding_token = padding_token
        self.missing_token = missing_token
        self.label_map = self.build_label_map(labels)

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, index):
        sequence = self.sequences[index]
        label = self.labels[index]
        attention_mask = [1 if token != self.padding_token else 0 for token in sequence]
        return sequence, attention_mask, label

    def load_file(self, filename):
        sequences = []
        labels = []
        with open(filename, 'r') as file:
            for line in file:
                sequence, label = line.strip().split('\t')
                sequences.append(sequence)
                labels.append(label)
        return sequences, labels

    def collate_fn(self, batch):
      sequences, attention_masks, labels = zip(*batch)

    # Convert sequences to numerical representation (one-hot encoding)
      char_to_index = {'A': 0, 'C': 1, 'D': 2, 'E': 3, 'F': 4, 'G': 5, 'H': 6, 'I': 7, 'K': 8, 'L': 9, 'M': 10, 'N': 11,
                     'P': 12, 'Q': 13, 'R': 14, 'S': 15, 'T': 16, 'V': 17, 'W': 18, 'Y': 19, '-': 20, 'X': 21}

      numerical_labels = [self.label_map[label] for label in labels]

    # Find the maximum sequence length in the batch
      max_length = max(len(seq) for seq in sequences)

    # Pad sequences and attention masks to the maximum length
      padded_sequences = []
      padded_attention_masks = []
      for sequence, attention_mask in zip(sequences, attention_masks):
        padding_length = max_length - len(sequence)
        padded_sequence = [char_to_index[char] if char in char_to_index else char_to_index[self.missing_token] for char in sequence]
        padded_sequence += [char_to_index[self.padding_token]] * padding_length
        padded_sequences.append(padded_sequence)

        padded_attention_mask = attention_mask + [0] * padding_length
        padded_attention_masks.append(padded_attention_mask)

      numerical_labels = torch.tensor(numerical_labels)
      padded_sequences = torch.tensor(padded_sequences)
      padded_attention_masks = torch.tensor(padded_attention_masks)

      return padded_sequences, padded_attention_masks, numerical_labels




    def build_label_map(self, labels):
      unique_labels = sorted(set(labels))
      label_map = {label: index for index, label in enumerate(unique_labels)}
      return label_map  

In [56]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Define the file path for the dataset
filename = '/content/a_domains.tsv'

# Create an instance of the AdomainDataset
dataset = AdomainDataset(filename)

## Split the dataset into train, val, and test sets
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size, test_size]
)


train_loader = torch.utils.data.DataLoader(
    dataset     = train_dataset,
    collate_fn= dataset.collate_fn, 
    num_workers = 4,
    batch_size  = 32, 
    pin_memory  = True,
    shuffle     = True
)

val_loader = torch.utils.data.DataLoader(
    dataset     = val_dataset, 
    collate_fn = dataset.collate_fn,
    num_workers = 2,
    batch_size  = 32,
    pin_memory  = True,
    shuffle     = False
)

test_loader = torch.utils.data.DataLoader(
    dataset     = test_dataset,
    collate_fn =  dataset.collate_fn,
    num_workers = 2, 
    batch_size  = 32, 
    pin_memory  = True, 
    shuffle     = False
)


In [57]:
print("Train Data:")
for batch_idx, (sequences, attention_masks, labels) in enumerate(train_loader):
    print("Batch Sequences:", sequences.shape)
    print("Batch Attention Masks:", attention_masks.shape)
    print("Batch Labels:", labels.shape)

    # Access the first sequence, attention mask, and label
    first_sequence = sequences[0]
    first_attention_mask = attention_masks[0]
    first_label = labels[0]

    print("First Sequence:", first_sequence)
    print("First Attention Mask:", first_attention_mask)
    print("First Label:", first_label)
    break  # Print only the first batch


Train Data:
Batch Sequences: torch.Size([32, 34])
Batch Attention Masks: torch.Size([32, 34])
Batch Labels: torch.Size([32])
First Sequence: tensor([ 9,  4, 16, 16,  4,  2, 17,  1, 19, 13,  3, 15, 15,  9,  7, 16,  0,  5,
         3,  6, 11,  6, 19,  5, 12, 15,  3, 16,  6, 17, 17, 16, 16,  1])
First Attention Mask: tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
First Label: tensor(31)


In [58]:
! pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [59]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel, BertConfig

class TransformerModel(nn.Module):
    def __init__(self, num_classes):
        super(TransformerModel, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.dropout = nn.Dropout(0.1)
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        pooled_output = self.dropout(pooled_output)
        logits = self.fc(pooled_output)
        return logits


In [62]:
num_classes = len(dataset.label_map)
model = TransformerModel(num_classes= 37).to(device) # Create an instance of the model
print(model)  # Print the model architecture

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


TransformerModel(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_

In [67]:
criterion = nn.CrossEntropyLoss()
batch_size = 32

# Define the optimizer
learning_rate = 0.1
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

# Function for training the model
def train(model, dataloader, criterion, optimizer):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for inputs, attention_mask, labels in dataloader:
        optimizer.zero_grad()
        #print(inputs.shape)
        inputs = inputs.to(device)
        attention_mask = attention_mask.to(device)
        #inputs = inputs.view(batch_size, -1)
        
        labels = labels.to(device)
        #print(labels.shape)
        outputs = model(inputs, attention_mask)
        #print(outputs.shape)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        _, predicted = torch.max(outputs.data, 1)
        total_predictions += labels.size(0)
        correct_predictions += (predicted == labels).sum().item()

    train_loss = running_loss / len(dataloader)
    train_accuracy = correct_predictions / total_predictions

    return train_loss, train_accuracy

# Function for validating the model
def validate(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():
        for inputs, attention_mask, labels in dataloader:
            inputs = inputs.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs, attention_mask)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

    val_loss = running_loss / len(dataloader)
    val_accuracy = correct_predictions / total_predictions

    return val_loss, val_accuracy

# Training loop
num_epochs = 500

for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, criterion, optimizer)
    val_loss, val_accuracy = validate(model, val_loader, criterion)

    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
    print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")



Epoch 1/500:
Train Loss: 218.9859, Train Accuracy: 0.0419
Validation Loss: 233.6882, Validation Accuracy: 0.0000
Epoch 2/500:
Train Loss: 236.5271, Train Accuracy: 0.0873
Validation Loss: 176.1243, Validation Accuracy: 0.0704
Epoch 3/500:
Train Loss: 142.8055, Train Accuracy: 0.0663
Validation Loss: 123.1257, Validation Accuracy: 0.0000
Epoch 4/500:
Train Loss: 110.5212, Train Accuracy: 0.0436
Validation Loss: 113.1409, Validation Accuracy: 0.1268
Epoch 5/500:
Train Loss: 88.2290, Train Accuracy: 0.0698
Validation Loss: 82.9312, Validation Accuracy: 0.0563
Epoch 6/500:
Train Loss: 86.0234, Train Accuracy: 0.0785
Validation Loss: 77.1550, Validation Accuracy: 0.0563
Epoch 7/500:
Train Loss: 88.0406, Train Accuracy: 0.0471
Validation Loss: 98.0718, Validation Accuracy: 0.0845
Epoch 8/500:
Train Loss: 94.6070, Train Accuracy: 0.0698
Validation Loss: 95.0852, Validation Accuracy: 0.0845
Epoch 9/500:
Train Loss: 92.6719, Train Accuracy: 0.0558
Validation Loss: 99.4311, Validation Accuracy: 

In [68]:
def test(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    predictions = []

    with torch.no_grad():
        for batch in dataloader:
            inputs, attention_masks, labels = batch
            inputs = inputs.to(device)
            attention_masks = attention_masks.to(device)
            labels = labels.to(device)

            outputs = model(inputs, attention_masks)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

            # Append the predicted labels to the predictions list
            predictions.extend(predicted.tolist())

    test_loss = running_loss / len(dataloader)
    test_accuracy = correct_predictions / total_predictions

    return test_loss, test_accuracy, predictions

test_loss, test_accuracy, predictions = test(model, test_loader, criterion)

print("Test Results:")
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

print("Predictions:")
print(predictions[:batch_size])

Test Results:
Test Loss: 78.9555, Test Accuracy: 0.1644
Predictions:
[29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29]
