In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

import torch
from torch import nn, optim 
from torch.utils.data import Dataset, DataLoader
#from tqdm import tqdm
from tqdm.notebook import tqdm
!pip install transformers
import transformers
from transformers import BertTokenizer, BertModel, BertConfig, AdamW

import sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import precision_score, recall_score, f1_score
import nltk
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import string

from google.colab import drive

# torch.manual_seed(50)
# np.random.seed(50)

# New Section

In [None]:
def data_preprocessing():
    """Load the liberals-vs-conservatives CSV and split it 80/10/10.

    Mounts Google Drive, reads the CSV, label-encodes the ``Political Lean``
    column, joins every post's title and text into one string, plots a
    histogram of the resulting character lengths and splits the data.

    Returns:
        tuple: ``(X_train, Y_train, X_val, Y_val, X_test, Y_test)``. The ``X_*``
        values come from ``texts.values`` (arrays of strings); the ``Y_*``
        values are slices of a float ``torch`` tensor of encoded labels.
    """
    drive.mount('/content/gdrive')
    raw_data = pd.read_csv("/content/gdrive/My Drive/data/liberals_vs_conservatives.csv")

    # We use the information of title and text to predict political lean
    data = raw_data[['Title', 'Text', 'Political Lean']]
    labels, counts = np.unique(data['Political Lean'], return_counts = True)
    print(labels, counts)

    # preprocess the labels (LabelEncoder maps the sorted class names to 0..k-1)
    label_preprocessor = LabelEncoder()
    labels = torch.tensor(label_preprocessor.fit_transform(data['Political Lean'])).float()

    # preprocess the features: "<title>" or "<title>\n<text>" when a body exists.
    # Built with Series.where instead of a masked assignment so that no slice
    # of the DataFrame is mutated (avoids SettingWithCopy issues).
    features = data[['Title', 'Text']].fillna('')
    titles = features['Title']
    bodies = features['Text']
    texts = titles + bodies.where(bodies == '', '\n' + bodies)

    # analyze the proportion of data to be cut
    # NOTE(review): this counts characters, while BERT truncates at 128 tokens,
    # so the printed proportion is only a rough upper bound.
    lengths = texts.str.len()
    plt.hist(lengths, bins = 50)
    print(f"the proportion to be truncated in bert is {np.sum(lengths > 128)/ len(lengths)}.")

    # train val test split: 80% train, then the held-out 20% is cut in half
    # (without reshuffling) into validation and test.
    X_train, X_test, Y_train, Y_test = train_test_split(texts.values, labels, test_size = 0.2, random_state = 50)
    X_val, X_test, Y_val, Y_test = train_test_split(X_test, Y_test, test_size = 0.5, random_state = 50, shuffle = False)

    return X_train, Y_train, X_val, Y_val, X_test, Y_test
        
X_train, Y_train, X_val, Y_val, X_test, Y_test = data_preprocessing()


In [None]:
# define the dataset for dataloader
class TokenizedDataset(Dataset):
    """Dataset that tokenizes one text per item for the BERT-based models.

    Args:
        texts: indexable collection of strings.
        labels: labels as a tensor, list or array; stored as a float tensor.
        tokenizer: callable with the Hugging Face tokenizer call signature.
        max_length: number of tokens every item is padded/truncated to.
    """
    def __init__(self, texts, labels, tokenizer, max_length = 128):
        super().__init__()
        self.texts = texts
        # as_tensor handles tensors, lists and arrays alike and avoids the
        # legacy torch.Tensor(...) constructor.
        self.labels = torch.as_tensor(labels).float()
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, index):
        """Return the tokenizer output for one text (batch dim removed) plus its label."""
        one_input = self.tokenizer(self.texts[index], padding = "max_length", truncation = True,
                                   max_length = self.max_length,
                                   return_token_type_ids = True, return_tensors = 'pt')
        # return_tensors='pt' adds a leading batch dimension of 1; drop it so
        # the DataLoader can stack items itself.
        for key in one_input:
            one_input[key] = one_input[key].squeeze(dim = 0)
        one_input['labels'] = self.labels[index]
        return one_input
    





In [None]:
# dataset used for DAN
class IndicesDataset(Dataset):
    """Dataset of vocabulary indices for the DAN / LSTM / GRU models.

    Each item is whatever ``extractor.transform(text)`` returns (a dict of
    tensors that can be fed to an embedding layer) with a ``'labels'`` entry
    added.

    Args:
        texts: indexable collection of strings.
        labels: labels as a tensor, list or array; stored as a float tensor.
        extractor: fitted object exposing ``transform(text) -> dict``.
    """
    def __init__(self, texts, labels, extractor):
        super().__init__()
        self.texts = texts
        # as_tensor handles tensors, lists and arrays alike and avoids the
        # legacy torch.Tensor(...) constructor.
        self.labels = torch.as_tensor(labels).float()
        self.extractor = extractor

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, index):
        one_input = self.extractor.transform(self.texts[index])
        one_input['labels'] = self.labels[index]
        return one_input

