---
## Проанализируем датасет

In [None]:
import re
import pandas as pd

# Columns that are dropped from both splits right after loading.
to_drop = ['UserName', 'ScreenName', 'Location', 'TweetAt']

# Read both CSV files first, then strip the unused columns.
train_dataset = pd.read_csv('./Corona_NLP_train.csv', encoding="ISO-8859-1")
test_dataset = pd.read_csv('./Corona_NLP_test.csv', encoding="ISO-8859-1")

train_dataset = (
    train_dataset
    .drop(columns=to_drop)
    .sample(frac=1, random_state=42)  # shuffle rows (fixed seed)
)
test_dataset = test_dataset.drop(columns=to_drop)

# Patterns are compiled once. Raw strings are used so that "\S" / "\." are not
# treated as (invalid) string escapes, and the letter range is spelled
# A-Za-z because "A-z" also covers the characters [ \ ] ^ _ ` .
_URL_RE = re.compile(r"https?://[A-Za-z0-9_%/\-.]+[A-Za-z0-9_.\-?&=%]+")
_HASHTAG_RE = re.compile(r"#\S+")
_MENTION_RE = re.compile(r"@\S+")
_PUNCTUATION_RE = re.compile(r"[.?!,;:]+")


def clean_text(text):
    """Lower-case a tweet and strip URLs, hashtags, mentions and punctuation.

    Args:
        text: the raw tweet as a ``str``.

    Returns:
        The cleaned string. Removed tokens are replaced by the empty string,
        so the whitespace that surrounded them is left untouched.
    """
    text = text.lower()
    # URLs go first: they contain '.', ':' and '?' which the punctuation
    # pass would otherwise break apart.
    for pattern in (_URL_RE, _HASHTAG_RE, _MENTION_RE, _PUNCTUATION_RE):
        text = pattern.sub("", text)
    return text

# Sentiment label -> integer class id; ids follow the order from most negative
# to most positive.
# NOTE: this dict is named `map`, so it shadows the Python builtin `map`
# for the rest of the notebook.
map = {
    label: class_id
    for class_id, label in enumerate((
        'Extremely Negative',
        'Negative',
        'Neutral',
        'Positive',
        'Extremely Positive',
    ))
}

# Inverse lookup: integer class id -> sentiment label.
revmap = {class_id: label for label, class_id in map.items()}

# Clean the tweet text of both splits (the column is overwritten).
train_dataset['OriginalTweet'] = train_dataset['OriginalTweet'].apply(clean_text)
test_dataset['OriginalTweet'] = test_dataset['OriginalTweet'].apply(clean_text)

# Display the cleaned training frame as the cell output.
train_dataset

In [None]:
import seaborn as sns
# Class balance of the training split.
sns.set(rc={'figure.figsize': (11, 5)})
sns.countplot(x='Sentiment', data=train_dataset, width=0.5)

In [None]:
def _encode_sentiment(labels):
    """Map sentiment names to integer class ids via the `map` dict.

    The column is returned unchanged when it already holds only class ids,
    so re-running this cell does not fail with ``KeyError: 0``. Unknown
    label names still raise ``KeyError``.
    """
    if labels.isin(list(map.values())).all():
        return labels
    return labels.apply(lambda name: map[name])


train_dataset['Sentiment'] = _encode_sentiment(train_dataset['Sentiment'])
test_dataset['Sentiment'] = _encode_sentiment(test_dataset['Sentiment'])

In [None]:
import seaborn as sns
# Class balance of the test split.
sns.set(rc={'figure.figsize': (11, 5)})
sns.countplot(x='Sentiment', data=test_dataset, width=0.5)

In [None]:
# Kernel density estimate of the tweet length in characters.
train_dataset['OriginalTweet'].str.len().plot.kde()

---
## Реализуем бейзлайн с помощью классических методов обучения

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import f1_score

# Raw text and integer labels of both splits.
x, y = train_dataset['OriginalTweet'], train_dataset['Sentiment']
x_test, y_test = test_dataset['OriginalTweet'], test_dataset['Sentiment']

# The TF-IDF vocabulary is fitted on the training text only and reused for
# the test text.
vectorizer = TfidfVectorizer()
X_train = vectorizer.fit_transform(x)
X_test = vectorizer.transform(x_test)

