In [0]:
import numpy as np
import nltk
import pandas as pd
import copy
import math
import matplotlib.pyplot as plt
import matplotlib as mpl
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import StratifiedKFold
import string
import  random
from spacy.lang.ro import Romanian
from spacy.lang.ro.stop_words import STOP_WORDS

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim 
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchtext import data

In [2]:
cd /content/drive/My Drive/Proiect_IA_3

/content/drive/My Drive/Proiect_IA_3


In [0]:
use_cuda = torch.cuda.is_available()
torch.manual_seed(1024)
device = torch.device("cuda" if use_cuda else "cpu")
torch.backends.cudnn.deterministic = True  

In [0]:
train_data = pd.read_csv('train_data.csv', sep='\t', encoding='utf-8', lineterminator='\n', header = 0, names=['Id', 'Text', 'Label'])
validation_data = pd.read_csv('valid_data.csv', sep='\t', encoding='utf-8', lineterminator='\n', header = 0, names=['Id', 'Text', 'Label'])

In [0]:
def full_texts(texts):
  text = " "
  for it in texts:
    text += " ".join(it)
  return text

In [0]:
text = full_texts(train_data['Text'].tolist())

In [0]:
class Vocabulary:
    """
    Helper class that maps characters to unique indices and the other way around
    """
    def __init__(self, text: str):
        # PAD is a special character for padding shorter sequences 
        # in a mini-batch
        # create a set out of all characters
        characters_set = set(["0"]) 
        characters_set.update(text)
        
        #create a dictionary for characters
        self.char_to_idx = {char:idx for (idx, char) 
                            in enumerate(characters_set)}
        self.idx_to_char = {idx:char for (idx, char) 
                            in enumerate(characters_set)}
   
    def size(self):
        return len(self.char_to_idx)
      
    def __str__(self):
        return str(self.char_to_idx)

In [12]:
vocab = Vocabulary(text)
print("Vocabulary size: ", vocab.size())
print("Vocabulary: \n", vocab)

Vocabulary size:  61
Vocabulary: 
 {'=': 0, 'l': 1, '!': 2, 'Y': 3, 'n': 4, ':': 5, 'E': 6, 'A': 7, ';': 8, 'g': 9, '<': 10, 'h': 11, 'o': 12, 'X': 13, 'w': 14, 'K': 15, 'b': 16, '#': 17, '@': 18, 'Z': 19, 'W': 20, '0': 21, 'k': 22, 'F': 23, 'B': 24, 'd': 25, 'p': 26, '*': 27, '&': 28, '|': 29, 'H': 30, '>': 31, 'R': 32, 'r': 33, '.': 34, '(': 35, 'N': 36, 'z': 37, 'i': 38, 'T': 39, ' ': 40, 'q': 41, 'C': 42, 'm': 43, 'U': 44, 't': 45, '$': 46, '}': 47, 'c': 48, 'j': 49, 'x': 50, '%': 51, 'e': 52, "'": 53, 'D': 54, 'S': 55, 'f': 56, 'y': 57, 'v': 58, 's': 59, 'a': 60}


In [0]:
def text_to_tensor(text: str, vocab: Vocabulary) -> torch.LongTensor:
    """
    Convert a string to a Tensor with corresponding character indices
    e.g. "We have" -> [48, 13,  2, 66, 56, 31, 13 
    """
    text_indices = [vocab.char_to_idx[c] for c in text]
  
    return torch.tensor(text_indices)

In [0]:
def my_collate(batch):
  new_data = []
  for item in batch:
    new_data.append(F.pad(input=item[0], pad=(0, 2000 - item[0].shape[0]), mode='constant', value=vocab.char_to_idx['0']))
  data = torch.stack(new_data, dim = 0)
  target = torch.stack([torch.tensor(item[1]) for item in batch], dim = 0)
  return [data, target]

In [0]:
class TextsDataset(Dataset):
    def __init__(self, texts, labels=None, vocab = None, max_length = 1004):
        self.X = texts
        self.y = labels
        self.vocab = vocab
        self.max_len = max_length
         
    def __len__(self):
        return (len(self.X))
    
    def __getitem__(self, i):
        data = self.X[i]
        data = text_to_tensor(data, self.vocab)
        if self.y is not None:
            y = self.y[i]
            return (data, y)
        else:
            return data

In [0]:
training_dataset = TextsDataset(train_data['Text'].tolist(), train_data["Label"].tolist(),vocab, 1004)
validing_dataset = TextsDataset(validation_data["Text"].tolist(), validation_data["Label"].tolist(),vocab, 1004)

In [0]:
batch_size = 128

In [0]:
trainloader = DataLoader(training_dataset, batch_size=batch_size, shuffle=True, drop_last=True, collate_fn=my_collate)
validloader = DataLoader(validing_dataset, batch_size=batch_size, shuffle=True, drop_last = True, collate_fn=my_collate) 

