In [None]:
# Install the Hugging Face transformers package used by the cells below.
# NOTE(review): the version is unpinned and `datasets` is imported later without
# being installed here - confirm the runtime image provides a compatible
# transformers release (one that still ships the TF model classes) and datasets.
!pip install transformers

In [None]:
import re
import os
import json
import torch
import shutil

import tensorflow as tf
import random
import numpy as np
import pandas as pd
from transformers import AutoTokenizer, TFAutoModelForSeq2SeqLM, DataCollatorForSeq2Seq, create_optimizer
from datasets import load_dataset, concatenate_datasets, Dataset

from sklearn.model_selection import train_test_split

In [None]:
def load_imdb():
  """Download the Stanford IMDB review corpus and return train/test Series.

  The archive is fetched with ``tf.keras.utils.get_file`` into the current
  directory, then every file under ``train/{pos,neg}`` and ``test/{pos,neg}``
  is read into memory.

  Returns:
    Tuple ``(x_train, y_train, x_test, y_test)`` of pandas Series. The ``x``
    Series hold the raw review text (HTML tags are NOT stripped here); the
    ``y`` Series hold the strings ``"positive"`` / ``"negative"`` for both
    splits. Training rows are shuffled; test rows are in sorted-filename
    order, ``pos`` before ``neg``.
  """
  # download dataset
  url = 'https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'

  dataset = tf.keras.utils.get_file('aclImdb_v1.tar.gz', url,
                                    untar=True, cache_dir='.',
                                    cache_subdir='')

  dataset_dir = os.path.join(os.path.dirname(dataset), 'aclImdb')
  train_dir = os.path.join(dataset_dir, 'train')
  test_dir = os.path.join(dataset_dir, 'test')

  # remove the unlabeled reviews; guard so the cell can be re-run after the
  # directory has already been deleted
  remove_dir = os.path.join(train_dir, 'unsup')
  if os.path.isdir(remove_dir):
    shutil.rmtree(remove_dir)

  # The same folder -> label-text mapping is applied to BOTH splits so the
  # model is trained on the same target strings it is evaluated against.
  label_names = {"pos": "positive", "neg": "negative"}

  def read_split(split_dir):
    """Read every review under split_dir/{pos,neg} into a text/label frame."""
    rows = []
    for label in ['pos', 'neg']:
      label_dir = os.path.join(split_dir, label)
      # sorted() makes the row order independent of the filesystem
      for _file in sorted(os.listdir(label_dir)):
        # explicit encoding: do not depend on the platform default
        with open(os.path.join(label_dir, _file), 'r', encoding='utf-8') as f:
          rows.append([f.read(), label_names[label]])
    return pd.DataFrame(rows, columns=['text', 'label'])

  df_train = read_split(train_dir).sample(frac=1)
  df_test = read_split(test_dir)

  x_train, y_train = df_train["text"], df_train["label"]
  x_test, y_test = df_test["text"], df_test["label"]

  return x_train, y_train, x_test, y_test

In [None]:
def load_fin(path='/kaggle/input/financial-sentiment-analysis/data.csv',
             random_state=None):
    """Load the financial sentiment CSV and split it into train/test Series.

    Args:
        path: Location of the CSV file. It must contain ``Sentence`` and
            ``Sentiment`` columns. Defaults to the Kaggle input path.
        random_state: Optional seed forwarded to ``train_test_split`` and to
            the shuffle of the training rows. ``None`` (the default) keeps
            the split non-deterministic.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` of pandas Series taken
        from the ``Sentence`` (x) and ``Sentiment`` (y) columns. Sentiment
        values are returned exactly as stored in the CSV.
    """
    # load to dataframe
    df_raw = pd.read_csv(path)

    # train_test_split is called with its default split proportions
    df_train, df_test = train_test_split(df_raw, random_state=random_state)
    df_train = df_train.sample(frac=1, random_state=random_state)

    x_train, y_train = df_train["Sentence"], df_train["Sentiment"]
    x_test, y_test = df_test["Sentence"], df_test["Sentiment"]

    return x_train, y_train, x_test, y_test

In [None]:
def load_sst5():
    """Fetch the five-class SST CSVs and return text/label Series for both splits.

    Integer labels 0-4 are replaced by their textual names
    ("very negative" ... "very positive"). The training rows are shuffled;
    the test rows keep the order of the CSV.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` of pandas Series taken
        from the ``sentence`` and ``label`` columns.
    """
    train_url = 'https://raw.githubusercontent.com/christycty/sentiment-analysis-review/main/data/sst5_train.csv'
    test_url = 'https://raw.githubusercontent.com/christycty/sentiment-analysis-review/main/data/sst5_test.csv'

    # position in this list == integer class id in the CSV
    class_names = ["very negative", "negative", "neutral", "positive", "very positive"]
    id2label = dict(enumerate(class_names))

    def to_text(class_id):
        return id2label[class_id]

    train_frame = pd.read_csv(train_url).sample(frac=1)
    test_frame = pd.read_csv(test_url)

    for frame in (train_frame, test_frame):
        frame['label'] = frame['label'].apply(to_text)

    return (train_frame["sentence"], train_frame["label"],
            test_frame["sentence"], test_frame["label"])