# Human-readable class names, ordered by class id, for the reports.
targets = [revmap[label] for label in sorted(y.unique())]

### 1. Логистическая регрессия

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# `multi_class='multinomial'` is intentionally not passed: the argument is
# deprecated in recent scikit-learn releases, and with the 'saga' solver on a
# multi-class target the default already fits a multinomial model.
logreg = LogisticRegression(n_jobs=4, C=1e5, solver='saga', random_state=42)

logreg.fit(X_train, y)
print(classification_report(y_test, logreg.predict(X_test), target_names=targets))

### 2. Библиотека градиентного бустинга CatBoost 

In [None]:
from sklearn.pipeline import Pipeline
from catboost import CatBoostClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report

# CatBoost on the same TF-IDF features as the logistic regression.
clf = CatBoostClassifier(
    loss_function='MultiClass',
    iterations=500,
    learning_rate=0.35,
    task_type='GPU',
)

clf.fit(X_train, y, verbose=50, plot=True)
print(classification_report(y_test, clf.predict(X_test), target_names=targets))

In [None]:
# Weighted F1 of both baselines on the test split.
for title, estimator in (('Logreg: ', logreg), ('Catboost: ', clf)):
    print(title, f1_score(y_test, estimator.predict(X_test), average='weighted'))

---
## Опишем модель и датасет с помощью PyTorch

In [None]:
import torch
from torch import nn
from prettytable import PrettyTable
from transformers import AutoModel, AutoTokenizer

class EncoderForSequenceClassification(nn.Module):
    """Pretrained encoder followed by a single linear classification layer.

    Args:
        model_name: identifier passed to ``AutoModel.from_pretrained``.
        classes: number of output classes.
        learn: ``'classifier'`` freezes every encoder parameter so that only
            the linear head is trained; any other value leaves all
            parameters trainable.
    """

    def __init__(self, model_name, classes, learn='all'):
        super().__init__()

        self.encoder = AutoModel.from_pretrained(model_name)
        self.classifier = nn.Linear(self.encoder.config.hidden_size, classes)

        if learn == 'classifier':
            for param in self.encoder.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask=None, labels=None):
        """Run the encoder and the classification head.

        Args:
            input_ids: token ids forwarded to the encoder.
            attention_mask: optional attention mask forwarded to the encoder.
            labels: optional targets; either class indices or per-class
                probabilities (e.g. one-hot rows).

        Returns:
            dict with ``'predicts'`` (softmax probabilities over the classes)
            and ``'loss'`` (cross-entropy, or ``None`` when ``labels`` is
            ``None``).
        """
        outputs = self.encoder(
            input_ids,
            attention_mask=attention_mask
        )

        # The second element of the encoder output is used as the sequence
        # representation.
        pooled_output = outputs[1]
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # Probability-style targets (e.g. float64 one-hot rows) are cast
            # to the dtype of the logits; integer class indices are left as is.
            if labels.is_floating_point():
                labels = labels.to(logits.dtype)
            # Cross-entropy applies log-softmax itself, so it must receive the
            # raw logits rather than already-normalised probabilities.
            loss = nn.functional.cross_entropy(logits, labels)

        return {
            'predicts': torch.softmax(logits, dim=-1),
            'loss': loss
        }

def count_parameters(model):
    """Print a table of the trainable parameters of `model`.

    Parameters with ``requires_grad == False`` are skipped.

    Returns:
        The total number of trainable parameter elements.
    """
    trainable = [
        (name, parameter.numel())
        for name, parameter in model.named_parameters()
        if parameter.requires_grad
    ]

    table = PrettyTable(["Modules", "Parameters"])
    for name, size in trainable:
        table.add_row([name, size])

    total_params = sum(size for _, size in trainable)
    print(table)
    print(f"Total Trainable Params: {total_params}")
    return total_params

# Candidate checkpoints; the trailing index selects the one that is used.
model_name = (
    'prajjwal1/bert-tiny',
    'prajjwal1/bert-medium',
    'sentence-transformers/all-MiniLM-L6-v2',
    'sentence-transformers/all-distilroberta-v1',
    'albert-base-v2',
)[2]
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import OneHotEncoder
# The dense-output switch was renamed from `sparse` to `sparse_output` in
# newer scikit-learn releases and the old name was later removed; try the new
# name first and fall back for older installations.
try:
    ohe = OneHotEncoder(sparse_output=False)