In [0]:
class CNN_Text(nn.Module):
    
    def __init__(self, vocab_size, embed_size):
        super(CNN_Text, self).__init__()
        filter_sizes = [1,2,3,5]
        num_filters = 36
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.convs1 = nn.ModuleList([nn.Conv2d(1, num_filters, (K, embed_size)) for K in filter_sizes])
        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(len(filter_sizes)*num_filters, 1)


    def forward(self, x):
        x = self.embedding(x)  
        x = x.unsqueeze(1)  
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs1] 
        x = [F.max_pool1d(i, i.size(2)).squeeze(2) for i in x]  
        x = torch.cat(x, 1)
        x = self.dropout(x)  
        logit = self.fc1(x)  
        return torch.sigmoid(logit)

In [0]:
size_of_vocab = vocab.size()
embedding_dim = 100

#instantiate the model
model_r =CNN_Text(size_of_vocab, embedding_dim)

In [45]:
#architecture
print(model_r)

#No. of trianable parameters
def count_parameters(model):
    return sum(p.numel() for p in model_r.parameters() if p.requires_grad)
    
print(f'The model has {count_parameters(model_r):,} trainable parameters')

CNN_Text(
  (embedding): Embedding(61, 100)
  (convs1): ModuleList(
    (0): Conv2d(1, 36, kernel_size=(1, 100), stride=(1, 1))
    (1): Conv2d(1, 36, kernel_size=(2, 100), stride=(1, 1))
    (2): Conv2d(1, 36, kernel_size=(3, 100), stride=(1, 1))
    (3): Conv2d(1, 36, kernel_size=(5, 100), stride=(1, 1))
  )
  (dropout): Dropout(p=0.3, inplace=False)
  (fc1): Linear(in_features=144, out_features=1, bias=True)
)
The model has 45,989 trainable parameters


In [0]:
#define optimizer and loss
optimizer = optim.Adam(model_r.parameters(), lr = 0.01)
criterion = nn.BCELoss()

#define metric
def binary_accuracy(preds, y):
    #round predictions to the closest integer
    rounded_preds = torch.round(preds)
    
    correct = (rounded_preds == y).float() 
    acc = correct.sum() / len(correct)
    return acc
    
#push to cuda if available
model_r = model_r.to(device)
criterion = criterion.to(device)

In [0]:
def train(model, train_iterator, optimizer, criterion):
    
    #initialize every epoch 
    epoch_loss = 0
    epoch_acc = 0
    clip = 3
    
    #set the model in training phase
    model.train()  
    for inputs, labels in train_iterator:
        #print(it)
        inputs, labels = inputs.to(device), labels.to(device)
        #resets the gradients after every batch
        optimizer.zero_grad()   
        
        #retrieve text and no. of words
        #print("retrive")
        output = model(inputs) 
        
        #compute the loss
        #print("loss")
        loss = criterion(output.squeeze(), labels.float())  
        
        #compute the binary accuracy
        #print("acc")
        acc = binary_accuracy(output.squeeze(), labels)   
        
        #print("back")
        #backpropage the loss and compute the gradients
        loss.backward() 
        nn.utils.clip_grad_norm_(model.parameters(), clip)      
        
        #print("optim")
        #update the weights
        optimizer.step()      
        
        #loss and accuracy
        epoch_loss += loss.item()  
        epoch_acc += acc.item()      
    return epoch_loss / len(train_iterator), epoch_acc / len(train_iterator)

In [0]:
def evaluate(model, eval_iterator, criterion):
    
    #initialize every epoch
    epoch_loss = 0
    epoch_acc = 0
    clip = 2

    #deactivating dropout layers
    model.eval()
    
    #deactivates autograd
    with torch.no_grad():
    
        for inputs, labels in eval_iterator:

            #retrieve text and no. of words
            inputs, labels = inputs.to(device), labels.to(device)
            
            #convert to 1d tensor
            output= model(inputs)
            
            #compute loss and accuracy
            test_loss = criterion(output.squeeze(), labels.float())
            acc = binary_accuracy(output.squeeze(), labels) 
            
            #keep track of loss and accuracy
            epoch_loss += test_loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(eval_iterator), epoch_acc / len(eval_iterator)

In [0]:
N_EPOCHS = 50
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
     
    #train the model
    train_loss, train_acc = train(model_r, trainloader, optimizer, criterion)
    
    print("Epoch: ", epoch)
    #evaluate the model
    valid_loss, valid_acc = evaluate(model_r, validloader, criterion)
     
   #save the best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model_r.state_dict(), 'saved_weights_3.pt')
    
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [0]:
model_r.load_state_dict(torch.load("saved_weights_3.pt"))

In [0]:
target_loss, target_acc = evaluate(model_r, target_iterator, criterion)