# extractors
class FeatureExtractor(Dataset):
    """Turns raw text into vocabulary indices for the embedding-based models.

    ``fit`` builds the vocabulary and the maximum cleaned length from the
    training texts; ``transform`` maps one text to a dict of tensors.

    The class keeps ``Dataset`` as its base only for backward compatibility;
    it does not implement ``__len__`` / ``__getitem__``.

    Args:
        remove_stopwords: drop English stop words during cleaning.
        padding: pad/truncate every sample to ``max_length`` and return an
            ``attention_mask`` (DAN case). When False the raw variable-length
            index sequence is returned (RNN case).
    """

    # translation table that deletes every ASCII punctuation character
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self, remove_stopwords = True, padding = True):
        self.vocab = {}
        # the number of words in the vocabulary
        self.vocab_size = 0
        # maximum length of sentences
        self.max_length = 0
        self.remove_stopwords = remove_stopwords
        self.padding = padding
        # created lazily on the first clean() call so they are built once,
        # not once per text / per word
        self._lemmatizer = None
        self._stopword_set = None

    # lower and lemmatize the words, remove stop words
    def clean(self, text):
        """Tokenize, lower-case, strip punctuation, lemmatize and filter one text.

        Returns:
            list[str]: the cleaned, non-empty tokens in their original order.
        """
        if self._lemmatizer is None:
            self._lemmatizer = WordNetLemmatizer()
        if self.remove_stopwords and self._stopword_set is None:
            # stopwords.words() builds a fresh list on each call; cache it as a
            # set so the membership test below is O(1).
            self._stopword_set = frozenset(stopwords.words("english"))

        word_list = []
        for word in word_tokenize(text.lower()):
            # remove all punctuations
            word = word.translate(self._PUNCTUATION_TABLE)
            # lemmatize
            word = self._lemmatizer.lemmatize(word)
            if word == '':
                continue
            # remove stopwords
            if self.remove_stopwords and word in self._stopword_set:
                continue
            word_list.append(word)
        return word_list

    # fit the model with training data
    def fit(self, texts):
        """Build the vocabulary and ``max_length`` from an iterable of texts.

        Any state from a previous ``fit`` call is discarded. Word indices are
        assigned in sorted order so they are reproducible between runs.
        """
        distinct_words = set()
        max_length = 0
        for text in texts:
            word_list = self.clean(text)
            distinct_words.update(word_list)
            max_length = max(max_length, len(word_list))
        self.max_length = max_length

        self.vocab = {word: idx for idx, word in enumerate(sorted(distinct_words))}

        # add pad token, empty token and unk token
        first_special = len(self.vocab)
        self.vocab["<unk>"] = first_special
        self.vocab["<emp>"] = first_special + 1 # Indicating the feature is empty after feature transformation
        if self.padding:
            self.vocab["<pad>"] = first_special + 2
        # update the vocabulary size
        self.vocab_size = len(self.vocab)

    # transform text to features
    def transform(self, text):
        """Map one text to model inputs.

        Returns:
            dict: ``{"input_ids": LongTensor}`` and, when ``padding`` is on,
            also ``"attention_mask"`` (1 for real tokens, 0 for padding).
        """
        assert isinstance(text, str)
        unk_index = self.vocab["<unk>"]
        features = [self.vocab.get(word, unk_index) for word in self.clean(text)]

        if self.padding: # for DAN case
            # texts longer than anything seen during fit are truncated so that
            # every sample has the same length and batches can be stacked
            features = features[:self.max_length]

        # when no words left, add empty token to avoid error
        if len(features) == 0:
            features.append(self.vocab["<emp>"])

        if self.padding:
            pad_index = self.vocab["<pad>"]
            # a non-positive multiplier yields an empty list, i.e. no padding
            features += [pad_index] * (self.max_length - len(features))
            input_ids = torch.tensor(features, dtype = torch.long)
            attention_mask = (input_ids != pad_index).long()
            return {"input_ids": input_ids, "attention_mask": attention_mask}

        # for RNN case
        return {"input_ids": torch.tensor(features, dtype = torch.long)}

In [None]:
# Define the model
class BertPretrainedModel(nn.Module):
    """Pretrained BERT encoder with a small binary classification head.

    Args:
        pretrained_model_name: name/path handed to ``from_pretrained``.
        hidden_dim: width of the hidden layer in the classification head.
        dropout_rate: dropout probability used inside BERT (hidden states and
            attention probabilities) and on the pooled output.
    """
    def __init__(self, pretrained_model_name, hidden_dim = 20, dropout_rate = 0.2):
        super().__init__()
        # Load the checkpoint's own config with the dropout overrides and pass
        # it to the model; a stand-alone BertConfig that is never handed to
        # from_pretrained would leave the encoder's dropout untouched.
        self.config = BertConfig.from_pretrained(pretrained_model_name,
                                                 hidden_dropout_prob = dropout_rate,
                                                 attention_probs_dropout_prob = dropout_rate)
        self.bert = BertModel.from_pretrained(pretrained_model_name, config = self.config)
        self.dropout = nn.Dropout(p = dropout_rate)
        self.linear1 = nn.Linear(self.bert.config.hidden_size, hidden_dim)
        self.batchNorm = nn.BatchNorm1d(hidden_dim) 
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, inputs):
        """Return one probability per sequence.

        Args:
            inputs: mapping with ``'input_ids'`` and ``'attention_mask'``.

        Returns:
            Tensor of sigmoid outputs with the trailing size-1 dim squeezed.
        """
        assert 'attention_mask' in inputs
        assert 'input_ids' in inputs
        # pooler output is the output for each entire sequence
        x = self.bert(input_ids = inputs['input_ids'], attention_mask = inputs['attention_mask']).pooler_output
        # regularize the pooled representation (no-op in eval mode)
        x = self.dropout(x)
        x = self.linear1(x)
        x = self.batchNorm(x)
        x = self.relu(x)
        x = self.linear2(x)
        x = x.squeeze(dim = 1)
        prob = self.sigmoid(x)
        
        return prob