except TypeError:
    ohe = OneHotEncoder(sparse=False)

class TwitterDataset(Dataset):
    """Index-based view over pre-tokenized tweets and their labels.

    `set` is a mapping with the keys ``'input_ids'``, ``'attention_mask'``
    and ``'labels'``; each value is indexed by sample position.
    ``token_type_ids`` are not kept.
    """

    # Fields returned for every sample, in this order.
    _FIELDS = ('input_ids', 'attention_mask', 'labels')

    def __init__(self, set) -> None:
        super().__init__()
        self.input_ids = set['input_ids']
        self.attention_mask = set['attention_mask']
        self.labels = set['labels']

    def __len__(self):
        # The number of samples is taken from the labels.
        return len(self.labels)

    def __getitem__(self, index):
        return {field: getattr(self, field)[index] for field in self._FIELDS}

def dset(set, max_length=80):
    """Tokenize the tweets of a data frame and one-hot encode its labels.

    Args:
        set: frame with an ``'OriginalTweet'`` text column and a
            ``'Sentiment'`` label column.
        max_length: every tweet is padded or truncated to this many tokens.

    Returns:
        dict with ``'input_ids'`` and ``'attention_mask'`` (tensors produced
        by the tokenizer) and ``'labels'`` (the one-hot encoded sentiment).
    """
    tweets = set['OriginalTweet'].to_list()
    tokenized = tokenizer(
        tweets,
        return_tensors='pt',
        padding='max_length',
        truncation=True,
        max_length=max_length,
    )

    # NOTE(review): the encoder is re-fitted on every call, so the one-hot
    # columns line up across splits only if each split contains every class.
    labels = ohe.fit_transform(set['Sentiment'].to_numpy().reshape(-1, 1))

    return {
        'input_ids': tokenized['input_ids'],
        # 'token_type_ids': tokenized['token_type_ids'],
        'attention_mask': tokenized['attention_mask'],
        'labels': labels
    }

# No shuffling is requested, so batches follow the row order of the frames.
train_set = DataLoader(dataset=TwitterDataset(dset(train_dataset)), batch_size=32)
test_set = DataLoader(dataset=TwitterDataset(dset(test_dataset)), batch_size=64)

---
## Определим количество токенов в каждом твите

In [None]:
import seaborn as sns
# Distribution of the number of non-zero attention-mask positions per tweet.
sns.set(rc={'figure.figsize': (11, 5)})
sns.displot((train_set.dataset.attention_mask != 0).sum(dim=1))

Определив количество токенов в твитах, зададим ограничение в 80 токенов в функции токенизации — там, где формируется датасет. Благодаря этому модель будет работать быстрее.

---
## Тренировка модели

In [None]:
import torch
import numpy as np
from tqdm import tqdm, trange
import matplotlib.pyplot as plt
from torch.optim import AdamW, lr_scheduler
from sklearn.metrics import classification_report

def train(model, loader, epochs, lr, max_lr=4e-4):
    """Fine-tune `model` with AdamW and a one-cycle learning-rate schedule.

    Args:
        model: module whose forward pass returns a dict with a ``'loss'`` entry
            when called with a batch that includes labels.
        loader: iterable of batches (dicts of tensors) with a defined length.
        epochs: number of passes over `loader`.
        lr: learning rate handed to the AdamW constructor. The one-cycle
            scheduler then drives the actual rate, so `max_lr` is what
            shapes the schedule.
        max_lr: peak learning rate of the one-cycle schedule.

    Returns:
        List with the training loss of every optimisation step.
    """
    total_steps = len(loader) * epochs
    optimizer = AdamW(model.parameters(), lr=lr)
    scheduler = lr_scheduler.OneCycleLR(
        optimizer=optimizer,
        total_steps=total_steps,
        max_lr=max_lr
        )

    model.train()

    losses = []
    # The context manager closes the progress bar even if training fails.
    with tqdm(total=total_steps) as pbar:
        for epoch in range(epochs):
            for batch in loader:
                batch = {k: v.to(device) for k, v in batch.items()}
                output = model(**batch)

                loss = output['loss']
                losses.append(loss.item())
                pbar.set_description(f'Epoch: {epoch+1}, training loss: {loss.item():0.5f}, lr: {scheduler.get_last_lr()}')

                loss.backward()

                optimizer.step()
                optimizer.zero_grad()
                # The schedule is sized in batches (total_steps), so it has to
                # advance after every optimizer step, not once per epoch.
                scheduler.step()

                pbar.update(1)

    return losses

