In [11]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset
from transformers import BertForSequenceClassification
from transformers import BertTokenizer
from torch.utils.data import DataLoader
from transformers import AdamW
from transformers import get_linear_schedule_with_warmup
import numpy as np
from sklearn.metrics import accuracy_score
import pickle

In [2]:
# Load the dataset from a CSV in the working directory. Later cells read the
# 'clean_motiv_part' (text) and 'label' (target) columns from this frame.
df = pd.read_csv('small_dataset.csv')

In [3]:
# First split: 80 % of the rows go to training, the rest is held out.
# NOTE: despite the `df_` prefix these are plain Python lists of texts.
df_train, df_val_test, tar_train, tar_val_test = train_test_split(
    df['clean_motiv_part'].tolist(),
    df['label'].tolist(),
    train_size=0.8,
    random_state=1412,
)

# Second split: the held-out part is halved into test and validation sets.
df_test, df_val, tar_test, tar_val = train_test_split(
    df_val_test,
    tar_val_test,
    train_size=0.5,
    random_state=1412,
)

In [4]:
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: indexable collection of raw texts (each item is passed through ``str``).
        targets: indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: object exposing ``encode_plus`` with the keyword arguments used below.
        max_len: length every sequence is padded / truncated to.
    """

    def __init__(self, texts, targets, tokenizer, max_len=512):
        self.texts, self.targets = texts, targets
        self.tokenizer, self.max_len = tokenizer, max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return a dict with the raw text, flattened ids / mask and the label tensor."""
        sample_text = str(self.texts[idx])
        label = self.targets[idx]

        tokenizer_options = dict(
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
            truncation=True,
        )
        encoded = self.tokenizer.encode_plus(sample_text, **tokenizer_options)

        # flatten() removes the leading batch axis that return_tensors='pt' adds.
        item = {'text': sample_text}
        for field in ('input_ids', 'attention_mask'):
            item[field] = encoded[field].flatten()
        item['targets'] = torch.tensor(label, dtype=torch.long)
        return item

