# Imports

In [None]:
import numpy as np
from datasets import load_dataset
from huggingface_hub import PyTorchModelHubMixin
from sentence_transformers import SentenceTransformer

import torch
import sklearn.metrics as metrics
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from torch.optim.lr_scheduler import ReduceLROnPlateau

from torch.optim import AdamW
import copy

from tqdm import tqdm, trange

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Display which accelerator is in use. `torch.cuda.get_device_name` raises on a
# machine without CUDA, so it is only queried when a GPU is actually available;
# on CPU-only hosts the cell shows "cpu" instead of crashing.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu"

# Data

In [None]:
# Training-mode switch. When True, the data-preparation cell appends the test
# split (texts and labels) to the training data, so the model is fit on every
# labelled example.
FINAL_SUBMISSION = False

In [None]:
# Load the `quotaclimat/frugalaichallenge-text-train` dataset; the cells below
# use its 'train' and 'test' splits.
dataset = load_dataset("quotaclimat/frugalaichallenge-text-train")

In [None]:
# Human-readable class names, indexed by integer class id (position i holds the
# name whose leading digit is i). A comma was missing after '1_not_happening',
# which made Python concatenate it with '2_not_human' and left the list with
# 7 entries instead of 8.
LABELS = [
    '0_not_relevant',
    '1_not_happening',
    '2_not_human',
    '3_not_bad',
    '4_solutions_harmful_unnecessary',
    '5_science_unreliable',
    '6_proponents_biased',
    '7_fossil_fuels_needed',
]

In [None]:
# Keep handles on the two splits of the loaded dataset.
data_train = dataset['train']
data_test = dataset['test']

In [None]:
# Raw quote text of every example, per split.
train_texts = [example['quote'] for example in data_train]
test_texts = [example['quote'] for example in data_test]

# The first character of each label string is parsed as the integer class id
# (label names look like '3_not_bad').
labels_train = [int(example['label'][0]) for example in data_train]
labels_test = [int(example['label'][0]) for example in data_test]

# For the final submission the model is fit on train + test together.
if FINAL_SUBMISSION:
    train_texts = [*train_texts, *test_texts]
    labels_train = [*labels_train, *labels_test]

# Class weights for the cross-entropy loss: total number of training examples
# divided by (class count + 1), so rarer classes receive larger weights and an
# absent class cannot cause a division by zero.
weights_tmp = [labels_train.count(class_id) for class_id in range(0, 8)]
n_train = len(labels_train)
weights = [n_train / (count + 1) for count in weights_tmp]

weights = torch.FloatTensor(weights).to(device)
weights

In [None]:
# Sentence encoder used to embed every quote before it reaches the classifier.
emb_model = SentenceTransformer("sentence-transformers/sentence-t5-large")
batch_size = 2