def evaluate(model, set):
    """Collect true and predicted class indices over all batches of `set`.

    The model is switched to eval mode and run without gradient tracking.
    Both the predictions and the labels are reduced with argmax over the
    last dimension.

    Returns:
        Tuple ``(true, pred)`` of numpy arrays.
    """
    predicted, expected = [], []

    model.eval()
    with torch.no_grad():
        for raw_batch in tqdm(set):
            batch = {name: tensor.to(device) for name, tensor in raw_batch.items()}
            scores = model(**batch)['predicts']
            predicted.extend(torch.argmax(scores.cpu(), dim=-1))
            expected.extend(torch.argmax(batch['labels'].cpu(), dim=-1))

    return np.array(expected), np.array(predicted)

In [None]:
# Build the classifier with one output per sentiment class and fine-tune it.
encoder = EncoderForSequenceClassification(
    model_name,
    train_dataset['Sentiment'].nunique(),
)
encoder = encoder.to(device)

losses = train(encoder, train_set, epochs=3, lr=1e-4)

# Training loss per optimisation step.
plt.plot(torch.arange(len(losses)), losses)

In [None]:
import os

# torch.save does not create missing directories, so make sure the target
# folder exists before writing the checkpoint.
os.makedirs('./models', exist_ok=True)
torch.save(encoder, './models/all-MiniLM-L6-v2_ep3_lr1e-4.pt')

In [None]:
# The checkpoint is a whole pickled module, so it needs weights_only=False.
# Unpickling can execute arbitrary code: only load checkpoints you created.
# map_location puts the weights on the device selected for this session.
encoder = torch.load(
    './models/all-MiniLM-L6-v2_ep3_lr1e-4.pt',
    map_location=device,
    weights_only=False,
)

In [None]:
# Per-class precision / recall / F1 of the fine-tuned model on the test split.
true, pred = evaluate(encoder, test_set)
report = classification_report(true, pred, target_names=targets)
print(report)

In [None]:
# Weighted F1 of the fine-tuned model on the test split.
true, pred = evaluate(encoder, test_set)
weighted_f1 = f1_score(true, pred, average='weighted')
print('F1 Weighted: ', weighted_f1)

In [None]:
from sklearn.metrics import confusion_matrix
# `labels=targets` fixes the row/column order to the class-id order (most
# negative to most positive); without it the string labels are sorted
# alphabetically. The same names are used as tick labels so the axes are readable.
conf_mat = confusion_matrix(
    [revmap[x] for x in true],
    [revmap[x] for x in pred],
    labels=targets,
)
# Normalise every row so that it shows the distribution of predictions for
# one true class.
conf_mat_normalized = conf_mat.astype('float') / conf_mat.sum(axis=1)[:, np.newaxis]
sns.heatmap(
    conf_mat_normalized,
    annot=True,
    linewidth=.5,
    cmap="crest",
    xticklabels=targets,
    yticklabels=targets,
)
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

---
## Смотрим на эмбеддинги до тренировки

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import plotly.express as px
from sklearn.decomposition import PCA

def make_embeds(model, loader):
    """Compute one embedding per sample of `loader` with `model`.

    Args:
        model: encoder that is called with the batch (minus ``'labels'``) and
            whose output exposes the embedding at index 1; it must provide
            ``config.hidden_size``.
        loader: iterable of batches (dicts of tensors).

    Returns:
        float32 numpy array with one row per sample, in loader order.
    """
    # Embeddings must be deterministic: run in eval mode (no dropout) and put
    # the model back into its previous mode afterwards.
    was_training = model.training
    model.eval()

    chunks = []
    try:
        with torch.no_grad():
            for batch in tqdm(loader):
                batch = {k: v.to(device) for k, v in batch.items() if k != 'labels'}
                embedding = model(**batch)
                chunks.append(embedding[1].to('cpu').float())
    finally:
        model.train(was_training)

    if not chunks:
        return torch.empty((0, model.config.hidden_size), dtype=torch.float32).numpy()
    # Concatenating the per-batch results does not depend on the batch size.
    return torch.cat(chunks).numpy()

