# Dataset

In [None]:
import torch
from torch.utils.data import Dataset

class ABSA_Dataset(Dataset):
    """Dataset that spreads word-level labels over tokenizer sub-tokens.

    Rows of ``df`` are read positionally: the first three columns must hold
    the string form of three parallel lists (words, tag ids, polarity ids),
    e.g. ``"['the', 'food']"``, ``"[0, 1]"`` and ``"[-1, 2]"``.
    """

    def __init__(self, df, tokenizer):
        self.tokenizer = tokenizer
        self.df = df

    def __len__(self):
        """Return the number of rows in the underlying frame."""
        return len(self.df)

    @staticmethod
    def _split_list_repr(text):
        """Split a ``"[a, b, c]"`` string on ``', '`` after dropping the brackets."""
        return text.strip('][').split(', ')

    def __getitem__(self, idx):
        """Return ``(sub_tokens, ids_tensor, tags_tensor, pols_tensor)`` for row ``idx``."""
        raw_tokens, raw_tags, raw_pols = self.df.iloc[idx, :3].values

        # Every single quote is removed from the token string before splitting.
        words = self._split_list_repr(raw_tokens.replace("'", ""))
        tag_items = self._split_list_repr(raw_tags)
        pol_items = self._split_list_repr(raw_pols)

        sub_tokens, sub_tags, sub_pols = [], [], []
        for position, word in enumerate(words):
            pieces = self.tokenizer.tokenize(word)
            repeat = len(pieces)
            sub_tokens.extend(pieces)
            # Each sub-token inherits the label of the word it came from.
            sub_tags.extend([int(tag_items[position])] * repeat)
            sub_pols.extend([int(pol_items[position])] * repeat)

        sub_ids = self.tokenizer.convert_tokens_to_ids(sub_tokens)

        return (
            sub_tokens,
            torch.tensor(sub_ids),
            torch.tensor(sub_tags),
            torch.tensor(sub_pols),
        )

In [ ]:
import pandas as pd 

# Load the train and test splits from their CSV files into DataFrames.
train_df = pd.read_csv('./data/en/restaurants_train.csv')
test_df = pd.read_csv('./data/en/restaurants_test.csv')

In [ ]:
# Display the first training row to inspect the loaded columns.
train_df.iloc[0]

In [ ]:
from transformers import BertTokenizer

# Name of the pretrained checkpoint; reused below for the tokenizer, the
# model weights and the saved checkpoint file name.
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)

In [ ]:
# Displays the bound method object only; the method is not called here.
tokenizer.add_special_tokens

In [ ]:
# Display the vocabulary id the tokenizer assigns to its CLS token.
tokenizer.cls_token_id

In [ ]:
# Wrap both DataFrames in ABSA_Dataset, sharing the same tokenizer.
train_ds = ABSA_Dataset(train_df, tokenizer)
test_ds = ABSA_Dataset(test_df, tokenizer)

In [ ]:
# A notebook cell echoes only its last expression, so the dataset sizes are
# printed explicitly instead of being computed and silently discarded.
print(len(train_ds), len(test_ds))
# Display the first training sample as produced by ABSA_Dataset.__getitem__.
next(iter(train_ds))

In [ ]:
from torch.nn.utils.rnn import pad_sequence

def padding(samples):
    """Collate dataset samples into zero-padded batch tensors.

    Each sample is indexed positionally: ``[1]`` ids, ``[2]`` tags and
    ``[3]`` polarities (element ``[0]`` is ignored).

    Returns:
        ``(ids_tensors, tags_tensors, pols_tensors, masks_tensors)``, all
        batch-first. ``masks_tensors`` is a long tensor holding 1 where the
        padded id is non-zero and 0 elsewhere.
    """
    def stack_field(position):
        # pad_sequence right-pads with its default value (0) to the longest item.
        column = [sample[position] for sample in samples]
        return pad_sequence(column, batch_first=True)

    ids_tensors = stack_field(1)
    tags_tensors = stack_field(2)
    pols_tensors = stack_field(3)

    # Start from all ones and zero out every position whose id equals 0.
    masks_tensors = torch.ones(ids_tensors.shape, dtype=torch.long)
    masks_tensors = masks_tensors.masked_fill(ids_tensors == 0, 0)

    return ids_tensors, tags_tensors, pols_tensors, masks_tensors