# Deep average network
class DANModel(nn.Module):
    """Deep averaging network: mean of token embeddings followed by an MLP.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: embedding width.
        hidden_dim: width of the hidden layer.
        dropout_rate: dropout probability applied after the hidden activation.
    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout_rate = 0.2):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        self.batchNorm = nn.BatchNorm1d(hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p = dropout_rate)
        self.linear2 = nn.Linear(hidden_dim, 1)

    def forward(self, inputs):
        """Return one probability per sequence.

        Args:
            inputs: mapping with ``'input_ids'`` (batch, seq) and
                ``'attention_mask'`` (batch, seq; 0 marks padding).
        """
        embedded = self.embedding(inputs["input_ids"])

        # Masked mean over the sequence axis: padded positions are multiplied
        # by zero instead of being overwritten with NaN in place.
        mask = inputs["attention_mask"].unsqueeze(-1).to(embedded.dtype)
        # clamp guards against division by zero for a fully masked row
        token_counts = mask.sum(dim = -2).clamp(min = 1.0)
        x = (embedded * mask).sum(dim = -2) / token_counts

        x = self.linear1(x)
        x = self.batchNorm(x)
        x = self.relu(x)
        # the dropout layer was constructed but never used before
        x = self.dropout(x)
        x = self.linear2(x)
        x = x.squeeze(dim = 1)
        probs = torch.sigmoid(x)

        return probs

# BiLSTM network
class BiLSTMModel(nn.Module):
    """Bidirectional LSTM classifier over vocabulary indices.

    The final hidden states of every layer and direction are concatenated and
    fed through a small feed-forward head ending in a sigmoid.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: embedding width.
        hidden_dim: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        dropout_rate: dropout probability at the input of the head.
    """
    def __init__(self, vocab_size, embedding_dim = 100, hidden_dim = 20, num_layers = 1, dropout_rate = 0.2):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers = num_layers, 
                            batch_first = True, bidirectional = True)
        self.feedforward = nn.Sequential(nn.Dropout(p = dropout_rate),
                                     nn.Linear(hidden_dim*num_layers*2, hidden_dim//2),
                                     nn.ReLU(),
                                     nn.Linear(hidden_dim//2, 1),
                                     nn.Sigmoid())

    def forward(self, inputs):
        """Return one probability per sequence in ``inputs['input_ids']``.

        All sequences in a batch must have the same length; no padding mask
        or packing is applied.
        """
        x = self.embedding(inputs["input_ids"])
        # only final hidden states used as predictor
        _, (final_hidden, _) = self.lstm(x)
        # (layers*2, batch, hidden) -> (batch, layers*2, hidden)
        x = torch.transpose(final_hidden, 0, 1)
        # reshape (not view): the transposed tensor is non-contiguous, so
        # view() raises as soon as the batch holds more than one sample
        x = x.reshape(x.shape[0], -1)
        probs = self.feedforward(x).squeeze(dim = 1)

        return probs

# biGRU network
class BiGRUModel(nn.Module):
    """Bidirectional GRU classifier over vocabulary indices.

    The final hidden states of every layer and direction are concatenated and
    fed through a small feed-forward head ending in a sigmoid.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: embedding width.
        hidden_dim: GRU hidden size per direction.
        num_layers: number of stacked GRU layers.
        dropout_rate: dropout probability at the input of the head.
    """
    def __init__(self, vocab_size, embedding_dim = 100, hidden_dim = 20, num_layers = 1, dropout_rate = 0.2):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_dim, num_layers = num_layers, 
                            batch_first = True, bidirectional = True)
        self.feedforward = nn.Sequential(nn.Dropout(p = dropout_rate),
                                     nn.Linear(hidden_dim*num_layers*2, hidden_dim//2),
                                     nn.ReLU(),
                                     nn.Linear(hidden_dim//2, 1),
                                     nn.Sigmoid())

    def forward(self, inputs):
        """Return one probability per sequence in ``inputs['input_ids']``.

        All sequences in a batch must have the same length; no padding mask
        or packing is applied.
        """
        x = self.embedding(inputs["input_ids"])
        # only final hidden states used as predictor
        _, final_hidden = self.gru(x)
        # (layers*2, batch, hidden) -> (batch, layers*2, hidden)
        x = torch.transpose(final_hidden, 0, 1)
        # reshape (not view): the transposed tensor is non-contiguous, so
        # view() raises as soon as the batch holds more than one sample
        x = x.reshape(x.shape[0], -1)

        probs = self.feedforward(x).squeeze(dim = 1)

        return probs

# multihead self attention model 
# Because it does not perform well, this model is excluded from the project report
class BertAttentionGRUModel(nn.Module):
    """BERT token states -> multi-head attention -> max-pool -> linear -> sigmoid.

    Despite the name, no GRU is part of the current architecture.

    Args:
        pretrained_model_name: name/path handed to ``BertModel.from_pretrained``.
        first_hidden_dim: attention embedding size (query/key projection
            width); ``nn.MultiheadAttention`` requires it to be divisible by
            ``num_heads``.
        second_hidden_dim: unused; kept so existing callers keep working.
        num_heads: number of attention heads.
        dropout_rate: dropout on the attention weights.
        num_layers: unused; kept so existing callers keep working.
    """
    def __init__(self, pretrained_model_name, first_hidden_dim = 200, second_hidden_dim = 100, num_heads = 4, 
                 dropout_rate = 0.2, num_layers = 2):
        super().__init__()
        
        self.bert = BertModel.from_pretrained(pretrained_model_name)
        # build a multihead attention: keys and queries are projected down to
        # first_hidden_dim, values keep BERT's hidden size (vdim)
        self.keyLinear = nn.Linear(self.bert.config.hidden_size, first_hidden_dim, bias = False)
        self.queryLinear = nn.Linear(self.bert.config.hidden_size, first_hidden_dim, bias = False)

        self.multiheadAttention = nn.MultiheadAttention(first_hidden_dim, num_heads, dropout = dropout_rate, vdim = self.bert.config.hidden_size, batch_first = True)
        self.linear = nn.Linear(first_hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, inputs):
        """Return one probability per sequence.

        Args:
            inputs: mapping with ``'input_ids'`` and ``'attention_mask'``
                (0 marks padding). Every sequence is assumed to contain at
                least one unmasked token.
        """
        assert 'attention_mask' in inputs
        assert 'input_ids' in inputs
        attention_mask = inputs['attention_mask']
        # we need final hidden layers of bert output at each states
        x = self.bert(input_ids = inputs['input_ids'], attention_mask = attention_mask).last_hidden_state

        # multihead attention calculations; padded positions are excluded as
        # keys so that [PAD] tokens cannot receive attention weight
        keys = self.keyLinear(x)
        queries = self.queryLinear(x)
        is_padding = attention_mask == 0
        x, _ = self.multiheadAttention(queries, keys, x, key_padding_mask = is_padding, need_weights = False)

        # pooling layer to choose the highest dimension (serves as nonlinearity);
        # padded query positions are set to -inf so they never win the max
        x = x.masked_fill(is_padding.unsqueeze(-1), float('-inf'))
        x = torch.amax(x, dim = 1)
        probs = self.sigmoid(self.linear(x)).squeeze(dim = 1)
        
        return probs

In [None]:
# utility function
def average(L, weights):
    """Weighted mean of exactly two values.

    Args:
        L: two-element sequence of values.
        weights: two-element sequence of the matching weights.

    Returns:
        ``(L[0]*w0 + L[1]*w1) / (w0 + w1)``.
    """
    assert len(L) == len(weights) and len(L) == 2
    first_value, second_value = L
    first_weight, second_weight = weights
    weighted_total = first_value * first_weight + second_value * second_weight
    return weighted_total / (first_weight + second_weight)

# training and evaluation for each epoch
def train_epoch(dataloader, model, criterion, optimizer, epoch, device):
    """Run one optimisation pass over ``dataloader``.

    Every batch is moved to ``device``, pushed through ``model``, scored with
    ``criterion`` and used for one optimizer step. Predictions are the model
    outputs thresholded at 0.5.

    Returns:
        dict: ``{'n', 'loss', 'accuracy'}`` with the number of samples seen and
        the sample-weighted running mean of loss and accuracy.
    """
    model.train()

    progress = tqdm(enumerate(dataloader), total = len(dataloader), 
                    desc = f"Training epoch {epoch:02d}", leave = True)
    # running statistics over the epoch
    metrics_book = {'n': 0, 'loss': 0, 'accuracy': 0.0}
    for _, batch in progress:

        # move every tensor of the batch to the target device
        for name in batch:
            batch[name] = batch[name].to(device, non_blocking = True)

        targets = batch['labels']
        batch_size = len(targets)

        # forward / backward / step
        optimizer.zero_grad()
        probs = model(batch)
        loss = criterion(probs, targets)
        loss.backward()
        optimizer.step()

        # batch accuracy from the hard predictions
        hard_preds = (probs > 0.5).float()
        accuracy = torch.sum(hard_preds == targets) / batch_size

        # fold the batch into the running means, weighted by sample counts
        seen = metrics_book['n']
        metrics_book['loss'] = average([loss.item(), metrics_book['loss']], weights = [batch_size, seen])
        metrics_book['accuracy'] = average([accuracy.item(), metrics_book['accuracy']], weights = [batch_size, seen])
        metrics_book['n'] = seen + batch_size

        # show the running metrics on the progress bar
        progress.set_postfix(loss = metrics_book['loss'], accuracy = metrics_book['accuracy'])

    return metrics_book
        
def val_epoch(dataloader, model, criterion, epoch, device):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Returns:
        dict: ``{'n', 'loss', 'accuracy'}`` with the number of samples seen and
        the sample-weighted running mean of loss and accuracy.
    """
    model.eval()
    with torch.no_grad():

        progress = tqdm(enumerate(dataloader), total = len(dataloader), 
                        desc = f" Evaluation epoch {epoch:02d}", leave = True)
        metrics_book = {'n': 0, 'loss': 0, 'accuracy': 0.0}

        for _, batch in progress:

            # move every tensor of the batch to the target device
            for name in batch:
                batch[name] = batch[name].to(device, non_blocking = True)

            targets = batch['labels']
            batch_size = len(targets)

            # forward pass and hard predictions (threshold 0.5)
            probs = model(batch)
            hard_preds = (probs > 0.5).int()

            loss = criterion(probs, targets)
            accuracy = torch.sum(hard_preds == targets) / batch_size

            # fold the batch into the running means, weighted by sample counts
            seen = metrics_book['n']
            metrics_book['loss'] = average([loss.item(), metrics_book['loss']], weights = [batch_size, seen])
            metrics_book['accuracy'] = average([accuracy.item(), metrics_book['accuracy']], weights = [batch_size, seen])
            metrics_book['n'] = seen + batch_size

            # show the running metrics on the progress bar
            progress.set_postfix(loss = metrics_book['loss'], accuracy = metrics_book['accuracy'])

    return metrics_book