def plot_embeddings(embeddings, labels):
    """Project `embeddings` onto two PCA components and scatter-plot them.

    Points are coloured by `labels`; the legend order follows `targets`.

    Returns:
        The value returned by ``sns.scatterplot``.
    """
    projector = PCA(n_components=2, svd_solver='full')
    points = pd.DataFrame(projector.fit_transform(embeddings), columns=['x1', 'x2'])
    points['labels'] = labels

    sns.set(rc={'figure.figsize': (10, 10)})
    return sns.scatterplot(
        points,
        x='x1',
        y='x2',
        hue='labels',
        palette='rocket',
        hue_order=targets,
        linewidth=0,
        s=15,
    )

In [None]:
# Fresh model: the encoder carries only its pretrained weights here.
model = EncoderForSequenceClassification(
    model_name,
    train_dataset['Sentiment'].nunique()
).to(device)
model.eval()

train_embeddings = make_embeds(model.encoder, train_set)
# The classifier cell that follows reads `test_embeddings`, so it has to be
# computed here together with the training embeddings.
test_embeddings = make_embeds(model.encoder, test_set)

plot_embeddings(train_embeddings, [revmap[x] for x in train_dataset['Sentiment']])

In [None]:
# Logistic regression on the embeddings of the not yet fine-tuned encoder.
logreg = LogisticRegression()
logreg.fit(train_embeddings, train_dataset['Sentiment'])

print(classification_report(
    test_dataset['Sentiment'],
    logreg.predict(test_embeddings),
    target_names=targets,
))
print(f1_score(
    test_dataset['Sentiment'],
    logreg.predict(test_embeddings),
    average='weighted',
))

In [None]:
# CatBoost on the embeddings of the not yet fine-tuned encoder.
catboost = CatBoostClassifier(
        task_type='GPU',
        iterations=50,
        learning_rate=0.35,
        loss_function='MultiClass'
        )

# Fit and score the CatBoost model built above (not `logreg`), using the
# pre-training embeddings that this section of the notebook works with.
catboost.fit(train_embeddings, train_dataset['Sentiment'], verbose=False)
print(classification_report(test_dataset['Sentiment'], catboost.predict(test_embeddings), target_names=targets))
print(f1_score(test_dataset['Sentiment'], catboost.predict(test_embeddings), average='weighted'))

---
## Используем эмбеддинги обученной модели для классификации классическими методами

In [None]:
# The checkpoint is a whole pickled module (hence weights_only=False; only
# load checkpoints you created, unpickling can execute arbitrary code).
model = torch.load(
    './models/all-MiniLM-L6-v2_ep3_lr1e-4.pt',
    map_location=device,
    weights_only=False,
)
# The module was saved right after training, i.e. in training mode; switch to
# eval mode so the embeddings are computed without dropout.
model.eval()

train_embs = make_embeds(
    model.encoder,
    train_set
)
test_embs = make_embeds(
    model.encoder,
    test_set
)

plot_embeddings(train_embs, [revmap[x] for x in train_dataset['Sentiment']])

In [None]:
from catboost import CatBoostClassifier

# CatBoost on the embeddings of the fine-tuned encoder.
catboost = CatBoostClassifier(
    loss_function='MultiClass',
    iterations=50,
    learning_rate=0.35,
    task_type='GPU',
)

catboost.fit(train_embs, train_dataset['Sentiment'], verbose=False, plot=True)
print(classification_report(test_dataset['Sentiment'], catboost.predict(test_embs), target_names=targets))

In [None]:
from sklearn.linear_model import LogisticRegression
# Logistic regression on the embeddings of the fine-tuned encoder.
logreg = LogisticRegression()
logreg.fit(train_embs, train_dataset['Sentiment'])

print(classification_report(
    test_dataset['Sentiment'],
    logreg.predict(test_embs),
    target_names=targets,
))

In [None]:
from sklearn.svm import SVC

