In [None]:
import sys

!{sys.executable} -m pip install torch numpy transformers pandas scikit-learn sklearn

In [None]:
import torch

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
from transformers import AutoTokenizer, AutoModel, TFAutoModel
import numpy as np

# Checkpoint name shared by the tokenizer and the encoder.
MODEL = "cardiffnlp/twitter-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(MODEL)

# Two sample tweets kept around for quick manual experiments.
text = ["Good night honey ðŸ˜Š", "lmao xd ðŸ˜Š"]
#text = preprocess(text)

# PyTorch encoder, moved to the selected device.
roberta = AutoModel.from_pretrained(MODEL).to(device)

# Freeze the embeddings and the first 10 encoder layers.
# (The cut-off is a parameter that still needs to be experimented with.)
modules = [roberta.embeddings, *roberta.encoder.layer[:10]]
for param in (p for module in modules for p in module.parameters()):
    param.requires_grad = False
#encoded_input = tokenizer(text, return_tensors='pt', padding=True)
#features = model(**encoded_input)

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split


def preprocess(text):
    """Mask user mentions and links in a single tweet.

    The text is split on single spaces. A token that starts with '@' and is
    longer than one character becomes '@user'; a token that starts with
    'http' becomes 'http'. Tokens are re-joined with single spaces, so the
    original spacing is preserved.
    """

    def _mask(token):
        # Mentions first, then links - same order as the two rewrites need.
        if token.startswith('@') and len(token) > 1:
            token = '@user'
        if token.startswith('http'):
            return 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))


# Read the labelled tweets from disk.
df = pd.read_csv("train_dataset.csv")

# Inputs: every tweet with mentions and links masked by preprocess().
X = [preprocess(tweet) for tweet in df["tweet_content"].tolist()]

# Targets: True where the label column equals 'sarcastic', False otherwise.
y = df["sarcasm_label"].eq('sarcastic').tolist()


In [None]:
from torch.nn.functional import pad

# Tokenize all tweets in one call (padded so they form a single tensor)
# and keep only the token ids; the rest of the encoding is not used.
encoded_batch = tokenizer(X, return_tensors='pt', padding=True)
X_encoded = encoded_batch['input_ids']

In [None]:
# Hold out 20% of the encoded tweets for validation; the seed fixes the split.
X_train, X_cross, y_train, y_cross = train_test_split(
    X_encoded,
    y,
    test_size=0.2,
    random_state=42,
)

# Boolean labels -> int64 class indices (0 / 1) for the loss function.
y_train = torch.tensor(y_train, dtype=torch.long)
y_cross = torch.tensor(y_cross, dtype=torch.long)


In [None]:
# Reaches f_score = 0.4 when RoBERTa itself is trained as well
import torch
from torch import nn