def scores(y_true, y_label):
    """Compute precision, recall and F1 for predicted labels.

    Args:
        y_true: ground-truth labels.
        y_label: predicted labels.

    Returns:
        dict: keys ``'precision'``, ``'recall'`` and ``'f1_score'``.
    """
    return {
        'precision': precision_score(y_true, y_label),
        'recall': recall_score(y_true, y_label),
        'f1_score': f1_score(y_true, y_label),
    }

def test(test_dataset, model, device):
    """Predict every item of ``test_dataset`` one at a time and score the result.

    Args:
        test_dataset: dataset whose items are dicts of tensors including a
            ``'labels'`` entry.
        model: model called with the (batched) item dict.
        device: device the tensors are moved to.

    Returns:
        tuple: ``(preds, labels, score_table)`` - two lists of floats and the
        dict produced by ``scores``.
    """
    model.eval()
    preds = []
    labels = []
    with torch.no_grad():
        tqdmBar = tqdm(enumerate(test_dataset), total = len(test_dataset))
        for _, one_input in tqdmBar:
            # add a batch dimension of 1 to every tensor
            for key in one_input:
                one_input[key] = one_input[key].to(device, non_blocking = True).unsqueeze(dim = 0)
            label = one_input['labels'].item()
            
            # feedforward
            prob = model(one_input).item()
            pred = float(int((prob > 0.5)))
            
            # record
            preds.append(pred)
            labels.append(label)

    # calculate all the scores once, after every prediction has been collected
    # (scoring inside the loop re-scored the growing lists on every sample)
    score_table = scores(labels, preds)

    return preds, labels, score_table