In [None]:
def load_sst2():
    """Build a binary sentiment dataset from the five-class SST CSVs.

    Rows labelled 2 (neutral) are dropped; labels 0/1 become ``"negative"``
    and 3/4 become ``"positive"``. The training rows are shuffled; the test
    rows keep the order of the CSV.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` of pandas Series taken
        from the ``sentence`` and ``label`` columns.
    """
    # The binary task is derived from the 5-class files, hence the sst5 URLs.
    train_url = 'https://raw.githubusercontent.com/christycty/sentiment-analysis-review/main/data/sst5_train.csv'
    test_url = 'https://raw.githubusercontent.com/christycty/sentiment-analysis-review/main/data/sst5_test.csv'

    # integer class id -> binary label text (2 / neutral is intentionally absent)
    id2label = {0: "negative", 1: "negative", 3: "positive", 4: "positive"}

    def to_binary(frame):
        """Drop neutral rows and replace the remaining ids with label text."""
        # .copy() gives an independent frame, so the column assignment below
        # does not write into a slice (avoids SettingWithCopyWarning).
        binary = frame[frame["label"] != 2].copy()
        binary["label"] = binary["label"].apply(lambda x: id2label[x])
        return binary

    df_train = to_binary(pd.read_csv(train_url).sample(frac=1))
    df_test = to_binary(pd.read_csv(test_url))

    x_train, y_train = df_train["sentence"], df_train["label"]
    x_test, y_test = df_test["sentence"], df_test["label"]

    return x_train, y_train, x_test, y_test

In [None]:
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
import re

# Patterns are compiled once here instead of on every preprocess() call.
_TAG_RE = re.compile(r'<[^>]+>')
_NON_ALPHA_RE = re.compile(r'[^a-zA-Z]')
_STOPWORD_CACHE = {}


def _english_stopwords():
    """Return NLTK's English stopwords as a frozenset, loading them only once."""
    if 'english' not in _STOPWORD_CACHE:
        _STOPWORD_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOPWORD_CACHE['english']


def preprocess(text_inp, max_words=250):
    """Normalize one piece of text for the model.

    Steps: replace HTML-style tags and every non-ASCII-letter character with
    a space, lowercase, drop English stopwords, keep at most ``max_words``
    words, and append the literal marker ``"</s>"``.

    Args:
        text_inp: Input string.
        max_words: Maximum number of words kept after stopword removal.

    Returns:
        The cleaned, space-joined words followed by ``"</s>"``.
    """
    # Tags become a space (not ''), so "good<br />movie" does not collapse
    # into the single token "goodmovie".
    text = _TAG_RE.sub(' ', text_inp)
    text = _NON_ALPHA_RE.sub(' ', text)  # non alphabets

    # split() already collapses runs of whitespace
    stop = _english_stopwords()
    words = [w for w in text.lower().split() if w not in stop]

    # NOTE(review): "</s>" is appended explicitly here; confirm the tokenizer
    # used downstream does not also add its own end-of-sequence token.
    return " ".join(words[:max_words]) + "</s>"

In [None]:
def train(model, x_train, y_train, x_val, y_val, epochs=10):
    """Fit ``model`` and report its loss/accuracy on the validation data.

    Args:
        model: Object exposing ``fit`` and ``evaluate``; ``evaluate`` must
            return exactly two values (loss, accuracy).
        x_train, y_train: Training inputs and targets.
        x_val, y_val: Validation inputs and targets, used both during
            fitting and for the final evaluation.
        epochs: Number of epochs passed to ``fit``.

    Returns:
        Whatever ``model.fit`` returned.
    """
    val_pair = (x_val, y_val)

    # fit with the validation pair attached
    fit_log = model.fit(x_train, y_train, epochs=epochs, validation_data=val_pair)

    # score once more on the same validation pair and print the result
    val_loss, val_acc = model.evaluate(*val_pair)
    print(f'Validation loss: {val_loss:.4f}, Validation accuracy: {val_acc:.4f}')

    return fit_log

