In [None]:
# download files for sentiment classification
from requests import get

def download(url, filename, timeout=30):
    """Fetch ``url`` over HTTP and write the response body to ``filename``.

    Args:
        url: Address of the resource to fetch.
        filename: Path of the local file to create (overwritten if present).
        timeout: Seconds to wait for the server before giving up.

    Raises:
        Whatever ``response.raise_for_status()`` raises for a 4xx/5xx reply,
        or whatever ``get`` raises for connection problems and timeouts.
    """
    # Fetch and validate first, so that a failed request neither truncates an
    # existing file nor stores an HTTP error page as if it were the dataset.
    response = get(url, timeout=timeout)
    response.raise_for_status()
    with open(filename, "wb") as file:
        file.write(response.content)

# Fetch the train and test rating files, saving each under its original name.
for url, filename in (
    ("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt", "ratings_train.txt"),
    ("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt", "ratings_test.txt"),
):
    download(url, filename)

# print first 5 lines of the file
# Decode explicitly as UTF-8 (as the loading code does) instead of relying on
# the platform default encoding, which may fail or garble the text.
with open("ratings_train.txt", "r", encoding="utf-8") as file:
    for _ in range(5):
        # readline() keeps the trailing newline, so do not let print add another.
        print(file.readline(), end="")
        
# load the train/test splits, then build a character-level vocabulary from the training data
def load_ratings(path):
    """Read a tab-separated ratings file into a list of field lists.

    The first line is treated as a header and skipped; empty lines are dropped.
    Every remaining line is split on tab characters.

    Args:
        path: Location of the UTF-8 encoded file.

    Returns:
        A list with one list of string fields per non-empty data line.
    """
    with open(path, "r", encoding="utf-8") as file:
        next(file, None)  # skip the header; tolerate a completely empty file
        # Stream the lines rather than holding the whole file contents in memory.
        stripped = (raw.rstrip("\n") for raw in file)
        return [row.split("\t") for row in stripped if row]


train_data = load_ratings("ratings_train.txt")
test_data = load_ratings("ratings_test.txt")

# Ids 0 and 1 are reserved for the padding and unknown tokens; every other
# character receives the next free id the first time it appears in the
# training text (field 1 of each row).
vocab = {"[PAD]": 0, "[UNK]": 1}
for row in train_data:
    for ch in row[1]:
        # len(vocab) is always the next unused id because ids are assigned densely.
        vocab.setdefault(ch, len(vocab))