In [None]:
# Define the backbone function with the best loss being recorded
def _plot_metric_curve(train_values, val_values, title, ylabel, filename):
    """Plot a train/val metric against the 1-based epoch number and save it."""
    epochs = range(1, len(train_values) + 1)
    fig, ax = plt.subplots(figsize = (12, 6))
    ax.set_title(title)
    ax.set_xlabel('number of epochs')
    ax.set_ylabel(ylabel)
    ax.plot(epochs, train_values, label = 'train', color = 'red')
    ax.plot(epochs, val_values, label = 'val', color = 'magenta')
    ax.grid()
    ax.legend()
    # save through the figure object rather than pyplot's "current figure"
    fig.savefig(filename)
    return fig


def train_val(train_dataloader, val_dataloader, val_dataset, model, criterion, optimizer, device, num_epochs = 10):
    """Train for ``num_epochs`` epochs, validating after each one.

    Writes ``loss_curve.png`` and ``accuracy_curve.png`` to the working
    directory and finally scores the model on ``val_dataset``.

    Args:
        train_dataloader, val_dataloader: loaders for the two epoch loops.
        val_dataset: dataset used for the final precision/recall/F1 pass.
        model, criterion, optimizer: training objects; the model is moved to
            ``device``.
        device: target device.
        num_epochs: number of epochs, at least 1.

    Returns:
        dict: ``'precision'``, ``'recall'``, ``'f1_score'`` from the final pass
        over ``val_dataset`` and ``'accuracy'`` from the last validation epoch.

    Raises:
        ValueError: if ``num_epochs`` is smaller than 1.
    """
    if num_epochs < 1:
        raise ValueError(f"num_epochs must be at least 1, got {num_epochs}")

    # initialize the tracker
    metrics_tracker = {'train_loss':[], 'train_accuracy':[],'val_loss':[], 'val_accuracy':[]}
    # fetch model to gpu
    model = model.to(device)
    # main loop
    for epoch in range(1, num_epochs+1):

        train_metrics = train_epoch(train_dataloader, model, criterion, optimizer, epoch, device)
        val_metrics = val_epoch(val_dataloader, model, criterion, epoch, device)
        
        # update the tracker
        metrics_tracker['train_loss'].append(train_metrics['loss'])
        metrics_tracker['train_accuracy'].append(train_metrics['accuracy'])
        metrics_tracker['val_loss'].append(val_metrics['loss'])
        metrics_tracker['val_accuracy'].append(val_metrics['accuracy'])
    
    # visualize the performance curves
    _plot_metric_curve(metrics_tracker['train_loss'], metrics_tracker['val_loss'],
                       'Loss Curve', 'loss', "loss_curve.png")
    _plot_metric_curve(metrics_tracker['train_accuracy'], metrics_tracker['val_accuracy'],
                       'Accuracy Curve', 'accuracy', "accuracy_curve.png")
    
    # obtain precision, recall, f1 scores for the final validation
    accuracy_final = metrics_tracker['val_accuracy'][-1]
    _,_,score_dict = test(val_dataset, model, device)
    score_dict['accuracy'] = accuracy_final
    return score_dict