In [57]:
class BertClassifier:
    """Training / inference wrapper around a pickled BERT sequence classifier.

    The model and the tokenizer are restored with ``torch.load`` from files that
    were written with ``torch.save`` on whole objects (not state dicts).

    Args:
        model_path: path to the pickled classification model.
        tokenizer_path: path to the pickled tokenizer.
        n_classes: accepted for backward compatibility; not used, because the
            classification head comes from the pickled model as-is.
        epochs: number of training epochs; required before ``preparation``.
        model_save_path: where ``train(save_best=True)`` writes the best model.
    """

    def __init__(self, model_path, tokenizer_path, n_classes=None, epochs=None, model_save_path='bert_model.pth'):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        # SECURITY: torch.load unpickles arbitrary Python objects - only point
        # these paths at files you created yourself.
        # map_location lets a checkpoint saved on a GPU be opened on a CPU-only host.
        self.model = torch.load(model_path, map_location=self.device)
        self.tokenizer = torch.load(tokenizer_path)
        self.max_len = 512
        self.model_save_path = model_save_path
        self.epochs = epochs
        self.model.to(self.device)

    def preparation(self, X_train, y_train, X_valid, y_valid, batch_size=512):
        """Build datasets, loaders, optimizer, LR scheduler and loss.

        Args:
            X_train, y_train: training texts and integer labels.
            X_valid, y_valid: validation texts and integer labels.
            batch_size: batch size for both loaders (default keeps the former
                hard-coded value).

        Raises:
            ValueError: if ``epochs`` was not given to the constructor (the
                scheduler needs the total number of optimisation steps).
        """
        if self.epochs is None:
            raise ValueError("epochs must be set in the constructor before calling preparation()")

        # create datasets
        self.train_set = CustomDataset(X_train, y_train, self.tokenizer, max_len=self.max_len)
        self.valid_set = CustomDataset(X_valid, y_valid, self.tokenizer, max_len=self.max_len)

        # create data loaders; validation is not shuffled so that the reported
        # mean per-batch loss is reproducible between runs
        self.train_loader = DataLoader(self.train_set, batch_size=batch_size, shuffle=True)
        self.valid_loader = DataLoader(self.valid_set, batch_size=batch_size, shuffle=False)

        # helpers initialization
        self.optimizer = AdamW(self.model.parameters(), lr=2e-5, correct_bias=False)
        self.scheduler = get_linear_schedule_with_warmup(
            self.optimizer,
            num_warmup_steps=0,
            num_training_steps=len(self.train_loader) * self.epochs
        )
        self.loss_fn = torch.nn.CrossEntropyLoss().to(self.device)

    def _forward_batch(self, data):
        """Move one loader batch to the device and return ``(logits, targets)``."""
        input_ids = data["input_ids"].to(self.device)
        attention_mask = data["attention_mask"].to(self.device)
        targets = data["targets"].to(self.device)
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.logits, targets

    def fit(self):
        """Run one training epoch.

        Returns:
            ``(train_acc, train_loss)``: accuracy as a 0-dim float64 tensor and
            the mean of the per-batch losses.
        """
        self.model = self.model.train()
        losses = []
        # Tensor accumulator so that .double() below also works for an empty loader.
        correct_predictions = torch.zeros((), dtype=torch.long, device=self.device)

        for data in self.train_loader:
            # Clear any stale gradients before this batch's backward pass.
            self.optimizer.zero_grad()

            logits, targets = self._forward_batch(data)
            preds = torch.argmax(logits, dim=1)
            loss = self.loss_fn(logits, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            self.scheduler.step()

        train_acc = correct_predictions.double() / len(self.train_set)
        train_loss = np.mean(losses)
        return train_acc, train_loss

    def eval(self):
        """Evaluate on the validation loader without tracking gradients.

        Returns:
            ``(val_acc, val_loss)``: accuracy as a 0-dim float64 tensor and the
            mean of the per-batch losses.
        """
        self.model = self.model.eval()
        losses = []
        correct_predictions = torch.zeros((), dtype=torch.long, device=self.device)

        with torch.no_grad():
            for data in self.valid_loader:
                logits, targets = self._forward_batch(data)
                preds = torch.argmax(logits, dim=1)
                loss = self.loss_fn(logits, targets)
                correct_predictions += torch.sum(preds == targets)
                losses.append(loss.item())

        val_acc = correct_predictions.double() / len(self.valid_set)
        val_loss = np.mean(losses)
        return val_acc, val_loss

    def train(self, save_best=False):
        """Train for ``self.epochs`` epochs, printing metrics after each one.

        Args:
            save_best: when True, the whole model is written to
                ``self.model_save_path`` every time validation accuracy
                improves. Off by default, so no file is written unless asked.
        """
        best_accuracy = 0
        for epoch in range(self.epochs):
            print(f'Epoch {epoch + 1}/{self.epochs}\n')
            train_acc, train_loss = self.fit()
            print(f'Train loss {train_loss} accuracy {train_acc}\n')

            val_acc, val_loss = self.eval()
            print(f'Val loss {val_loss} accuracy {val_acc}\n')
            print('--------------------\n')

            if val_acc > best_accuracy:
                best_accuracy = val_acc
                if save_best:
                    torch.save(self.model, self.model_save_path)

    @staticmethod
    def _is_missing(text):
        """True for ``None`` and float NaN (how pandas represents a missing cell).

        ``text == np.nan`` can never be True because NaN compares unequal to
        everything, itself included - hence the explicit isnan check.
        """
        if text is None:
            return True
        return isinstance(text, (float, np.floating)) and bool(np.isnan(text))

    def _logits(self, text):
        """Tokenize a single text and return the model logits, shape (1, n_classes)."""
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            truncation=True,
            padding='max_length',
            return_attention_mask=True,
            return_tensors='pt',
        )
        # A batch of exactly one sequence.
        input_ids = encoding['input_ids'].reshape(1, -1).to(self.device)
        attention_mask = encoding['attention_mask'].reshape(1, -1).to(self.device)

        # Inference: make sure dropout is disabled and no autograd graph is kept.
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.logits

    def predict(self, text):
        """Return the index of the highest-scoring class, or 0 for a missing text."""
        if self._is_missing(text):
            return 0
        return torch.argmax(self._logits(text), dim=1).cpu().numpy()[0]

    def predict_proba(self, text):
        """Return the raw class scores for ``text`` as a 1-D numpy array.

        Despite the name, the values are logits - no softmax is applied.
        Returns 0 for a missing text, mirroring ``predict``.
        """
        if self._is_missing(text):
            return 0
        return self._logits(text).cpu().detach().numpy()[0]

