### Imports

In [13]:
import math
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

from tqdm import tqdm

import re
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

### Data Loading

#### Combines all text descriptions into one csv

In [14]:
# import csv
# import os

# directory = "./text_c10/"

# header = ['Filename', 'ID', 'Class', 'Content']
# textfile = open('descriptions.txt', 'w')

# # Open the CSV file for writing
# with open('image_text_descriptions.csv', 'w', newline='') as csvfile:
#     # Initialize the CSV writer and write the header row
#     writer = csv.writer(csvfile)
#     writer.writerow(header)

#     folders = os.listdir(directory)
#     folders.sort()

#     class_val = 0
#     id = 0
#     for folder in folders:
#         files = os.listdir(directory + folder)
#         files.sort()
#         class_val += 1
    
#         # Loop through each file in the directory
        
#         for filename in files:
#             id += 1
#             # Check if the file is a text file
#             if filename.endswith('.txt'):
#                 # Open the text file and read its contents
#                 with open(directory + folder + "/" + filename, 'r') as file:
#                     content = file.read()
                
#                 # Write the filename and content to a new row in the CSV file
#                 writer.writerow([filename, id, class_val, content])
#                 textfile.write(content)

In [15]:
image_text_labels = pd.read_csv("image_text_descriptions.csv")
train_test_split = pd.read_csv("train_test_split.txt", header=None, sep=" ")

# Splits train and test data
text_df = pd.DataFrame(image_text_labels)
tt_df = pd.DataFrame(train_test_split)
train_df = tt_df[tt_df[1] == 1]
test_df = tt_df[tt_df[1] == 0]
text_train_df = text_df[text_df["ID"].isin(train_df[0])]
text_test_df = text_df[text_df["ID"].isin(test_df[0])]

In [16]:
def preprocess(txt):
    return re.sub(r' +', ' [SPACE] ', txt).replace('\n', ' [NEWLINE] ')

In [17]:
# Same as Homework 4, except vocab_size is larger 
with open('descriptions.txt', 'r', encoding='utf-8') as f:
    text = f.read()

tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
trainer = BpeTrainer(special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]", "[SPACE]", "[NEWLINE]"], vocab_size=32000)
tokenizer.pre_tokenizer = Whitespace()
tokenizer.train_from_iterator([text], trainer=trainer)
tokenizer.save("tokenizer.json")
VOCAB_SIZE = tokenizer.get_vocab_size()

encode = lambda s: tokenizer.encode(preprocess(s)).ids
decode = lambda l: tokenizer.decode(l, skip_special_tokens=False).replace(' ', '').replace('[SPACE]', ' ').replace('[NEWLINE]', '\n')






In [18]:
class TextDataSet(Dataset):
    def __init__(self, df, tokenizer):
        self.df = df
        self.tokenizer = tokenizer
        self.max_seq_length = 280

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        text = self.df.iloc[idx]["Content"]

        # label needs to be minus 1 as nn.CrossEntropyLoss takes in 0-199
        label = self.df.iloc[idx]["Class"] - 1

        # padding to keep the seq_len of each encoding the same, uses [PAD] token
        encoding = tokenizer.encode(text).ids
        pad = tokenizer.token_to_id("[PAD]")
        encoding += [pad] * (self.max_seq_length - len(encoding))
        encoding = torch.tensor(encoding)
        return encoding, label

In [19]:
trn_dataset = TextDataSet(text_train_df, tokenizer)
tst_dataset = TextDataSet(text_test_df, tokenizer)

trn_loader = DataLoader(trn_dataset, batch_size=32, shuffle=True)
tst_loader = DataLoader(tst_dataset, batch_size=32, shuffle=False)

### Model