In [None]:
X_train, Y_train, X_val, Y_val, X_test, Y_test = data_preprocessing()
# main script
def run(X_train, Y_train, X_val, Y_val, X_test, Y_test, return_model_for_test = False, model_type = 'dan', learning_rate = 0.005, 
        num_epochs = 20, embedding_dim = 200, hidden_dim = 20, dropout_rate = 0.2, weight_decay = 0.01,
        num_layers = 2, first_hidden_dim = 100, second_hidden_dim = 50, num_heads = 4):
    """Build, train and evaluate one model configuration.

    Args:
        X_train, Y_train, X_val, Y_val, X_test, Y_test: texts and labels of
            the three splits.
        return_model_for_test: when truthy, also evaluate on the test split and
            return the trained model.
        model_type: one of ``'bert'``, ``'dan'``, ``'lstm'``, ``'gru'``,
            ``'bert_attention_gru'``.
        learning_rate, weight_decay: AdamW settings.
        num_epochs: number of training epochs.
        embedding_dim, hidden_dim, dropout_rate, num_layers, first_hidden_dim,
            second_hidden_dim, num_heads: forwarded to the selected model
            (each model only uses the ones in its own constructor).

    Returns:
        The validation score dict from ``train_val``; or, when
        ``return_model_for_test`` is truthy, the tuple
        ``(X_test, labels, preds, score_table, model)`` with arrays for the
        test texts, true labels and predictions.

    Raises:
        ValueError: if ``model_type`` is not one of the supported names.
    """
    # device detect
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    if model_type in ('bert', 'bert_attention_gru'):
        # transformer models: pretrained encoder + matching tokenizer
        model_name = 'bert-base-uncased'
        if model_type == 'bert':
            model = BertPretrainedModel(model_name, hidden_dim = hidden_dim, dropout_rate = dropout_rate)
        else:
            model = BertAttentionGRUModel(model_name, first_hidden_dim, second_hidden_dim, num_heads, 
                                          dropout_rate = dropout_rate, num_layers = num_layers)
        bert_tokenizer = BertTokenizer.from_pretrained(model_name)

        train_dataset = TokenizedDataset(X_train, Y_train, bert_tokenizer)
        val_dataset = TokenizedDataset(X_val, Y_val, bert_tokenizer)
        test_dataset = TokenizedDataset(X_test, Y_test, bert_tokenizer)

    elif model_type in ('dan', 'lstm', 'gru'):
        # embedding models: vocabulary is fitted on the training texts only
        if model_type == 'dan':
            feature_extractor = FeatureExtractor()
        else:
            # the recurrent models read unpadded sequences with stop words kept
            feature_extractor = FeatureExtractor(padding = False, remove_stopwords = False)
        feature_extractor.fit(X_train)

        if model_type == 'dan':
            model = DANModel(feature_extractor.vocab_size, embedding_dim, hidden_dim, dropout_rate = dropout_rate)
        elif model_type == 'lstm':
            model = BiLSTMModel(feature_extractor.vocab_size, embedding_dim, hidden_dim, num_layers, dropout_rate = dropout_rate)
        else:
            model = BiGRUModel(feature_extractor.vocab_size, embedding_dim, hidden_dim, num_layers, dropout_rate = dropout_rate)

        train_dataset = IndicesDataset(X_train, Y_train, feature_extractor)
        val_dataset = IndicesDataset(X_val, Y_val, feature_extractor)
        test_dataset = IndicesDataset(X_test, Y_test, feature_extractor)

    else:
        raise ValueError(f"unknown model_type: {model_type!r}")

    # from dataloader to model input
    # the recurrent datasets yield variable-length sequences, which the
    # default collate function cannot stack, hence one sample per batch
    if model_type in ['lstm', 'gru', 'bert_attention_gru']:
        batch_size = 1
    else:
        batch_size = 16
    train_dataloader = DataLoader(train_dataset, shuffle = True, batch_size = batch_size)
    val_dataloader = DataLoader(val_dataset, shuffle = False, batch_size = batch_size)

    # create criterion and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.AdamW(model.parameters(), lr = learning_rate, weight_decay = weight_decay)

    # training and evaluation
    val_scores = train_val(train_dataloader, val_dataloader, val_dataset, model, criterion, optimizer, device = device, num_epochs = num_epochs)

    if return_model_for_test:
        preds, labels, score_table = test(test_dataset, model, device)
        preds = np.array(preds)
        labels = np.array(labels)
        X_test = np.array(X_test)
        return X_test, labels, preds, score_table, model
    else:
        return val_scores




In [None]:
# validation for lstm: train one BiLSTM per configuration (the i-th entries of
# the four lists form one configuration) and store the validation scores
embedding_dim = [100, 200, 500, 1000, 200, 1000]
hidden_dim = [10, 30, 50, 100, 30, 100]
dropout_rate = [0.2, 0.2, 0.3, 0.5, 0.5, 0.2]
num_layers = [1,2,1,2,3,2]
result_lstm = {"embedding_dim":[], "hidden_dim":[], "dropout_rate":[], "num_lstm_layers":[], "accuracy":[], "precision":[] ,"recall":[], "f1_score":[]}
for emb_dim, hid_dim, drop_p, n_layers in zip(embedding_dim, hidden_dim, dropout_rate, num_layers):
    score_dict = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, model_type = 'lstm',
                     num_layers = n_layers, embedding_dim = emb_dim, hidden_dim = hid_dim, dropout_rate = drop_p)
    # one result row: the configuration followed by its validation scores
    record = {"embedding_dim": emb_dim, "hidden_dim": hid_dim, "dropout_rate": drop_p, "num_lstm_layers": n_layers}
    record.update(score_dict)
    for column, value in record.items():
        result_lstm[column].append(value)

pd.DataFrame(result_lstm).to_csv('/content/gdrive/My Drive/lstm_result.csv', index = False, header=True)



