# Mohammad Amin Rami
# 98101588
# HW4
# Question 3


In [1]:
from transformers import AutoConfig, AutoTokenizer, AutoModel, BertTokenizer
from torch.utils.data import Dataset, DataLoader
from torch import nn
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import torch
import numpy as np
import os
import random

device = 'cuda' if torch.cuda.is_available() else 'cpu'

config = AutoConfig.from_pretrained("HooshvareLab/bert-fa-base-uncased")
tokenizer = AutoTokenizer.from_pretrained("HooshvareLab/bert-fa-base-uncased")


In [2]:
list_of_poets = [
    'amir_norm.txt',
    'anvari_norm.txt',
    'bahar_norm.txt',
    'bidel_norm.txt',
    'ferdousi_norm.txt',
    'khaghani_norm.txt',
    'salman_norm.txt',
    'shahnematollah_norm.txt',
    'moulavi_norm.txt',
    'khosro_norm.txt'
]
poet_names = [name.split('_')[0] for name in list_of_poets]
data_dir = 'normalized'

def create_dataset(data_dir, list_of_poets, max_beyts):
    data = []
    for text_path in list_of_poets:
        data_path = os.path.join(data_dir, text_path)
        poet_name = text_path.split('_')[0]
        count = 0
        with open(data_path) as file:
            for line in file:
                if line.strip() == '':
                    continue
                data.append((line.strip(), poet_name))
                count += 1
                if count >= max_beyts:
                    break
    return data

max_beyts = 10000
data = create_dataset(data_dir, list_of_poets, max_beyts)
random.shuffle(data)


In [3]:
ratio = 0.8
k = 1/3
train_size = int(ratio * len(data))
train_data = data[:train_size]
test_data = data[train_size:]


In [4]:
labels = {name: i for i, name in enumerate(poet_names)}

class Poem(Dataset):
    def __init__(self, data, tokenizer):
        self.labels_map = labels
        self.tokenizer = tokenizer
        self.texts = [sample[0] for sample in data]
        self.labels = [self.labels_map[sample[1]] for sample in data]
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, index):
        text = self.tokenizer(self.texts[index], padding='max_length', max_length = 25, truncation=True, return_tensors="pt")
        label = self.labels[index]
        return text, label


### Part A

In [5]:
class BertClassifier(nn.Module):
    def __init__(self, train_bert):
        super().__init__()
        self.bert = AutoModel.from_pretrained("HooshvareLab/bert-fa-base-uncased").to(device)
        self.train_bert = train_bert
        if not train_bert:
            for param in self.bert.parameters():
                param.requires_grad = False
        self.linear = nn.Sequential(
            nn.Linear(768, 384),
            nn.ReLU(),
            nn.Linear(384, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
        ).to(device)
        self.relu = nn.ReLU().to(device)
    
    def forward(self, input_id, mask):
        _, pooled_output = self.bert(input_ids= input_id, attention_mask=mask,return_dict=False)
        linear_output = self.linear(pooled_output)
        final_layer = self.relu(linear_output)
        return final_layer


In [6]:
from torch.optim import Adam
from tqdm import tqdm

def train(model, train_data, test_data, tokenizer, learning_rate, batch_size, epochs):

    train, test = Poem(train_data, tokenizer), Poem(test_data, tokenizer)

    train_dataloader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)
    test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)

    criterion = nn.CrossEntropyLoss()
    if model.train_bert:
        optimizer = Adam(model.parameters(), lr= learning_rate)
    else:
        optimizer = Adam(model.linear.parameters(), lr= learning_rate)

    criterion = criterion.cuda()

    for epoch_num in range(epochs):
            total_acc_train = 0
            total_loss_train = 0

            for train_input, train_label in tqdm(train_dataloader):

                train_label = train_label.to(device)
                mask = train_input['attention_mask'].to(device)
                input_id = train_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)
                
                batch_loss = criterion(output, train_label.long())
                total_loss_train += batch_loss.item()
                
                acc = (output.argmax(dim=1) == train_label).sum().item()
                total_acc_train += acc

                model.zero_grad()
                batch_loss.backward()
                optimizer.step()
            
            total_acc_test = 0
            total_loss_test = 0

            with torch.no_grad():

                for test_input, test_label in test_dataloader:
                    test_label = test_label.to(device)
                    mask = test_input['attention_mask'].to(device)
                    input_id = test_input['input_ids'].squeeze(1).to(device)

                    output = model(input_id, mask)

                    batch_loss = criterion(output, test_label.long())
                    total_loss_test += batch_loss.item()
                    
                    acc = (output.argmax(dim=1) == test_label).sum().item()
                    total_acc_test += acc
            
            print(f'Epochs: {epoch_num + 1}')
            print(f'| Train Accuracy: {total_acc_train / len(train_data)*100: .2f}%')
            print(f'| Train Loss: {total_loss_train/len(train_dataloader.dataset)}')
            print(f'| Test Accuracy: {total_acc_test / len(test_data)*100: .2f}%')
            print(f'| Test Loss: {total_loss_test/len(test_dataloader.dataset)}')
            print('-'*30)


