In [None]:
!pip install transformers
!pip install pytorch_lightning

In [None]:
import os
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from torch import nn
from torch.optim import AdamW
import torchmetrics

import pytorch_lightning as pl
from pytorch_lightning import Trainer

from typing import Optional

from transformers import ElectraModel, ElectraPreTrainedModel, ElectraTokenizerFast as ElectraTokenizer
from transformers.models.electra.modeling_electra import ElectraClassificationHead

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

# Fix the global seed once, right after the imports, so the random steps
# further down (shuffling, splitting, weight init) are meant to be repeatable.
pl.seed_everything(42)

Global seed set to 42


42

In [None]:
# Training data: keep only the text and label columns and discard rows whose
# tweet text is missing.
df = (
    pd.read_csv('/content/drive/MyDrive/isarcasm/isarcasm_datasets/Train_Dataset.csv')
    .loc[:, ['tweet', 'sarcastic']]
    .dropna(subset=['tweet'])
)

In [None]:
# Checkpoint used for the tokenizer.
# NOTE(review): SarcasmClassifier loads "google/electra-small-discriminator",
# not this base checkpoint. Presumably both share the same vocabulary, so the
# token ids stay compatible -- confirm, or load both from a single name.
MODEL_NAME = "google/electra-base-discriminator"

tokenizer = ElectraTokenizer.from_pretrained(MODEL_NAME)