vocab_idx = len(vocab)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SentimentClassifier(nn.Module):
    """Fixed-length sequence classifier: embedding -> flatten -> two linear layers.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of each embedding vector.
        max_len: Exact length of the last input dimension the model accepts.
        hidden_dim: Width of the hidden linear layer.
        num_classes: Number of output logits per sample.
    """

    def __init__(self, vocab_size, embed_dim=32, max_len=100, hidden_dim=100, num_classes=2):
        super().__init__()
        self.embed_dim = embed_dim
        self.max_len = max_len
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.fc1 = nn.Linear(embed_dim * max_len, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return logits of shape ``(-1, num_classes)`` for integer token ids ``x``.

        Raises:
            ValueError: If the last dimension of ``x`` is not ``max_len``.
        """
        # Without this check, view(-1, ...) below would silently regroup tokens
        # from different samples whenever the total element count happens to
        # be divisible by embed_dim * max_len.
        if x.shape[-1] != self.max_len:
            raise ValueError(
                f"expected sequences of length {self.max_len}, got {x.shape[-1]}"
            )
        x = self.embedding(x)
        x = x.view(-1, self.embed_dim * self.max_len)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
import lightning as pl

class SentimentClassifierPL(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a classifier with cross-entropy.

    Args:
        sentiment_classifier: Module that maps a batch of inputs to class
            logits. Because it is excluded from the saved hyperparameters, it
            must be passed again as a keyword argument to
            ``load_from_checkpoint``.
    """

    def __init__(self, sentiment_classifier):
        super().__init__()
        self.model = sentiment_classifier
        self.loss = nn.CrossEntropyLoss()

        # Per-batch (loss, logits, labels) tuples; consumed and cleared at epoch end.
        self.validation_step_outputs = []
        self.test_step_outputs = []
        # The wrapped module's weights are already stored in the checkpoint via
        # self.model, so keep the module object itself out of the hyperparameters.
        self.save_hyperparameters(ignore=["sentiment_classifier"])

    def _forward_loss(self, batch):
        """Run the model on one ``(inputs, labels)`` batch; return loss, logits, labels."""
        inputs, labels = batch
        outputs = self.model(inputs)
        return self.loss(outputs, labels), outputs, labels

    def _eval_step(self, batch, stage, buffer):
        """Shared body of validation_step/test_step: log the batch loss and buffer the results."""
        result = self._forward_loss(batch)
        self.log(f"{stage}_loss", result[0])
        buffer.append(result)
        return result

    def _eval_epoch_end(self, stage, buffer):
        """Log the mean of the batch losses and the overall accuracy, then empty ``buffer``."""
        if not buffer:
            # torch.stack / torch.cat raise on an empty list; nothing to report.
            return
        avg_loss = torch.stack([item[0] for item in buffer]).mean()
        self.log(f"avg_{stage}_loss", avg_loss)

        all_outputs = torch.cat([item[1] for item in buffer])
        all_labels = torch.cat([item[2] for item in buffer])
        all_preds = all_outputs.argmax(dim=1)
        accuracy = (all_preds == all_labels).float().mean()
        self.log(f"{stage}_accuracy", accuracy)
        buffer.clear()

    def training_step(self, batch, batch_idx):
        """Compute, log (as ``train_loss``) and return the loss for one training batch."""
        loss, _, _ = self._forward_loss(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` for the batch and buffer its outputs for the epoch summary."""
        return self._eval_step(batch, "val", self.validation_step_outputs)

    def on_validation_epoch_end(self):
        """Log ``avg_val_loss`` and ``val_accuracy`` over the buffered validation batches."""
        self._eval_epoch_end("val", self.validation_step_outputs)

    def test_step(self, batch, batch_idx):
        """Log ``test_loss`` for the batch and buffer its outputs for the epoch summary."""
        return self._eval_step(batch, "test", self.test_step_outputs)

    def on_test_epoch_end(self):
        """Log ``avg_test_loss`` and ``test_accuracy`` over the buffered test batches."""
        self._eval_epoch_end("test", self.test_step_outputs)

    def configure_optimizers(self):
        """Return an Adam optimizer over the wrapped model's parameters (lr=1e-3)."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)
        return optimizer

In [None]:
from torch.utils.data import Dataset, DataLoader

class SentimentDataset(Dataset):
    """Map-style dataset turning rating rows into fixed-length index tensors.

    Args:
        data: Sequence of rows; field 1 of a row is the text and field 2 the
            label (anything ``int()`` accepts).
        vocab: Dict mapping characters to integer ids. The ids stored under
            ``"[PAD]"`` and ``"[UNK]"`` are used for padding and for unseen
            characters, falling back to 0 and 1 when those keys are absent.
        max_len: Length every encoded text is truncated or padded to.
    """

    def __init__(self, data, vocab, max_len=100):
        self.data = data
        self.vocab = vocab
        self.max_len = max_len
        self.pad_idx = vocab.get("[PAD]", 0)
        self.unk_idx = vocab.get("[UNK]", 1)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(token_ids, label)`` as int64 tensors for row ``index``."""
        row = self.data[index]
        label = int(row[2])
        # Truncate first, then map characters to ids (unknown ones to unk_idx).
        encoded = [self.vocab.get(char, self.unk_idx) for char in row[1][: self.max_len]]
        # Right-pad to max_len; the multiplier is 0 when the text was truncated.
        encoded += [self.pad_idx] * (self.max_len - len(encoded))

        return (
            torch.tensor(encoded, dtype=torch.long),
            torch.tensor(label, dtype=torch.long),
        )
    
# Validate on a held-out slice of the training data rather than on the test
# data: validation metrics drive early stopping and checkpoint selection, so
# reusing the test data there would leak it into model selection.
VAL_FRACTION = 0.1
SPLIT_SEED = 42  # fixed seed keeps the train/validation split reproducible

full_train_dataset = SentimentDataset(train_data, vocab)
val_size = int(len(full_train_dataset) * VAL_FRACTION)
train_dataset, val_dataset = torch.utils.data.random_split(
    full_train_dataset,
    [len(full_train_dataset) - val_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=4)

test_dataset = SentimentDataset(test_data, vocab)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

In [None]:
# Size the embedding table to the vocabulary, then wrap the plain torch module
# in the Lightning module that provides the training and evaluation steps.
sentcls = SentimentClassifier(vocab_size=len(vocab))
PLSentimentClassifier = SentimentClassifierPL(sentiment_classifier=sentcls)

In [None]:
import wandb
from lightning.pytorch.loggers import WandbLogger

# Authenticate with Weights & Biases, then create the logger handed to the Trainer.
wandb.login()
wandb_logger = WandbLogger(
    name="Lec01_sentiment_classification_w_pl",
    project="NLP",
)

In [None]:
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
# Both callbacks track the validation loss and treat lower as better
# ("min" is the default mode, spelled out here for readability).
# NOTE(review): with the Trainer's max_epochs=3 and one validation run per
# epoch, patience=3 can never be exhausted, so early stopping will not fire
# unless one of the two values changes.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode="min",
    patience=3,
)
checkpoint = ModelCheckpoint(
    dirpath="checkpoints",
    filename="sentiment-classifier-{epoch:02d}-{val_loss:.2f}",
    monitor='val_loss',
    mode="min",
    verbose=True,
)

In [None]:
# accelerator="auto" uses an available accelerator and falls back to CPU,
# whereas a hard-coded "gpu" raises on machines without one.
trainer = pl.Trainer(
    max_epochs=3,
    accelerator="auto",
    callbacks=[early_stopping, checkpoint],
    logger=wandb_logger,
)  # see https://lightning.ai/docs/pytorch/stable/common/trainer.html#trainer-class-api

In [None]:
# Train the model; the validation loader feeds the metrics that the
# early-stopping and checkpoint callbacks monitor.
trainer.fit(
    PLSentimentClassifier,
    train_dataloaders=train_loader,
    val_dataloaders=val_loader,
)

In [None]:
# No model is passed, so the weights come from a checkpoint. Ask for the best
# one explicitly instead of relying on the implicit fallback, which warns.
trainer.test(dataloaders=test_loader, ckpt_path="best")

In [None]:
# Reload the best checkpoint recorded by the ModelCheckpoint callback.
# load_from_checkpoint needs the path of a checkpoint file, not a directory.
best_model = SentimentClassifierPL.load_from_checkpoint(
    checkpoint.best_model_path,
    sentiment_classifier=SentimentClassifier(len(vocab)),
)
trainer.test(best_model, dataloaders=test_loader)