In [20]:
# Standard Positional Encoding uses sin and cos for pos_encode
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [21]:
class TextModel(nn.Module):
    def __init__(self, vocab_size, num_classes, embed_dim, hidden_dim, n_layers, num_heads, dropout):
        super(TextModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoder = PositionalEncoding(embed_dim, dropout)
        # Using Transformer architecture provided by PyTorch
        encoder_layer = nn.TransformerEncoderLayer(embed_dim, num_heads, hidden_dim, dropout)
        self.encoder = nn.TransformerEncoder(encoder_layer, n_layers)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        # x size is (batch_size, seq_len, embed_dim)
        x = self.embedding(x)
        x = self.pos_encoder(x)
        # Transformer Encoder takes (seq_len, batch_size, embed_dim)
        x = x.permute(1, 0, 2)
        x = self.encoder(x)
        x = x.mean(dim=0)
        x = self.fc(x)
        return x

### Training

In [22]:
NUM_CLASSES = 200
EMB_DIM = 64
HIDDEN_DIM = EMB_DIM * 4
N_LAYERS = 6
NUM_HEADS = 8
DROPOUT = 0

net = TextModel(VOCAB_SIZE, NUM_CLASSES, EMB_DIM, HIDDEN_DIM, N_LAYERS, NUM_HEADS, DROPOUT)

In [23]:
# Training parameters
LR = 5e-4
NUM_EPOCHS = 25

optim = torch.optim.AdamW(net.parameters(), lr=LR, weight_decay=0.01)
loss_fn = nn.CrossEntropyLoss()

In [24]:
global_step = 0
for epoch in range(NUM_EPOCHS):
    t = tqdm(trn_loader, desc='Epoch: ?, Loss: ?')
    running_loss = 0.0
    for i, (encodings, labels) in enumerate(t):
        net.train()
        logits = net(encodings)
        loss = loss_fn(logits, labels)
        loss.backward()
        optim.step()
        optim.zero_grad()
        running_loss += loss.item()
        global_step += 1
        t.set_description(f'Epoch: {epoch}/{NUM_EPOCHS}, Loss: {running_loss/(i+1):.3f}')

Epoch: 0/25, Loss: 5.371: 100%|██████████| 188/188 [01:41<00:00,  1.86it/s]
Epoch: 1/25, Loss: 5.217: 100%|██████████| 188/188 [01:40<00:00,  1.87it/s]
Epoch: 2/25, Loss: 4.660: 100%|██████████| 188/188 [01:39<00:00,  1.88it/s]
Epoch: 3/25, Loss: 4.095: 100%|██████████| 188/188 [01:40<00:00,  1.88it/s]
Epoch: 4/25, Loss: 3.701: 100%|██████████| 188/188 [01:37<00:00,  1.92it/s]
Epoch: 5/25, Loss: 3.388: 100%|██████████| 188/188 [01:37<00:00,  1.93it/s]
Epoch: 6/25, Loss: 3.126: 100%|██████████| 188/188 [01:36<00:00,  1.95it/s]
Epoch: 7/25, Loss: 2.925: 100%|██████████| 188/188 [01:35<00:00,  1.97it/s]
Epoch: 8/25, Loss: 2.733: 100%|██████████| 188/188 [01:36<00:00,  1.96it/s]
Epoch: 9/25, Loss: 2.560: 100%|██████████| 188/188 [01:35<00:00,  1.97it/s]
Epoch: 10/25, Loss: 2.427: 100%|██████████| 188/188 [01:35<00:00,  1.98it/s]
Epoch: 11/25, Loss: 2.272: 100%|██████████| 188/188 [01:34<00:00,  1.98it/s]
Epoch: 12/25, Loss: 2.115: 100%|██████████| 188/188 [01:35<00:00,  1.97it/s]
Epoch: 13

### Evaluation

In [31]:
net.eval()
with torch.no_grad():
    val_loss = 0.0
    total = 0
    correct = 0
    for (encodings, labels) in tqdm(tst_loader):
        logits = net(encodings)
        loss = loss_fn(logits, labels)
        val_loss += loss.item()
        _, predicted = torch.max(logits.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    print(f'Validation loss: {val_loss/len(tst_loader):.3f}')
print(f'Accuracy: {correct/total:.3f}')

100%|██████████| 182/182 [00:43<00:00,  4.14it/s]

Validation loss: 2.293
Accuracy: 0.441





### Save and Load

In [26]:
torch.save(net.state_dict(), "text_model_3.pth")

In [30]:
# net.load_state_dict(torch.load("text_model_no_dropout.pth"))
# net.eval()

TextModel(
  (embedding): Embedding(7740, 64)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0, inplace=False)
  )
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-5): 6 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=256, bias=True)
        (dropout): Dropout(p=0, inplace=False)
        (linear2): Linear(in_features=256, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0, inplace=False)
        (dropout2): Dropout(p=0, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=64, out_features=200, bias=True)
)