In [ ]:
from torch.utils.data import DataLoader

# Batch both datasets with the custom `padding` collate function; only the
# training loader shuffles its samples.
batch_size = 32
train_dataloader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=padding)
test_dataloader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, collate_fn=padding)

In [ ]:
# Display one collated batch: (ids, tags, polarities, masks) as returned by `padding`.
next(iter(train_dataloader))

# Model

In [ ]:
from transformers import BertModel

class ABTE(torch.nn.Module):
    """BERT encoder followed by a per-token linear layer with 3 output classes.

    Args:
        pretrained_name: Checkpoint name or path passed to
            ``BertModel.from_pretrained``. When omitted, the notebook-level
            ``model_name`` is used, so both ``ABTE()`` and
            ``ABTE(model_name)`` are valid.
    """

    def __init__(self, pretrained_name=None):
        super().__init__()
        if pretrained_name is None:
            # Keep the original zero-argument behaviour.
            pretrained_name = model_name
        self.bert = BertModel.from_pretrained(pretrained_name)
        self.linear = torch.nn.Linear(self.bert.config.hidden_size, 3)
        self.loss_fn = torch.nn.CrossEntropyLoss()

    def forward(self, ids_tensors, masks_tensors=None, tags_tensors=None):
        """Compute per-token logits and, when labels are given, the loss.

        Args:
            ids_tensors: Token ids passed to BERT as ``input_ids``.
            masks_tensors: Passed to BERT as ``attention_mask``; may be None.
            tags_tensors: Optional labels, flattened to match the flattened
                logits before the loss is computed.

        Returns:
            ``(loss, logits)`` when ``tags_tensors`` is given, otherwise
            ``logits`` alone. ``logits`` keeps the un-flattened shape.
        """
        # With return_dict=False BERT returns a tuple; element 0 is the
        # per-token hidden states fed to the classifier.
        sequence_output = self.bert(
            input_ids=ids_tensors,
            attention_mask=masks_tensors,
            return_dict=False,
        )[0]
        linear_outputs = self.linear(sequence_output)

        if tags_tensors is None:
            return linear_outputs

        # NOTE: every position of the batch contributes to the loss; no
        # masking is applied here.
        flat_logits = linear_outputs.reshape(-1, linear_outputs.size(-1))
        loss = self.loss_fn(flat_logits, tags_tensors.reshape(-1))
        return loss, linear_outputs

In [ ]:
# ABTE's constructor reads the checkpoint name from the notebook-level
# `model_name`; passing it positionally raised a TypeError.
model = ABTE()

In [ ]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [ ]:
# Move the model's parameters to the selected device.
model.to(device)

# Train

In [ ]:
import time
import numpy as np
from sklearn.metrics import classification_report