class Model(nn.Module):
    """Sarcasm classifier: RoBERTa encoder -> BiLSTM -> max-pool -> MLP head.

    The encoder output and the BiLSTM output are concatenated along the
    feature axis, max-pooled, flattened and passed through three linear
    layers. ``forward`` returns raw, un-normalised scores for the two classes.
    """

    def __init__(self):
        super().__init__()
        # Encoder instance created earlier in the notebook (module-level `roberta`).
        self.roberta = roberta
        # batch_first=True is required: the encoder yields (batch, seq, hidden),
        # while nn.LSTM defaults to (seq, batch, feature) and would otherwise
        # run its recurrence over the batch axis, mixing unrelated samples.
        # Parameter shapes are unaffected, so earlier state dicts still load.
        self.lstm = nn.LSTM(768, 64, 2, bidirectional=True, dropout=0.2, batch_first=True)
        # 716 = 4 * 179: pooled width is (768 + 2 * 64) // 5 = 179 and the pooled
        # sequence axis must be 4, i.e. the padded length has to be 120-149 tokens.
        self.linear1 = nn.Linear(716, 100)
        self.linear2 = nn.Linear(100, 20)
        self.linear3 = nn.Linear(20, 2)
        # Applied to a 3-D tensor, so the pool window covers (seq, feature).
        self.pooling = nn.MaxPool2d((30, 5))
        # Not applied in forward(); the network outputs logits.
        self.softmax = nn.Softmax(dim=1)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        """Return class logits of shape (batch, 2).

        x is passed straight to the encoder; it is expected to be a
        (batch, seq_len) tensor of token ids as produced by the tokenizer.
        """
        # NOTE(review): only token ids are passed, no attention mask, so padding
        # positions are encoded like real tokens - confirm this is intended.
        x1 = self.roberta(x).last_hidden_state
        x, _ = self.lstm(x1)
        # Concatenate encoder and LSTM features: (batch, seq, 768 + 128).
        x = torch.cat((x1, x), dim=2)
        x = self.pooling(x)
        # Flatten everything except the batch axis for the MLP head.
        x = x.reshape(x.shape[0], -1)
        x = self.linear1(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = self.dropout(x)
        x = self.relu(x)
        x = self.linear3(x)
        return x

In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Pair the encoded tweets with their labels.
dataset_train = TensorDataset(X_train, y_train)
dataset_cross = TensorDataset(X_cross, y_cross)

# Serve both splits in mini-batches of 40 samples, in dataset order.
dataloader_train = DataLoader(dataset_train, batch_size=40)
dataloader_cross = DataLoader(dataset_cross, batch_size=40)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch (adapted from the PyTorch documentation).

    Every batch is moved to the notebook-level `device`, the loss is
    back-propagated and the optimizer takes one step. The current batch loss
    is printed every 50 batches and the mean batch loss at the end.
    """
    total_samples = len(dataloader.dataset)
    batch_count = len(dataloader)
    running_loss = 0
    model.train()

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and prediction error.
        batch_loss = loss_fn(model(inputs), targets)
        running_loss += batch_loss.item()

        # Backward pass and parameter update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 50 != 0:
            continue
        seen = step * len(inputs)
        print(f"loss: {batch_loss.item() :>7f}  [{seen:>5d}/{total_samples:>5d}]")

    running_loss /= batch_count
    print(f"Avg train loss: {running_loss:>8f} \n")


In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    true_positive, true_negative, false_negative, false_positive = 0.0001, 0.0001, 0.0001, 0.0001  #zeby dzielenia przez zero nie bylo
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            y_pred = pred.argmax(1)
            correct += (y_pred == y).type(torch.long).sum().item()

            true_positive += torch.logical_and(y_pred == y, y == 1).sum().item()
            true_negative += torch.logical_and(y_pred == y, y == 0).sum().item()
            false_negative += torch.logical_and(y_pred != y, y == 1).sum().item()
            false_positive += torch.logical_and(y_pred != y, y == 0).sum().item()

    recall = true_positive / (true_positive + false_negative)
    precision = true_positive / (true_positive + false_positive)
    specificity = true_negative / (false_positive + true_negative)
    f_score = (2 * precision * recall) / (precision + recall)
    g_mean = (recall * specificity) ** (0.5)

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    print(
        f"\n recall: {recall} \n precision: {precision} \n specificity: {specificity} \n f_score: {f_score} \n g_mean: {g_mean}\n")

In [None]:
# Build the classifier, then place it on the selected device.
model = Model()
model = model.to(device)

In [None]:
# Per-class loss weights; both classes currently count equally.
class_weights = torch.Tensor([1.0, 1.0])
loss_fn = nn.CrossEntropyLoss(weight=class_weights).to(device)

# Adam over all model parameters with a small learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-5)

In [None]:
# Number of full passes over the training split.
epochs = 20

# Each epoch: one training pass, then an evaluation on the held-out split.
for _ in range(epochs):
    train(dataloader_train, model, loss_fn, optimizer)
    test(dataloader_cross, model, loss_fn)

print("Done!")

In [None]:
# Base name of the saved artifacts; the extension selects the format.
model_name = "sarcasm_detection_model"
model_name_pt = model_name + ".pt"      # PyTorch state dict
model_name_onnx = model_name + ".onnx"  # ONNX export

In [None]:
# Persist only the learned weights (the state dict), not the module object.
trained_weights = model.state_dict()
torch.save(trained_weights, model_name_pt)

In [None]:
# Rebuild the network and put all of it on `device`: Model() reuses the shared
# encoder, which already lives there, while its freshly created layers start
# on the CPU.
pytorch_model = Model().to(device)
# map_location lets a checkpoint saved on another device load here.
pytorch_model.load_state_dict(torch.load(model_name_pt, map_location=device))
pytorch_model.eval()
# Trace with one real encoded tweet: a (1, padded_len) tensor of integer token
# ids, i.e. the same dtype and padded length the model was trained on. A float
# 4-D placeholder cannot pass through the encoder.
dummy_input = X_encoded[:1].to(device)
torch.onnx.export(pytorch_model, dummy_input, model_name_onnx, verbose=True)