In [None]:
def load_data(data, checkpoint):
    """Load one sentiment dataset, preprocess its text, and wrap it for training.

    Args:
        data: Dataset key - one of ``"fin"``, ``"imdb"``, ``"sst5"``, ``"sst2"``.
        checkpoint: Model checkpoint name. Not used by this function; kept so
            existing callers keep working.

    Returns:
        Tuple ``(train_ds, test_ds)`` of ``datasets.Dataset`` objects with a
        preprocessed ``sentence`` column and a ``label`` column.

    Raises:
        ValueError: If ``data`` is not one of the supported dataset keys.
    """
    if data == "fin":
        x_train_raw, y_train, x_test_raw, y_test = load_fin()
    elif data == "imdb":
        x_train_raw, y_train, x_test_raw, y_test = load_imdb()
    elif data == "sst5":
        x_train_raw, y_train, x_test_raw, y_test = load_sst5()
    elif data == "sst2":
        x_train_raw, y_train, x_test_raw, y_test = load_sst2()
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(
            f"Unknown dataset {data!r}; expected one of 'fin', 'imdb', 'sst5', 'sst2'"
        )

    # preprocess dataset
    x_train = x_train_raw.apply(preprocess)
    x_test = x_test_raw.apply(preprocess)

    # keys= renames the two Series to the column names the tokenizer step expects
    train_df = pd.concat([x_train, y_train], axis=1, keys=['sentence', 'label'])
    test_df = pd.concat([x_test, y_test], axis=1, keys=['sentence', 'label'])

    train_ds = Dataset.from_pandas(train_df)
    test_ds = Dataset.from_pandas(test_df)

    return train_ds, test_ds

In [None]:
def test_model(data, checkpoint, epochs):
    """Fine-tune a TF seq2seq checkpoint on one sentiment dataset and evaluate it.

    The label text is used as the generation target, so classification is
    trained as text-to-text.

    Args:
        data: Dataset key understood by ``load_data``.
        checkpoint: Hugging Face checkpoint name passed to the tokenizer and
            model ``from_pretrained`` calls.
        epochs: Number of training epochs.

    Returns:
        The fine-tuned model. Evaluation results are only reported by
        ``model.evaluate``; they are not returned.
    """
    train_ds, test_ds = load_data(data, checkpoint)

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    def t5_pre(examples):
        """Tokenize a batch of sentences (inputs) and label strings (targets)."""
        model_inputs = tokenizer(examples['sentence'], max_length=128, truncation=True)

        # text_target= replaces the deprecated as_target_tokenizer() context manager
        labels = tokenizer(text_target=examples['label'], max_length=128, truncation=True)

        model_inputs['labels'] = labels['input_ids']
        # Zero-width placeholder so a 'decoder_input_ids' column exists.
        # NOTE(review): presumably the collator replaces it with decoder inputs
        # derived from the labels - confirm.
        model_inputs['decoder_input_ids'] = np.zeros((len(labels['input_ids']), 0))

        return model_inputs

    model = TFAutoModelForSeq2SeqLM.from_pretrained(checkpoint)
    data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model, return_tensors="tf")

    batch_size = 8
    model_columns = ["attention_mask", "input_ids", 'decoder_input_ids', 'labels']

    def to_tf(ds, shuffle):
        """Tokenize ``ds`` and expose it as a batched tf.data.Dataset."""
        encoded = ds.map(t5_pre, batched=True)
        encoded = encoded.remove_columns(['sentence', 'label'])
        return encoded.to_tf_dataset(
            columns=model_columns,
            shuffle=shuffle,
            collate_fn=data_collator,
            batch_size=batch_size,
        )

    tf_train = to_tf(train_ds, shuffle=True)

    # len(tf_train) is the number of batches per epoch
    num_train_steps = len(tf_train) * epochs

    # create_optimizer also returns the LR schedule; only the optimizer is needed
    optimizer, _ = create_optimizer(
        init_lr=3e-4,
        num_warmup_steps=0,
        num_train_steps=num_train_steps,
        weight_decay_rate=0.01,
    )

    # NOTE(review): DataCollatorForSeq2Seq pads labels with -100 by default;
    # verify this Keras loss/metric copes with those positions when target
    # lengths differ within a batch (e.g. "neutral" vs "very negative").
    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=tf.metrics.SparseCategoricalAccuracy(),
    )

    # tf_train is already batched, so batch_size is not passed to fit()
    model.fit(
        tf_train,
        epochs=epochs,
    )

    tf_test = to_tf(test_ds, shuffle=False)

    model.evaluate(tf_test)
    return model

In [None]:
# test_model(dataset name, Hugging Face model checkpoint, number of epochs)
# Fine-tune and evaluate t5-small on the binary "sst2" dataset for 3 epochs.
sst2_t5_model = test_model("sst2", "t5-small", 3)

In [None]:
# Fine-tune and evaluate t5-small on the five-class "sst5" dataset for 3 epochs.
sst5_t5_model = test_model("sst5", "t5-small", 3)

In [None]:
# Fine-tune and evaluate t5-small on the financial sentiment ("fin") dataset
# for 3 epochs. Stored under its own name so the SST-2 model trained earlier
# is not overwritten.
fin_t5_model = test_model("fin", "t5-small", 3)

In [None]:
# Fine-tune and evaluate t5-small on the "imdb" dataset for 5 epochs.
imdb_t5_model = test_model("imdb", "t5-small", 5)