def train_epoch(model, optimizer, train_loader, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Module called as ``model(ids_tensors=..., masks_tensors=...,
            tags_tensors=...)`` and returning ``(loss, outputs)``.
        optimizer: Optimizer stepped once per batch.
        train_loader: Iterable of ``(ids, tags, polarities, masks)`` batches;
            the polarities are ignored.
        device: Device the batch tensors are moved to.

    Returns:
        Mean of the per-batch loss values.

    Raises:
        ZeroDivisionError: If ``train_loader`` yields no batches.
    """
    # Make sure train-time behaviour (e.g. dropout) is active, whatever mode
    # the model was left in by a previous evaluation or by loading.
    model.train()

    losses = []
    for ids_tensors, tags_tensors, _, masks_tensors in train_loader:
        ids_tensors = ids_tensors.to(device)
        tags_tensors = tags_tensors.to(device)
        masks_tensors = masks_tensors.to(device)

        # Clear gradients before the forward pass so that stale gradients
        # can never leak into the first update.
        optimizer.zero_grad()
        loss, _ = model(
            ids_tensors=ids_tensors,
            masks_tensors=masks_tensors,
            tags_tensors=tags_tensors,
        )
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    return sum(losses) / len(losses)

def evaluate_epoch(model, valid_loader, device):
    """Compute mean loss and token-level accuracy over ``valid_loader``.

    The model is switched to eval mode for the duration of the call and its
    previous training flag is restored afterwards (via ``model.train(flag)``,
    which applies the flag to all sub-modules).

    Args:
        model: Module called as ``model(ids_tensors=..., masks_tensors=...,
            tags_tensors=...)`` and returning ``(loss, outputs)``.
        valid_loader: Iterable of ``(ids, tags, polarities, masks)`` batches;
            the polarities are ignored.
        device: Device the batch tensors are moved to.

    Returns:
        ``(mean_loss, accuracy)``. Accuracy is computed over every position
        of every batch; no mask is applied.

    Raises:
        ZeroDivisionError: If ``valid_loader`` yields no batches.
    """
    was_training = model.training
    model.eval()

    losses, preds, labels = [], [], []
    try:
        with torch.no_grad():
            for ids_tensors, tags_tensors, _, masks_tensors in valid_loader:
                ids_tensors = ids_tensors.to(device)
                tags_tensors = tags_tensors.to(device)
                masks_tensors = masks_tensors.to(device)

                loss, outputs = model(
                    ids_tensors=ids_tensors,
                    masks_tensors=masks_tensors,
                    tags_tensors=tags_tensors,
                )
                losses.append(loss.item())

                # Highest-scoring class along the last (class) dimension,
                # flattened so predictions and labels line up one-to-one.
                preds.extend(outputs.argmax(dim=2).reshape(-1).tolist())
                labels.extend(tags_tensors.reshape(-1).tolist())
    finally:
        model.train(was_training)

    acc = np.mean(np.array(preds) == np.array(labels))
    return sum(losses) / len(losses), acc

# Module-level histories; train() appends one value per epoch to each list.
train_losses = list()
eval_losses = list()
eval_accs = list()

def train(model, model_name, save_model, optimizer, train_loader, valid_loader, num_epochs, device):
    """Train for ``num_epochs`` epochs and return the best-scoring model.

    After every epoch the model is evaluated on ``valid_loader``; whenever
    the validation loss improves on the best value seen so far, the weights
    are saved to ``<save_model>/<model_name>.pt``. After the last epoch the
    best checkpoint written during this call is loaded back into ``model``.

    Per-epoch values are appended to the module-level lists ``train_losses``,
    ``eval_accs`` and ``eval_losses``, which are also the objects returned in
    ``metrics``; repeated calls therefore keep extending the same lists.

    Args:
        model: Model to train (modified in place).
        model_name: Used only to build the checkpoint file name.
        save_model: Existing directory the checkpoint is written to.
        optimizer: Optimizer forwarded to ``train_epoch``.
        train_loader: Batches used for training.
        valid_loader: Batches used for evaluation.
        num_epochs: Number of epochs to run.
        device: Device forwarded to the epoch helpers and used as
            ``map_location`` when reloading the checkpoint.

    Returns:
        ``(model, metrics)`` where ``model`` is in eval mode and ``metrics``
        has the keys ``train_loss``, ``valid_accuracy``, ``valid_loss`` and
        ``time``.
    """
    checkpoint_path = save_model + f'/{model_name}.pt'
    best_loss_eval = float('inf')
    checkpoint_saved = False
    times = []
    for epoch in range(1, num_epochs+1):
        epoch_start_time = time.time()
        # Training
        train_loss = train_epoch(model, optimizer, train_loader, device)
        train_losses.append(train_loss)

        # Evaluation
        eval_loss, eval_acc = evaluate_epoch(model, valid_loader, device)
        eval_accs.append(eval_acc)
        eval_losses.append(eval_loss)

        # Save best model: the threshold must be updated, otherwise every
        # later epoch overwrites the checkpoint and "best" means "last".
        if eval_loss < best_loss_eval:
            best_loss_eval = eval_loss
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True

        elapsed = time.time() - epoch_start_time
        times.append(elapsed)
        # Print loss, acc end epoch
        print("-" * 59)
        print(
            "| End of epoch {:3d} | Time: {:5.2f}s | Train Loss {:8.3f} "
            "| Valid Accuracy {:8.3f} | Valid Loss {:8.3f} ".format(
                epoch, elapsed, train_loss, eval_acc, eval_loss
            )
        )
        print("-" * 59)

    # Load best model. Skipped when this call never wrote a checkpoint (no
    # epochs, or no comparable loss), so a stale file from an earlier run is
    # not loaded by accident.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    metrics = {
        'train_loss': train_losses,
        'valid_accuracy': eval_accs,
        'valid_loss': eval_losses,
        'time': times
    }
    return model, metrics

In [ ]:
import os

# Create the checkpoint directory. Unlike the `!mkdir` shell escape, this
# does not depend on the system shell and is a no-op when the directory
# already exists, so the cell can be re-run safely.
os.makedirs("./model", exist_ok=True)

In [ ]:
# Build a fresh model and fine-tune it, checkpointing into `save_model`.
save_model = "./model"
# ABTE reads the checkpoint name from the notebook-level `model_name`.
model = ABTE()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
num_epochs = 50
# The loaders created earlier are named `train_dataloader` and
# `test_dataloader`; `train_loader` / `test_loader` were undefined names.
# NOTE(review): the test split doubles as the validation set here, so the
# selected checkpoint is tuned on test data.
best_model, metrics = train(
    model, model_name, save_model, optimizer, train_dataloader, test_dataloader, num_epochs, device
)

In [ ]:
import matplotlib.pyplot as plt

def plot_training_history(train_losses, eval_losses, eval_accs, num_epochs):
    """Draw loss curves and validation accuracy side by side.

    Args:
        train_losses: Per-epoch training losses.
        eval_losses: Per-epoch validation losses.
        eval_accs: Per-epoch validation accuracies.
        num_epochs: Number of epochs; the x axis runs from 1 to this value,
            so each series must contain exactly this many points.
    """
    def label_axes(title, y_label):
        # Shared decoration for the currently active subplot.
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()

    x_values = range(1, num_epochs + 1)

    plt.figure(figsize=(12, 5))

    # Left panel: training and validation loss on the same axes.
    plt.subplot(1, 2, 1)
    loss_series = (
        (train_losses, 'Training Loss'),
        (eval_losses, 'Validation Loss'),
    )
    for values, name in loss_series:
        plt.plot(x_values, values, label=name)
    label_axes('Training and Validation Loss', 'Loss')

    # Right panel: validation accuracy.
    plt.subplot(1, 2, 2)
    plt.plot(x_values, eval_accs, color='green', label='Validation Accuracy')
    label_axes('Validation Accuracy', 'Accuracy')

    plt.tight_layout()
    plt.show()

# Plot the module-level history lists filled in by train(); the x axis is
# built from num_epochs, so each list must hold exactly that many values.
plot_training_history(train_losses, eval_losses, eval_accs, num_epochs)


# Prediction

In [ ]:
def predict(best_model, sentence, device):
    """Predict a class index for every word piece of ``sentence``.

    The sentence is tokenized with the notebook-level ``tokenizer``, and the
    ids are fed as a batch of one without an attention mask or labels.

    Args:
        best_model: Model called as ``best_model(ids, None, None)`` and
            returning the logits tensor.
        sentence: Raw text to tag.
        device: Device the input tensor is created on.

    Returns:
        ``(word_pieces, predictions, outputs)``: the word-piece strings, one
        predicted class index per word piece, and the raw model output.
    """
    word_pieces = list(tokenizer.tokenize(sentence))
    input_ids = tokenizer.convert_tokens_to_ids(word_pieces)
    input_tensor = torch.tensor([input_ids]).to(device)

    # Use the model that was passed in (not the notebook-level `model`) and
    # make sure it runs in inference mode.
    best_model.eval()
    with torch.no_grad():
        outputs = best_model(input_tensor, None, None)
        predictions = outputs.argmax(dim=2)

    # Drop the batch dimension: a single sentence was submitted.
    predictions = predictions[0].tolist()
    return word_pieces, predictions, outputs

In [ ]:
# Rebuild a plain sentence from the first test row's "Tokens" string (same
# quote/bracket stripping as ABSA_Dataset) and tag it with the trained model.
sentence = " ".join(test_df.iloc[0]["Tokens"].replace("'", "").strip("][").split(', '))
predict(best_model, sentence, device)