In [7]:
bert_classifier = BertClassifier(train_bert=False).to(device)

Some weights of the model checkpoint at HooshvareLab/bert-fa-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.bias', 'cls.predictions.decoder.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [8]:
try:
    train(bert_classifier, train_data, test_data, tokenizer, learning_rate=1e-4, batch_size=32, epochs=4)
except KeyboardInterrupt:
    print('Training Finished')

100%|███████████████████████████████████████| 2500/2500 [04:46<00:00,  8.72it/s]


Epochs: 1
| Train Accuracy:  21.15%
| Train Loss: 0.06836959511339664
| Test Accuracy:  23.85%
| Test Loss: 0.06699122292995453
------------------------------


100%|███████████████████████████████████████| 2500/2500 [04:47<00:00,  8.70it/s]


Epochs: 2
| Train Accuracy:  24.87%
| Train Loss: 0.06642915264219046
| Test Accuracy:  25.79%
| Test Loss: 0.0659926131606102
------------------------------


100%|███████████████████████████████████████| 2500/2500 [04:47<00:00,  8.71it/s]


Epochs: 3
| Train Accuracy:  25.88%
| Train Loss: 0.0658469932422042
| Test Accuracy:  25.83%
| Test Loss: 0.06595951482653618
------------------------------


100%|███████████████████████████████████████| 2500/2500 [04:46<00:00,  8.72it/s]


Epochs: 4
| Train Accuracy:  26.36%
| Train Loss: 0.0654714984819293
| Test Accuracy:  26.07%
| Test Loss: 0.06543723168373108
------------------------------


In [9]:
def metrics(model, test_data, tokenizer):
    test_set = Poem(test_data, tokenizer)
    test_dataloader = DataLoader(test_set, batch_size=256)
    criterion = nn.CrossEntropyLoss().to(device)
    total_acc_test = 0
    total_loss_test = 0
    pred = None
    g_truth = None

    with torch.no_grad():
        for test_input, test_label in test_dataloader:
            test_label = test_label.to(device)
            if g_truth is None:
                g_truth = test_label
            else:
                g_truth = torch.cat((g_truth, test_label))
            mask = test_input['attention_mask'].to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)

            batch_loss = criterion(output, test_label.long())
            total_loss_test += batch_loss.item()
            if pred is None:
                pred = output.argmax(dim=1)
            else:
                pred = torch.cat((pred, output.argmax(dim=1)))
        print(f'Test Loss:     {total_loss_test/len(test_dataloader.dataset)}')
        print(classification_report(g_truth.cpu(), pred.cpu()))
        
    

In [11]:
print('=========== Classification Report ===========')
metrics(bert_classifier, test_data, tokenizer)

Test Loss:     0.008276267462968827
              precision    recall  f1-score   support

           0       0.10      0.03      0.04      2064
           1       0.00      0.00      0.00      1990
           2       0.21      0.46      0.29      1968
           3       0.49      0.61      0.54      2006
           4       0.44      0.68      0.54      2022
           5       0.21      0.20      0.20      2001
           6       0.16      0.66      0.26      1921
           7       0.00      0.00      0.00      2027
           8       0.00      0.00      0.00      2030
           9       0.00      0.00      0.00      1971

    accuracy                           0.26     20000
   macro avg       0.16      0.26      0.19     20000
weighted avg       0.16      0.26      0.19     20000



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [12]:
torch.save(bert_classifier, 'feature_extractor_bert.pt')

### Part B

In [5]:
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer
from tqdm import tqdm

In [6]:
fine_tuned_bert = DistilBertForSequenceClassification.from_pretrained(
    'HooshvareLab/distilbert-fa-zwnj-base',
    num_labels=10
).to(device)
tokenizer_fine = DistilBertTokenizer.from_pretrained(
    'HooshvareLab/distilbert-fa-zwnj-base'
)