In [None]:

# validation for dan: train one DAN per configuration (the i-th entries of the
# three lists form one configuration) and store the validation scores
embedding_dim = [100, 200, 500, 1000, 200, 1000]
hidden_dim = [10, 30, 50, 100, 30, 100]
dropout_rate = [0.2, 0.2, 0.3, 0.5, 0.5, 0.2]
result_dan= {"embedding_dim":[], "hidden_dim":[], "dropout_rate":[], "accuracy":[], "precision":[] ,"recall":[], "f1_score":[]}
for emb_dim, hid_dim, drop_p in zip(embedding_dim, hidden_dim, dropout_rate):
    score_dict = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, model_type = 'dan',
                     embedding_dim = emb_dim, hidden_dim = hid_dim, dropout_rate = drop_p)
    # one result row: the configuration followed by its validation scores
    record = {"embedding_dim": emb_dim, "hidden_dim": hid_dim, "dropout_rate": drop_p}
    record.update(score_dict)
    for column, value in record.items():
        result_dan[column].append(value)
pd.DataFrame(result_dan).to_csv('/content/gdrive/My Drive/dan_result.csv', index = False, header=True)

In [None]:
# validation for bert: sweep dropout and weight decay for the fine-tuned BERT
# classifier (the i-th entries of the three lists form one configuration)
num_epochs = [5, 5, 5, 5, 5, 5]
dropout_rate = [0.1, 0.4, 0.2, 0.4, 0.2, 0.4]
weight_decay = [0.01, 0.02, 0.03, 0.05, 0.1, 0.1]
result_bert= {"num_epochs":[], "dropout_rate":[], "weight_decay":[], "accuracy":[], "precision":[] ,"recall":[], "f1_score":[]}
for n, p, w in zip(num_epochs, dropout_rate, weight_decay):
    # weight_decay must be passed explicitly; otherwise every run silently
    # uses run()'s default while the table records the swept value
    score_dict = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, model_type = 'bert', num_epochs = n, dropout_rate = p,
                     weight_decay = w, learning_rate = 3e-5)
    result_bert["num_epochs"].append(n)
    result_bert["dropout_rate"].append(p)
    result_bert["weight_decay"].append(w)
    for key in score_dict:
        result_bert[key].append(score_dict[key])
pd.DataFrame(result_bert).to_csv('/content/gdrive/My Drive/bert_result_weightdecay.csv', index = False, header=True)

In [None]:
# validation for gru: train one BiGRU per configuration (the i-th entries of
# the four lists form one configuration) and store the validation scores
embedding_dim = [100, 200, 1000, 200, 1000]
hidden_dim = [10, 30, 50, 30, 100]
dropout_rate = [0.2, 0.2, 0.5, 0.5, 0.2]
# NOTE(review): this list used to hold a sixth entry (2) that zip() silently
# dropped because the other lists have five; it is removed here so the lists
# line up. Confirm that no sixth configuration was intended.
num_layers = [1,2,1,2,1]
assert len(embedding_dim) == len(hidden_dim) == len(dropout_rate) == len(num_layers)
result_gru = {"embedding_dim":[], "hidden_dim":[], "dropout_rate":[], "num_gru_layers":[], "accuracy":[], "precision":[] ,"recall":[], "f1_score":[]}
for e, h, p, l in zip(embedding_dim, hidden_dim, dropout_rate, num_layers):
    score_dict = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, model_type = 'gru', num_layers = l, embedding_dim = e, hidden_dim = h, dropout_rate = p, learning_rate = 0.005)
    result_gru["embedding_dim"].append(e)
    result_gru["hidden_dim"].append(h)
    result_gru["dropout_rate"].append(p)
    result_gru["num_gru_layers"].append(l)
    for key in score_dict:
        result_gru[key].append(score_dict[key])
        
pd.DataFrame(result_gru).to_csv('/content/gdrive/My Drive/gru_result.csv', index = False, header=True)


In [None]:
# Note: Because the custom model does not perform well, this part is excluded
# from the report
# validation for bert_attention_gru model (the i-th entries of the four lists
# form one configuration; first_hidden_dim must be divisible by num_heads)
first_hidden_dim = [100, 150, 200, 250, 300]
second_hidden_dim = [10, 100, 50, 50, 30]
num_heads = [5,15,50,10,30]
dropout_rate = [0.0, 0.0, 0.0, 0.0, 0.0]

result_bert_attention_gru = {"first_hidden_dim":[], "second_hidden_dim":[], "num_heads" : [],"dropout_rate":[], "accuracy":[], "precision":[] ,"recall":[], "f1_score":[]}
for f, s, n, d in zip(first_hidden_dim, second_hidden_dim, num_heads, dropout_rate):
    # pass the loop's configuration; hard-coded values here made every
    # iteration train the same model while recording different parameters
    score_dict = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, model_type = 'bert_attention_gru', first_hidden_dim = f, num_epochs = 5,
                     second_hidden_dim = s, num_heads = n, dropout_rate = d, learning_rate = 3e-5)
    result_bert_attention_gru["first_hidden_dim"].append(f)
    result_bert_attention_gru["second_hidden_dim"].append(s)
    result_bert_attention_gru["dropout_rate"].append(d)
    result_bert_attention_gru["num_heads"].append(n)
    for key in score_dict:
        result_bert_attention_gru[key].append(score_dict[key])
        
pd.DataFrame(result_bert_attention_gru).to_csv('/content/gdrive/My Drive/bert_attention_gru_result.csv', index = False, header=True)