# --- training split: embeddings + labels, visited in random order ---
train_tokens = torch.Tensor(emb_model.encode(train_texts))
train_labels = torch.tensor(labels_train)
train_data = TensorDataset(train_tokens, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(
    train_data,
    batch_size=batch_size,
    sampler=train_sampler,
)

# --- test split: embeddings + labels, visited in dataset order ---
test_tokens = torch.Tensor(emb_model.encode(test_texts))
test_labels = torch.tensor(labels_test)
test_data = TensorDataset(test_tokens, test_labels)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(
    test_data,
    batch_size=batch_size,
    sampler=test_sampler,
)

# Model

In [None]:
class ConspiracyClassification(
    nn.Module,
    PyTorchModelHubMixin,
    # metadata for the model card can optionally be added here
):
    """Feed-forward classifier over 768-dimensional input features.

    Four hidden ``Linear`` layers (768 -> 100 -> 100 -> 100 -> 50), each
    followed by ReLU and dropout (p=0.1), then a final ``Linear`` layer that
    produces ``num_classes`` raw logits.
    """

    def __init__(self, num_classes):
        """Create the layers; ``num_classes`` is the size of the output layer."""
        super().__init__()
        # The attribute names (h1..h5) are the keys of the saved state_dict,
        # so they must stay stable for checkpoints to remain loadable.
        self.h1 = nn.Linear(768, 100)
        self.h2 = nn.Linear(100, 100)
        self.h3 = nn.Linear(100, 100)
        self.h4 = nn.Linear(100, 50)
        self.h5 = nn.Linear(50, num_classes)
        self.dropout = nn.Dropout(0.1)
        self.activation = nn.ReLU()

    def forward(self, input_texts):
        """Return unnormalised class logits for ``input_texts``.

        ``input_texts`` is fed straight into the first ``Linear(768, 100)``
        layer, so its last dimension must be 768.
        """
        hidden = input_texts
        # Every hidden layer shares the same Linear -> ReLU -> Dropout pattern.
        for layer in (self.h1, self.h2, self.h3, self.h4):
            hidden = self.dropout(self.activation(layer(hidden)))
        # The output layer has no activation: the loss expects raw logits.
        return self.h5(hidden)

In [None]:
# Constructor arguments for the classifier: one output logit per class.
config = {"num_classes": 8}
model = ConspiracyClassification(**config)
# Move the parameters to the selected device.
model.to(device)

In [None]:
# AdamW over all classifier parameters.
optimizer = AdamW(model.parameters(),
                  lr=5e-4,
                  weight_decay = 0.01)

# Multiply the learning rate by 0.3 once the monitored value (the training
# loop passes the mean evaluation loss) has stopped improving for more than
# `patience` epochs.
scheduler = ReduceLROnPlateau(optimizer, patience=4, factor=0.3)

In [None]:
# Cross-entropy loss using the per-class weights computed from the training
# label counts.
criterion = nn.CrossEntropyLoss(weight = weights)

# Training

In [None]:
# Train for a fixed number of epochs. After every epoch the model is evaluated
# on `test_dataloader`, and the weights of the epoch with the highest accuracy
# are kept in `best_state_dict`.
# NOTE(review): checkpoint selection uses the same split that is reported on,
# so the "best" metrics are optimistic estimates.
epochs = 15

# Best evaluation metrics seen so far.
best_MCCA = 0
best_F1 = 0
best_loss = 999
best_ACC = 0
best_epoch = None  # stays None if no epoch ever beats the initial best_ACC
results = []  # one [LOSS, ACC, F1, MCCA] row per epoch

# Snapshot (rather than alias) the initial weights: state_dict() returns
# references to the live parameter tensors, which change as training proceeds.
best_state_dict = copy.deepcopy(model.state_dict())

for e in trange(0, epochs, position=0, leave=True):

    print('Starting epoch ', e)

    # ---- training pass ----
    model.train()

    tr_loss = 0
    nb_tr_examples, nb_tr_steps = 0, 0
    for step, batch in enumerate(train_dataloader):
        b_tokens, b_labels = tuple(t.to(device) for t in batch)
        optimizer.zero_grad()

        logits = model(b_tokens)

        # CrossEntropyLoss expects integer class indices as targets.
        loss = criterion(logits, b_labels.long())
        loss.backward()
        optimizer.step()

        tr_loss += loss.item()
        # Was `b_input_ids`, a name that does not exist in this notebook.
        nb_tr_examples += b_tokens.size(0)
        nb_tr_steps += 1

    print("Train loss: {}".format(tr_loss/nb_tr_steps))

    # ---- evaluation pass ----
    model.eval()

    predictions_sep = []
    labels_sep = []

    eval_loss = 0
    steps = 0
    with torch.no_grad():
        for step, batch in enumerate(test_dataloader):
            b_tokens, b_labels = tuple(t.to(device) for t in batch)
            b_labels = b_labels.long()

            logits = model(b_tokens)
            loss = criterion(logits, b_labels)

            steps += 1
            eval_loss += loss.item()
            # Predicted class = index of the largest logit in each row.
            predictions_sep.extend(logits.cpu().numpy().argmax(1))
            labels_sep.extend(b_labels.cpu().numpy())

    LOSS = eval_loss/steps
    # The plateau scheduler monitors the mean evaluation loss.
    scheduler.step(LOSS)

    ACC = metrics.accuracy_score(labels_sep, predictions_sep)
    F1 = metrics.f1_score(labels_sep, predictions_sep, average='macro')
    MCCA = metrics.matthews_corrcoef(labels_sep, predictions_sep)

    # Keep a deep copy of the weights whenever accuracy improves.
    if ACC > best_ACC:
        best_MCCA = MCCA
        best_ACC = ACC
        best_F1 = F1
        best_loss = LOSS
        best_state_dict = copy.deepcopy(model.state_dict())
        best_epoch = e

    results.append([LOSS, ACC, F1, MCCA])
    print("\t Eval loss: {}".format(LOSS))
    print("\t Eval ACC: {}".format(ACC))
    print("\t Eval F1: {}".format(F1))
    print("\t Eval MCCA: {}".format(MCCA))
    print("---"*25)
    print("\n")

In [None]:
# Restore the weights that the training loop stored in `best_state_dict`
# (saved whenever evaluation accuracy improved).
model.load_state_dict(best_state_dict)

In [None]:
import os

from huggingface_hub import login

# Prefer a token supplied through the HF_TOKEN environment variable so the
# secret never has to be pasted into (and saved with) the notebook. The
# original placeholder remains the fallback when the variable is not set.
HF_token = os.environ.get("HF_TOKEN", "<YOUR_TOKEN>")
login(HF_token)

In [None]:
# Save the model to a local directory (`save_pretrained` comes from
# PyTorchModelHubMixin, which the classifier class inherits from).
model.save_pretrained("./sbert+mlp_model")

# Inference

In [None]:
import numpy as np
from datasets import load_dataset
from huggingface_hub import PyTorchModelHubMixin
from sentence_transformers import SentenceTransformer

import torch
import sklearn.metrics as metrics
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from torch.optim.lr_scheduler import ReduceLROnPlateau

from torch.optim import AdamW
import copy

from tqdm import tqdm, trange

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Display which accelerator is in use. `torch.cuda.get_device_name` raises on a
# machine without CUDA, so it is only queried when a GPU is actually available;
# on CPU-only hosts the cell shows "cpu" instead of crashing.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu"

In [None]:
class ConspiracyClassification768(
    nn.Module,
    PyTorchModelHubMixin,
    # metadata for the model card can optionally be added here
):
    """Feed-forward classifier over 768-dimensional input features.

    Four hidden ``Linear`` layers (768 -> 100 -> 100 -> 100 -> 50), each
    followed by ReLU and dropout (p=0.2), then a final ``Linear`` layer that
    produces ``num_classes`` raw logits.

    NOTE(review): the dropout rate here is 0.2, while the classifier defined in
    the training section of this notebook uses 0.1. Dropout is inactive once
    ``model.eval()`` has been called, so inference output is unaffected, but
    confirm which rate the published checkpoint was trained with.
    """

    def __init__(self, num_classes=8):
        """Create the layers; ``num_classes`` is the size of the output layer."""
        super().__init__()
        # The attribute names (h1..h5) are the keys of the saved state_dict,
        # so they must stay stable for checkpoints to remain loadable.
        self.h1 = nn.Linear(768, 100)
        self.h2 = nn.Linear(100, 100)
        self.h3 = nn.Linear(100, 100)
        self.h4 = nn.Linear(100, 50)
        self.h5 = nn.Linear(50, num_classes)
        self.dropout = nn.Dropout(0.2)
        self.activation = nn.ReLU()

    def forward(self, input_texts):
        """Return unnormalised class logits for ``input_texts``.

        ``input_texts`` is fed straight into the first ``Linear(768, 100)``
        layer, so its last dimension must be 768.
        """
        hidden = input_texts
        # Every hidden layer shares the same Linear -> ReLU -> Dropout pattern.
        for layer in (self.h1, self.h2, self.h3, self.h4):
            hidden = self.dropout(self.activation(layer(hidden)))
        # The output layer has no activation: callers take the argmax of logits.
        return self.h5(hidden)


In [None]:
# Label string -> integer class id. The id is the position of the name in the
# list below, which matches the digit each name starts with.
LABEL_MAPPING = {
    label_name: class_id
    for class_id, label_name in enumerate([
        "0_not_relevant",
        "1_not_happening",
        "2_not_human",
        "3_not_bad",
        "4_solutions_harmful_unnecessary",
        "5_science_unreliable",
        "6_proponents_biased",
        "7_fossil_fuels_needed",
    ])
}

# Load the dataset used for evaluation.
dataset = load_dataset("quotaclimat/frugalaichallenge-text-train")


def _label_to_id(example):
    """Return the new ``label`` value (integer class id) for one example."""
    return {"label": LABEL_MAPPING[example["label"]]}


# Replace the string labels with their integer ids in every split.
dataset = dataset.map(_label_to_id)

# Handles on the individual splits (`train_test` is the 'train' split).
train_test = dataset["train"]
test_dataset = dataset["test"]

In [None]:
# Load the published classifier weights and place the model on the device.
model = ConspiracyClassification768.from_pretrained(
    "ypesk/frugal-ai-mlp-768-fullset"
).to(device)

# Sentence encoder used to embed the quotes fed to the classifier.
emb_model = SentenceTransformer("sentence-transformers/sentence-t5-large")
batch_size = 6

# Embed every test quote once, then serve the embeddings in dataset order so
# that predictions line up with the label column.
test_quotes = [example['quote'] for example in test_dataset]
test_tokens = torch.Tensor(emb_model.encode(test_quotes))
test_data = TensorDataset(test_tokens)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(
    test_data,
    batch_size=batch_size,
    sampler=test_sampler,
)


In [None]:
# Predicted class id for every test example, in dataset order.
predictions = []

# Evaluation mode: dropout is disabled.
model.eval()

# No gradients are needed for inference.
with torch.no_grad():
    for batch in tqdm(test_dataloader):
        batch = tuple(t.to(device) for t in batch)
        # The dataset holds a single tensor per item: the embeddings.
        b_tokens = batch[0]
        logits = model(b_tokens).detach().cpu().numpy()
        # Predicted class = index of the largest logit in each row.
        predictions.extend(logits.argmax(1))

# Integer labels of the test split, aligned with `predictions`.
true_labels = test_dataset["label"]

In [None]:
# Overall scores on the test split: accuracy, macro-averaged F1 and the
# Matthews correlation coefficient.
ACC = metrics.accuracy_score(true_labels, predictions)
F1 = metrics.f1_score(true_labels, predictions, average='macro')
MCCA = metrics.matthews_corrcoef(true_labels, predictions)

# Printed as "ACC ; F1 ; MCCA", each rounded to three decimals.
summary_fields = [round(ACC, 3), ";", round(F1, 3), ";", round(MCCA, 3)]
print(*summary_fields)

In [None]:
# For each of the 8 classes, compute accuracy over the examples whose true
# label is that class, and collect the rounded values as "a;b;c;...;".
per_class_fields = []
for i in range(0, 8):
    # (true, predicted) pairs restricted to examples of class i.
    class_pairs = [
        (truth, guess)
        for truth, guess in zip(true_labels, predictions)
        if truth == i
    ]
    l = [truth for truth, _ in class_pairs]
    p = [guess for _, guess in class_pairs]

    acc_c = metrics.accuracy_score(l, p)
    per_class_fields.append(str(round(acc_c, 3)) + ";")

txt = "".join(per_class_fields)

# Accuracy per class
print(txt)