Some weights of the model checkpoint at HooshvareLab/distilbert-fa-zwnj-base were not used when initializing DistilBertForSequenceClassification: ['vocab_layer_norm.weight', 'vocab_projector.weight', 'vocab_projector.bias', 'vocab_transform.bias', 'vocab_transform.weight', 'vocab_layer_norm.bias']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at HooshvareLab/distilbert-fa-zwnj-base and are newly initialized: ['pre_classifier.bias', 'class

In [7]:
# define hyperparameters
learning_rate = 3e-5
batch_size = 4
epochs = 3

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(fine_tuned_bert.parameters(), lr=learning_rate, eps=1e-8)
train_dataloader = DataLoader(Poem(train_data, tokenizer_fine), batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(Poem(test_data, tokenizer_fine), batch_size=batch_size, shuffle=False)

In [8]:
def train_loop(dataloader, model, loss_fn, optimizer):
    model.train()
    size = len(dataloader.dataset)
    num_correct = 0
    for X, y in tqdm(dataloader):
        losses = []
        # forward prop
        y = y.to(device)
        mask = X['attention_mask'].to(device)
        input_id = X['input_ids'].squeeze(1).to(device)
        output = model(
            input_ids=input_id,
            attention_mask=mask,
            labels=y)
        loss = output.loss
        logits = output.logits
        losses.append(loss.item())
        num_correct += (logits.argmax(dim=1) == y).sum().item()
        # back prop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    accuracy = num_correct / size
    print(f'| Train Loss: {loss.item()}')
    print(f'| Train Accuracy: {np.round(accuracy*100,2)}%')
    

In [9]:
def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_correct = 0
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            y = y.to(device)
            mask = X['attention_mask'].to(device)
            input_id = X['input_ids'].squeeze(1).to(device)
            output = model(
                input_ids=input_id,
                attention_mask=mask,
                labels=y)
            logits = output.logits
            num_correct += (logits.argmax(dim=1) == y).sum().item()
                        
    accuracy = num_correct / size
    print(f'| Test Accuracy: {np.round(accuracy*100, 2)}%')

In [10]:
for i in range(1, epochs+1):
    print(f'Epochs: {i}') 
    train_loop(train_dataloader, fine_tuned_bert, loss_fn, optimizer)
    test_loop(test_dataloader, fine_tuned_bert, loss_fn)
    print()

Epochs: 1


100%|█████████████████████████████████████| 20000/20000 [25:34<00:00, 13.03it/s]


| Train Loss: 1.2594010829925537
| Train Accuracy: 51.39%
| Test Accuracy: 58.35%

Epochs: 2


100%|█████████████████████████████████████| 20000/20000 [25:35<00:00, 13.03it/s]


| Train Loss: 0.9723908305168152
| Train Accuracy: 66.66%
| Test Accuracy: 63.68%

Epochs: 3


100%|█████████████████████████████████████| 20000/20000 [25:39<00:00, 12.99it/s]


| Train Loss: 0.8917829990386963
| Train Accuracy: 74.5%
| Test Accuracy: 64.04%



In [22]:
def metrics(model, dataloader):
    model.eval()
    size = len(dataloader.dataset)
    num_correct = 0
    g_truth  = None
    pred = None
    
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            y = y.to(device)
            if g_truth is None:
                g_truth = y
            else:
                g_truth = torch.cat((g_truth, y))
            mask = X['attention_mask'].to(device)
            input_id = X['input_ids'].squeeze(1).to(device)
            output = model(
                input_ids=input_id,
                attention_mask=mask,
                labels=y)
            logits = output.logits
            if pred is None:
                pred = logits.argmax(dim=1)
            else:
                pred = torch.cat((pred,  logits.argmax(dim=1)))
    accuracy = (pred == g_truth).sum().item()/pred.shape[0]
    print(f'| Test Accuracy: {np.round(accuracy*100, 2)}%')
    print(classification_report(g_truth.cpu(), pred.cpu()))

In [15]:
print('=========== Classification Report ===========')
metrics(fine_tuned_bert, test_dataloader)

| Test Accuracy: 64.04%
              precision    recall  f1-score   support

           0       0.66      0.58      0.62      1944
           1       0.52      0.48      0.50      2041
           2       0.58      0.52      0.55      2005
           3       0.79      0.87      0.83      1994
           4       0.91      0.88      0.89      2010
           5       0.49      0.49      0.49      2023
           6       0.49      0.48      0.49      1993
           7       0.76      0.75      0.75      2017
           8       0.78      0.70      0.74      1989
           9       0.48      0.65      0.55      1984

    accuracy                           0.64     20000
   macro avg       0.65      0.64      0.64     20000
weighted avg       0.65      0.64      0.64     20000



### Part C

In [43]:
def fine_tune_preplexity(model, dataloader):
    model.eval()
    size = len(dataloader.dataset)
    num_correct = 0
    probs = []
    soft = nn.Softmax(dim=1)
    
    with torch.no_grad():
        for batch_num, (X, y) in enumerate(dataloader):
            y = y.to(device)
            mask = X['attention_mask'].to(device)
            input_id = X['input_ids'].squeeze(1).to(device)
            output = model(
                input_ids=input_id,
                attention_mask=mask,
                labels=y)
            logits = output.logits
            x = soft(logits)
            for i, indx in enumerate(y):
                probs.append(x[i, indx])
    probs = torch.Tensor(probs)
    probs = probs.log()
    prob = -1 * probs.sum()/probs.shape[0]
    return torch.exp(prob)

In [None]:
untuned_preplexity = fine_tune_preplexity(untuned_bert, test_dataloader)

In [55]:
print('========= Preplexity =========')
print(f'Preplexity of model before fine tuning:    {fine_tune_preplexity}')

Preplexity of model before fine tuning:    12.3135415


In [44]:
fine_tune_preplexity = fine_tune_preplexity(fine_tuned_bert, test_dataloader)

In [53]:
print('========= Preplexity =========')
print(f'Preplexity of model after fine tuning:    {fine_tune_preplexity}')

Preplexity of model after fine tuning:    3.008049249649048