# Support vector classifier (default settings) on the fine-tuned embeddings.
svc = SVC()
svc.fit(train_embs, train_dataset['Sentiment'])

print(classification_report(
    test_dataset['Sentiment'],
    svc.predict(test_embs),
    target_names=targets,
))

In [None]:
from sklearn.neural_network import MLPClassifier

# Two hidden layers (32 and 8 units). The seed is fixed, like for the other
# stochastic steps of this notebook, so that the reported scores are
# reproducible between runs.
mlp = MLPClassifier((32, 8), random_state=42)
mlp.fit(train_embs, train_dataset['Sentiment'])
print(classification_report(test_dataset['Sentiment'], mlp.predict(test_embs), target_names=targets))

In [None]:
from sklearn.metrics import f1_score, accuracy_score
from prettytable import PrettyTable

def f1(model):
    """Weighted F1 of `model` on the test embeddings, rounded to 2 decimals.

    Rounding (instead of truncating via ``int(x * 100) / 100``) avoids
    under-reporting scores such as 0.29, which truncation turns into 0.28
    because of floating-point representation.
    """
    score = f1_score(test_dataset['Sentiment'], model.predict(test_embs), average='weighted')
    return round(float(score), 2)
def acc(model):
    """Accuracy of `model` on the test embeddings, rounded to 2 decimals.

    Rounding (instead of truncating via ``int(x * 100) / 100``) avoids
    under-reporting scores such as 0.29, which truncation turns into 0.28
    because of floating-point representation.
    """
    score = accuracy_score(test_dataset['Sentiment'], model.predict(test_embs))
    return round(float(score), 2)

# One row per classifier trained on the fine-tuned embeddings.
table = PrettyTable(['Model', 'F1 Weighted', 'Accuracy'])
for title, estimator in (
    ('CatBoost Classifier', catboost),
    ('Logistic Regression', logreg),
    ('Support Vector Machines', svc),
    ('Multilayer Perceptron', mlp),
):
    table.add_row([title, f1(estimator), acc(estimator)])

print(table)

In [None]:
# The checkpoint is a whole pickled module (hence weights_only=False; only
# load checkpoints you created, unpickling can execute arbitrary code).
# It was saved in training mode, so switch to eval mode before it is used
# to compute embeddings.
encoder = torch.load(
    './models/all-MiniLM-L6-v2_ep3_lr1e-4.pt',
    map_location=device,
    weights_only=False,
).to(device).eval()

In [None]:
# NOTE(review): this cell held an unfinished assignment (`encoder = `), which
# is a SyntaxError and stops "Run All". Until the intended value is known,
# keep the model loaded in the previous cell and make sure it is in eval mode.
encoder = encoder.eval()

In [None]:
# Embeddings of the training tweets from the loaded model's encoder.
train_embs = make_embeds(model=encoder.encoder, loader=train_set)

In [None]:
# Embeddings of the test tweets from the loaded model's encoder.
test_embs = make_embeds(model=encoder.encoder, loader=test_set)

In [None]:
from sklearn.decomposition import PCA
# Reduce the test embeddings to two principal components for plotting.
pca = PCA(svd_solver='full', n_components=2)
reduced = pca.fit_transform(test_embs)

In [None]:
# Scatter plot of the projected test embeddings, coloured by sentiment name.
df = pd.DataFrame(reduced, columns=['x1', 'x2'])
df['labels'] = [revmap[class_id] for class_id in test_dataset['Sentiment']]

sns.set(rc={'figure.figsize': (10, 10)})
sns.scatterplot(
    df,
    x='x1',
    y='x2',
    hue='labels',
    hue_order=targets,
    palette='rocket',
    linewidth=0,
    s=15,
)

In [None]:
from sklearn.decomposition import PCA
# Same projection and plot as for the test split, on the training embeddings.
pca = PCA(svd_solver='full', n_components=2)
reduced = pca.fit_transform(train_embs)

df = pd.DataFrame(reduced, columns=['x1', 'x2'])
df['labels'] = [revmap[class_id] for class_id in train_dataset['Sentiment']]

sns.set(rc={'figure.figsize': (10, 10)})
sns.scatterplot(
    df,
    x='x1',
    y='x2',
    hue='labels',
    hue_order=targets,
    palette='rocket',
    linewidth=0,
    s=15,
)