In [None]:
class TweetDataset(Dataset):
    """Map-style dataset that tokenizes one tweet per requested item.

    Args:
        data: Frame with a ``tweet`` column (text) and a ``sarcastic`` column
            (label). Rows are addressed by position, not by index label.
        tokenizer: Callable tokenizer; it is invoked once per ``__getitem__``.
        max_length: Length every sequence is truncated/padded to. Defaults to
            64, the value that used to be hard-coded.
    """

    def __init__(self, data: pd.DataFrame, tokenizer: "ElectraTokenizer", max_length: int = 64):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``label`` for row ``idx``."""
        row = self.data.iloc[idx]
        encoding = self.tokenizer(
            row.tweet,
            max_length=self.max_length,
            truncation=True,
            padding="max_length",
            add_special_tokens=True,
            return_token_type_ids=False,
            return_attention_mask=True,
            return_tensors="pt",
        )

        # The tokenizer is asked for "pt" tensors for a single text; flatten
        # drops the leading batch axis so the DataLoader can stack the items.
        return {
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "label": torch.tensor(row.sarcastic),
        }

In [None]:
# Wrap the whole training frame and report how many examples it holds.
ds = TweetDataset(data=df, tokenizer=tokenizer)

print(len(ds))

3467


In [None]:
# Sanity check on the first example only: its leading ten token ids and label.
for sample in ds:
    print(sample["input_ids"][:10], sample["label"], sep="\n")
    break

tensor([ 101, 1996, 2069, 2518, 1045, 2288, 2013, 2267, 2003, 1037])
tensor(1)


In [None]:
class TweetDataModule(pl.LightningDataModule):
    """Builds the train / validation / test ``DataLoader`` objects for the tweets.

    Args:
        data: Training frame with ``tweet`` and ``sarcastic`` columns. It is
            shuffled and split 90/10 into the train and validation sets.
        tokenizer: Tokenizer handed to every ``TweetDataset``.
        batch_size: Batch size shared by all three loaders.
        test_path: CSV file holding the held-out test set; defaults to
            ``DEFAULT_TEST_CSV``.
        split_seed: Seed for the shuffle and for the train/validation split, so
            repeated ``setup()`` calls always yield the same partition.
    """

    DEFAULT_TEST_CSV = '/content/drive/MyDrive/isarcasm/isarcasm_datasets/Test_Dataset.csv'

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: ElectraTokenizer,
        batch_size: int,
        test_path: Optional[str] = None,
        split_seed: int = 42,
    ):
        # The base-class initializer must run before the module is used.
        super().__init__()
        self.data = data
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.test_path = test_path if test_path is not None else self.DEFAULT_TEST_CSV
        self.split_seed = split_seed

        # The notebook asks for the loaders directly (without handing the
        # module to a Trainer), so the frames have to exist right away.
        self.setup()

    def setup(self, stage: Optional[str] = None):
        """Prepare ``train_df``, ``val_df`` and ``test_df``.

        The training frame comes from the ``data`` argument (previously it was
        ignored and the CSV was read again from a hard-coded path).
        """
        train_df = self.data[self.data['tweet'].notna()]
        train_df = train_df.sample(frac=1, random_state=self.split_seed).reset_index(drop=True)

        test_df = pd.read_csv(self.test_path)[['tweet', 'sarcastic']]
        self.test_df = test_df[test_df['tweet'].notna()]

        # Explicit random_state: without it the split depends on the global
        # RNG state and changes every time setup() runs.
        self.train_df, self.val_df = train_test_split(
            train_df, test_size=0.1, random_state=self.split_seed
        )

    def _make_loader(self, frame: pd.DataFrame, shuffle: bool) -> DataLoader:
        """Wrap ``frame`` in a ``TweetDataset`` and return its ``DataLoader``."""
        return DataLoader(
            dataset=TweetDataset(frame, self.tokenizer),
            batch_size=self.batch_size,
            # os.cpu_count() may return None; fall back to in-process loading.
            num_workers=os.cpu_count() or 0,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return self._make_loader(self.train_df, shuffle=True)

    def val_dataloader(self):
        """Ordered loader over the validation split."""
        return self._make_loader(self.val_df, shuffle=False)

    def test_dataloader(self):
        """Ordered loader over the held-out test set."""
        return self._make_loader(self.test_df, shuffle=False)

In [None]:
# Data module over the training frame; every loader yields batches of 32.
data_module = TweetDataModule(data=df, tokenizer=tokenizer, batch_size=32)

In [None]:
# Sanity check on the first training batch only: its size, then its contents.
for first_batch in data_module.train_dataloader():
    print(len(first_batch['input_ids']), first_batch, sep="\n")
    break

32
{'input_ids': tensor([[  101,  1996,  2245,  ...,     0,     0,     0],
        [  101,  6992,  1998,  ...,     0,     0,     0],
        [  101,  1030, 14855,  ...,     0,     0,     0],
        ...,
        [  101,  1045,  2293,  ...,  2028,  2198,   102],
        [  101,  2043,  1045,  ...,     0,     0,     0],
        [  101,  2026,  2047,  ...,     0,     0,     0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 1, 1, 1],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]), 'label': tensor([0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        1, 1, 0, 0, 0, 0, 0, 0])}


In [None]:
class ElectraClassifier(ElectraPreTrainedModel):
    """ELECTRA encoder followed by ``ElectraClassificationHead``; returns raw logits."""

    def __init__(self, config):
        super().__init__(config)
        self.config = config
        self.n_classes = config.num_labels

        # Encoder first, head second (same construction order as before).
        self.electra = ElectraModel(config)
        self.classifier = ElectraClassificationHead(config)

        self.post_init()

    def forward(self, input_ids = None, attention_mask = None):
        """Encode the batch and hand the encoder's first output to the head."""
        encoder_outputs = self.electra(input_ids, attention_mask)
        return self.classifier(encoder_outputs[0])