In [None]:
## examples for test scripts
# test script using the best F1 score obtained by each type of model
# X_test, labels, preds, score_table, model = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, return_model_for_test = True, model_type = 'bert', learning_rate = 0.005, 
#         num_epochs = 20, embedding_dim = 200, hidden_dim = 20, dropout_rate = 0.2, weight_decay = 0.01,
#         num_layers = 2, first_hidden_dim = 100, second_hidden_dim = 50, num_heads = 4)
# cm = confusion_matrix(labels, preds, labels = [0, 1])
# disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = ['conservatives','liberals'])
# disp.plot()
# X_wrong = X_test[labels != preds]
# Y_wrong = Y_test[labels != preds]
# nwrong = len(X_test[labels != preds])
# mask_wrong = np.random.choice(nwrong, size = 10, replace = False)
# print(X_wrong[mask_wrong])
# print(Y_wrong[mask_wrong])
# print(score_table)

In [None]:
#Bert test (the best model after regularization)
X_test, labels, preds, score_table, model_bert = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, return_model_for_test = True,
        model_type = 'bert', learning_rate = 3e-5, num_epochs = 5, dropout_rate = 0.2, weight_decay = 0.03)
# NOTE(review): the display labels assume the label encoding is
# 0 = conservative, 1 = liberal - confirm against the LabelEncoder classes
cm = confusion_matrix(labels, preds, labels = [0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = ['conservatives','liberals'])
disp.plot()
# print a random sample of misclassified posts with their true labels;
# `labels` (returned by run) is the numpy copy of the test labels, so it can be
# indexed with the same boolean mask as X_test
wrong = labels != preds
X_wrong = X_test[wrong]
Y_wrong = labels[wrong]
nwrong = len(X_wrong)
# never ask for more samples than there are errors
mask_wrong = np.random.choice(nwrong, size = min(10, nwrong), replace = False)
print(X_wrong[mask_wrong])
print(Y_wrong[mask_wrong])
print(score_table)

In [None]:
# test for dan: retrain with the selected hyper-parameters, then report the
# confusion matrix and scores on the test split
dan_test_settings = dict(model_type = 'dan', learning_rate = 0.005, num_epochs = 20, dropout_rate = 0.2,
                         weight_decay = 0.01, embedding_dim = 100, hidden_dim = 10)
X_test_, labels_, preds_, score_table_, model_ = run(X_train, Y_train, X_val, Y_val, X_test, Y_test,
                                                     return_model_for_test = True, **dan_test_settings)
cm_ = confusion_matrix(labels_, preds_, labels = [0, 1])
disp_ = ConfusionMatrixDisplay(confusion_matrix = cm_, display_labels = ['conservatives','liberals'])
disp_.plot()
print(score_table_)

In [None]:
# test for lstm: retrain with the selected hyper-parameters, then report the
# confusion matrix and scores on the test split
lstm_test_settings = dict(model_type = 'lstm', num_layers = 2, learning_rate = 0.005, num_epochs = 20,
                          dropout_rate = 0.5, weight_decay = 0.01, embedding_dim = 1000, hidden_dim = 100)
X_test_, labels_, preds_, score_table_, model_ = run(X_train, Y_train, X_val, Y_val, X_test, Y_test,
                                                     return_model_for_test = True, **lstm_test_settings)
cm_ = confusion_matrix(labels_, preds_, labels = [0, 1])
disp_ = ConfusionMatrixDisplay(confusion_matrix = cm_, display_labels = ['conservatives','liberals'])
disp_.plot()
print(score_table_)

In [None]:
# test for gru: retrain with the selected hyper-parameters, then report the
# confusion matrix and scores on the test split
gru_test_settings = dict(model_type = 'gru', num_layers = 2, learning_rate = 0.005, num_epochs = 20,
                         dropout_rate = 0.5, weight_decay = 0.01, embedding_dim = 200, hidden_dim = 30)
X_test_, labels_, preds_, score_table_, model_ = run(X_train, Y_train, X_val, Y_val, X_test, Y_test,
                                                     return_model_for_test = True, **gru_test_settings)
cm_ = confusion_matrix(labels_, preds_, labels = [0, 1])
disp_ = ConfusionMatrixDisplay(confusion_matrix = cm_, display_labels = ['conservatives','liberals'])
disp_.plot()
print(score_table_)


In [None]:
#Bert test 
X_test, labels, preds, score_table, model_bert = run(X_train, Y_train, X_val, Y_val, X_test, Y_test, return_model_for_test = True,
        model_type = 'bert', learning_rate = 3e-5, num_epochs = 5, dropout_rate = 0.1, weight_decay = 0.01)
# NOTE(review): the display labels assume the label encoding is
# 0 = conservative, 1 = liberal - confirm against the LabelEncoder classes
cm = confusion_matrix(labels, preds, labels = [0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix = cm, display_labels = ['conservatives','liberals'])
disp.plot()
# print a random sample of misclassified posts with their true labels;
# `labels` (returned by run) is the numpy copy of the test labels, so it can be
# indexed with the same boolean mask as X_test
wrong = labels != preds
X_wrong = X_test[wrong]
Y_wrong = labels[wrong]
nwrong = len(X_wrong)
# never ask for more samples than there are errors
mask_wrong = np.random.choice(nwrong, size = min(10, nwrong), replace = False)
print(X_wrong[mask_wrong])
print(Y_wrong[mask_wrong])
print(score_table)