In [30]:
# Instantiate a sequence-classification model from the "cointegrated/rubert-tiny"
# checkpoint; output_hidden_states=True asks the model to also return hidden states.
# NOTE(review): the name `model` is rebound to a BertClassifier in the next code
# cell and nothing visible uses this object in between - confirm whether this
# cell is still needed.
model = BertForSequenceClassification.from_pretrained("cointegrated/rubert-tiny", ignore_mismatched_sizes=True, output_hidden_states=True)

Some weights of the model checkpoint at cointegrated/rubert-tiny were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.bias', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not i

In [58]:
model_path = 'new_bert_model.pth'
tokenizer_path = 'bert_tokenizer.pt'

# Restore the pickled model + tokenizer and wire up loaders / optimizer.
model = BertClassifier(
    model_path=model_path,
    tokenizer_path=tokenizer_path,
    n_classes=206,
    epochs=50,
)
model.preparation(
    X_train=df_train,
    y_train=tar_train,
    X_valid=df_val,
    y_valid=tar_val,
)
# Release cached, currently unused GPU memory held by the allocator.
torch.cuda.empty_cache()



In [29]:
from tqdm import tqdm

# Predict the class of every test text, one text at a time, with a progress bar.
pred_y = [model.predict(sample) for sample in tqdm(df_test)]

100%|██████████████████████████████████████████████████████████████████████████████| 7595/7595 [05:24<00:00, 23.39it/s]


In [30]:
# accuracy_score is also imported in the first (imports) cell; this repeat only
# matters when the cell is run on its own.
from sklearn.metrics import accuracy_score
# Share of test texts whose predicted class equals the true label.
accuracy_score(tar_test, pred_y)

0.8496379196840026

In [18]:
def get_emb(word, model, tokenizer, device):
    """Embed a single word as the mean of the model's last hidden-state layer.

    Args:
        word: text to embed; tokenized without special tokens.
        model: callable taking ``input_ids`` / ``attention_mask`` and returning
            an object with a ``hidden_states`` sequence.
        tokenizer: object exposing ``encode_plus``.
        device: torch device the inputs are moved to before the forward pass.

    Returns:
        1-D CPU tensor: the last entry of ``hidden_states`` averaged over the
        token axis (dim 1) for the single sequence in the batch.
    """
    tokens = tokenizer.encode_plus(word, add_special_tokens=False, return_tensors='pt')

    # Collapse whatever batch axis the tokenizer produced into exactly one row.
    ids_batch = tokens['input_ids'].reshape(1, -1).to(device)
    mask_batch = tokens['attention_mask'].reshape(1, -1).to(device)

    result = model(input_ids=ids_batch, attention_mask=mask_batch)

    last_layer = result.hidden_states[-1]
    pooled = last_layer.mean(dim=1)
    return pooled[0].cpu()

In [None]:
from IPython.display import clear_output

# emb_word: word -> embedding returned by get_emb
# emb_3gr:  "w1 w2 w3" -> concatenation of the three word embeddings
emb_3gr = dict()
emb_word = dict()

bert = model.model
tok = model.tokenizer
device = model.device
bert.eval()
with torch.no_grad():
    for text in df['clean_motiv_part']:
        # Missing cells arrive as float NaN and have no words to embed.
        if not isinstance(text, str):
            continue

        # Keep purely alphabetic tokens only. Filtering up front (instead of
        # skipping while indexing) cannot run past the end of the list and
        # does not silently drop the trigrams ending in the last two words.
        words = [token for token in text.split(" ") if token.isalpha()]

        # Each distinct word is embedded once; the model is in eval mode, so
        # re-embedding the same word would only repeat the same forward pass.
        for word in words:
            if word not in emb_word:
                emb_word[word] = get_emb(word, bert, tok, device)

        # Slide a window of three consecutive alphabetic words over the text.
        for first_word, second_word, third_word in zip(words, words[1:], words[2:]):
            trigram = first_word + ' ' + second_word + ' ' + third_word
            if trigram not in emb_3gr:
                emb_3gr[trigram] = torch.cat(
                    (emb_word[first_word], emb_word[second_word], emb_word[third_word]),
                    dim=0,
                )

In [None]:
import pickle

# Persist both embedding dictionaries. The context managers close the files
# even if pickling fails part-way through.
# NOTE: pickle files execute code when loaded - only load these back from
# locations you trust.
with open("dict_emb_gr.pkl", "wb") as a_file:
    pickle.dump(emb_3gr, a_file)

with open("dict_emb_word.pkl", "wb") as b_file:
    pickle.dump(emb_word, b_file)