In [None]:
class SarcasmClassifier(pl.LightningModule):
    """Lightning module that fine-tunes ``ElectraClassifier`` with cross-entropy.

    Args:
        n_classes: Number of output classes.
        model_name: Checkpoint passed to ``ElectraClassifier.from_pretrained``.
        class_weights: One loss weight per class, or ``None`` for an
            unweighted loss. The default ``(1.0, 3.0)`` matches the previously
            hard-coded weights and therefore only fits ``n_classes == 2``.
        learning_rate: Learning rate given to AdamW.

    Raises:
        ValueError: If ``class_weights`` does not hold ``n_classes`` entries.

    Attributes:
        logits: Detached logits of the most recent step.
        preds: Arg-max class ids collected during the current test run.
    """

    def __init__(
        self,
        n_classes,
        model_name="google/electra-small-discriminator",
        class_weights=(1.0, 3.0),
        learning_rate=1e-4,
    ):
        super().__init__()
        self.n_classes = n_classes
        self.learning_rate = learning_rate

        self.classifier = ElectraClassifier.from_pretrained(model_name, num_labels=n_classes)

        weight = None
        if class_weights is not None:
            if len(class_weights) != n_classes:
                raise ValueError(
                    f"class_weights has {len(class_weights)} entries but n_classes is {n_classes}"
                )
            weight = torch.tensor(class_weights, dtype=torch.float)
        # No explicit .cuda(): the loss module stores `weight` as a buffer, so
        # it is moved together with this module and also works on CPU.
        self.criterion = nn.CrossEntropyLoss(weight=weight)

        self.logits = None
        self.preds = []

    def forward(self, input_ids, attention_mask):
        """Return the classifier logits for a batch."""
        return self.classifier(input_ids, attention_mask)

    def run_step(self, batch, stage):
        """Shared train/val/test step: compute the loss and log it as ``{stage}_loss``."""
        logits = self(batch["input_ids"], batch["attention_mask"])
        loss = self.criterion(logits, batch["label"].long())

        # Keep only a detached copy so the autograd graph of the step is not
        # kept alive through this attribute.
        self.logits = logits.detach()

        self.log(f"{stage}_loss", loss, on_step=True, on_epoch=True, prog_bar=True)

        return loss

    def training_step(self, batch, batch_idx):
        return self.run_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        return self.run_step(batch, "val")

    def on_test_epoch_start(self):
        # Start every test run from an empty list; otherwise a second
        # trainer.test(...) call would append to the previous predictions.
        self.preds = []

    def test_step(self, batch, batch_idx):
        """Run the shared step and record the predicted class id of every example."""
        loss = self.run_step(batch, "test")
        self.preds.extend(self.logits.argmax(dim=1).cpu().tolist())
        return loss

    def configure_optimizers(self):
        return AdamW(self.parameters(), lr=self.learning_rate)

In [None]:
clf = SarcasmClassifier(2)
# Select the GPU with accelerator + devices only. The extra `gpus=1` said the
# same thing a second time and is no longer accepted by newer Lightning
# releases, which the unpinned install may pull in.
trainer = Trainer(max_epochs=20, accelerator="gpu", devices=1, log_every_n_steps=1)

trainer.fit(clf, data_module.train_dataloader(), data_module.val_dataloader())

Some weights of the model checkpoint at google/electra-small-discriminator were not used when initializing ElectraClassifier: ['discriminator_predictions.dense.bias', 'discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.weight', 'discriminator_predictions.dense_prediction.weight']
- This IS expected if you are initializing ElectraClassifier from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraClassifier from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of ElectraClassifier were not initialized from the model checkpoint at google/electra-small-discriminator and are newly initialized: ['classifier.dense.weight', 'classifier.dense.bias', 'classifier.out_pro

Validation sanity check: 0it [00:00, ?it/s]

Global seed set to 42


Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

In [None]:
# Evaluate on the held-out test loader; besides logging the test loss this
# fills clf.preds, which the next cell compares against the gold labels.
trainer.test(clf, data_module.test_dataloader())

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'test_loss': 0.6759237051010132, 'test_loss_epoch': 0.6759237051010132}
--------------------------------------------------------------------------------


[{'test_loss': 0.6759237051010132, 'test_loss_epoch': 0.6759237051010132}]

In [None]:
y_pred = clf.preds

# Take the gold labels straight from the test frame. Iterating the dataset
# would tokenize every tweet again (and build a throwaway DataLoader) just to
# read back a column that is already in memory.
y_test = data_module.test_df['sarcastic'].astype(int).tolist()

assert len(y_test) == len(y_pred), (
    f"{len(y_pred)} predictions for {len(y_test)} test rows -- "
    "was trainer.test run exactly once?"
)

# NOTE(review): the recorded result of this cell was 0.0, i.e. not a single
# correctly predicted sarcastic tweet. Inspect the distribution of y_pred
# before trusting any other number from this run.
f1_score(y_test, y_pred)


0.0

In [None]:
# Delete the lightning_logs directory (recursively; no error if it is absent).
# Presumably it was written by the Trainer runs above -- anything stored in it
# is lost after this cell.
!rm -rf lightning_logs
 