### Handle Imports

In [30]:
import pprint
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from tqdm.auto import tqdm

### Check if PyTorch recognizes GPU

In [None]:
# Prefer the GPU whenever CUDA is usable; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

### Read in dataset

In [None]:
# Column names assigned to the TSV files, which are read without a header row.
columns = [
    'id', 'label', 'claim', 'subject', 'speaker', 'speaker_job_title', 'state_info',
    'party_affiliation', 'barely_true_counts', 'false_counts',
    'half_true_counts', 'mostly_true_counts', 'pants_on_fire_counts', 'context'
]

# Only `claim` and `label` are consumed by the rest of the notebook, so a row
# is discarded only when one of those two fields is missing. A blanket
# dropna() would also throw away every statement whose other metadata columns
# happen to be blank, silently shrinking the training and test sets.
required_columns = ['claim', 'label']

df_train = pd.read_csv('../data/train.tsv', sep='\t', names=columns).dropna(subset=required_columns)
df_valid = pd.read_csv('../data/valid.tsv', sep='\t', names=columns).dropna(subset=required_columns)
df_test = pd.read_csv('../data/test.tsv', sep='\t', names=columns).dropna(subset=required_columns)
print(df_train.columns)
print(df_train.head(1))

### Tokenize Input Data

In [33]:
def tokenize_liar(samples, labels, tokenizer):
  """Tokenize every claim and bundle it with its label and position.

  Args:
    samples: Sized, indexable sequence of claim strings.
    labels: Sized, indexable sequence of labels aligned with ``samples``.
    tokenizer: Callable invoked as
      ``tokenizer(text, return_tensors='pt', truncation=True)``.

  Returns:
    A list with one dict per claim, holding the keys 'claim_token' (the
    tokenizer output), 'claim_origin' (the raw claim), 'label' and 'idx'
    (the claim's position in ``samples``).

  Raises:
    ValueError: If ``samples`` and ``labels`` differ in length.
  """
  if len(samples) != len(labels):
    raise ValueError(
      f"samples and labels must have the same length "
      f"(got {len(samples)} and {len(labels)})"
    )

  tokenized = []
  for idx, (claim, label) in enumerate(zip(samples, labels)):
    # truncation=True asks the tokenizer to cap each encoding at its maximum
    # supported length, so an unusually long claim cannot overflow the model's
    # input size later on.
    tokenized_claim = tokenizer(claim, return_tensors='pt', truncation=True)

    tokenized.append({
      'claim_token': tokenized_claim,
      'claim_origin': claim,
      'label': label,
      'idx': idx
    })

  return tokenized

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer that belongs to the 'bert-base-cased' checkpoint.
tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

# Pull the claim text and its label out of each split as NumPy arrays.
liar_train_samples, liar_train_labels = (
    np.array(df_train[column]) for column in ('claim', 'label')
)
liar_test_samples, liar_test_labels = (
    np.array(df_test[column]) for column in ('claim', 'label')
)

# Tokenize both splits, then show the first training record as a sanity check.
tokenized_train_dataset = tokenize_liar(liar_train_samples, liar_train_labels, tokenizer)
tokenized_test_dataset = tokenize_liar(liar_test_samples, liar_test_labels, tokenizer)
pprint.pprint(tokenized_train_dataset[0])

### Define PyTorch Dataset (unaugmented; SMOTE is imported but not applied)

In [35]:
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
from imblearn.over_sampling import SMOTE

class LiarDataset(Dataset):
    """Map-style dataset pairing tokenized claims with integer class ids.

    Args:
        liar_data: Iterable of dicts, each providing a 'claim_token' mapping
            (field name -> token values) and a 'label' string that is one of
            the keys of ``label_map``.

    Raises:
        KeyError: If a record carries a label that is missing from
            ``label_map``.
    """

    # Label string -> class index, ordered from least to most truthful.
    label_map = {
        'pants-fire': 0,
        'false': 1,
        'barely-true': 2,
        'half-true': 3,
        'mostly-true': 4,
        'true': 5
    }

    def __init__(self, liar_data):
        self.labels = []
        self.data = []

        for record in liar_data:
            label = record['label']
            if label not in self.label_map:
                raise KeyError(
                    f"Unknown label {label!r}; expected one of {sorted(self.label_map)}"
                )
            self.data.append(record['claim_token'])
            self.labels.append(self.label_map[label])

    def __len__(self):
        """Return the number of stored claims."""
        return len(self.data)

    @staticmethod
    def _to_long_tensor(value):
        """Return an independent int64 tensor holding the contents of ``value``."""
        if isinstance(value, torch.Tensor):
            # torch.tensor(existing_tensor) emits a copy-construct UserWarning;
            # clone().detach() is the recommended way to copy a tensor.
            return value.clone().detach().to(dtype=torch.long)
        return torch.tensor(value, dtype=torch.long)

    def __getitem__(self, idx):
        """Return ``(claim, label)`` for position ``idx``.

        ``claim`` is a dict of int64 tensors with the same keys as the stored
        token mapping; ``label`` is a 0-dim int64 tensor with the class id.
        """
        claim = {key: self._to_long_tensor(value) for key, value in self.data[idx].items()}
        label = torch.tensor(self.labels[idx], dtype=torch.long)

        return claim, label

### Segment Dataset into training and test portions

In [36]:
from torch.utils.data import random_split
from sklearn.model_selection import train_test_split

# Separate the encoded claims (X) from their labels (y) for each split.
X_train = [record['claim_token'] for record in tokenized_train_dataset]
y_train = [record['label'] for record in tokenized_train_dataset]
X_test = [record['claim_token'] for record in tokenized_test_dataset]
y_test = [record['label'] for record in tokenized_test_dataset]

# Rebuild the minimal records (encoded claim + label) consumed by LiarDataset.
train_set = [
    {'claim_token': encoded, 'label': target}
    for encoded, target in zip(X_train, y_train)
]
test_set = [
    {'claim_token': encoded, 'label': target}
    for encoded, target in zip(X_test, y_test)
]

train_dataset = LiarDataset(train_set)
test_dataset = LiarDataset(test_set)

### Define model, dataloaders, loss function, collate function, and optimizer

In [37]:
# need to create collate function to pad variable length sequences for input
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    """Merge ``(claim, label)`` samples into one padded batch.

    Each sample's ``claim`` is a dict whose 'input_ids' and 'attention_mask'
    tensors carry a leading dimension of size 1, for example
    ``{'input_ids': tensor([[101, 21887, 102]]), 'attention_mask': tensor([[1, 1, 1]])}``.
    Any other key in the dict (such as 'token_type_ids') is not forwarded.

    Returns:
        A pair ``(inputs, labels)``: ``inputs`` maps 'input_ids' and
        'attention_mask' to tensors right-padded with 0 up to the longest
        sequence in the batch, and ``labels`` is a 1-D int64 tensor.
    """
    id_rows, mask_rows, targets = [], [], []
    for sample in batch:
        encoding, target = sample[0], sample[1]
        # Drop the leading size-1 dimension so every row is a 1-D sequence.
        id_rows.append(encoding['input_ids'].squeeze(0))
        mask_rows.append(encoding['attention_mask'].squeeze(0))
        targets.append(target)

    # Padding value 0 is used for token ids and for the mask alike
    # (presumably 0 is also the tokenizer's pad token id -- confirm).
    padded = {
        'input_ids': pad_sequence(id_rows, batch_first=True, padding_value=0),
        'attention_mask': pad_sequence(mask_rows, batch_first=True, padding_value=0),
    }

    return padded, torch.tensor(targets, dtype=torch.long)

In [None]:
from transformers import AutoModelForSequenceClassification
from torch.utils.data import DataLoader

# Load the pretrained 'bert-base-cased' checkpoint with a six-output
# sequence-classification head (one output per entry of LiarDataset.label_map)
# and move it to the selected device.
model = AutoModelForSequenceClassification.from_pretrained('bert-base-cased', num_labels=6)
model = model.to(device)

# NOTE: no custom loss function is defined here; the training loop uses the
# loss returned by the model itself. A class-weighted CrossEntropyLoss is one
# option should class imbalance need to be offset.

# Freeze the base model so that only parameters outside `model.base_model`
# (the classification head) receive gradient updates.
for param in model.base_model.parameters():
    param.requires_grad = False

# To also fine-tune the last two encoder layers, re-enable gradients on their
# parameters (setting requires_grad on the layer modules has no effect):
# for layer in model.base_model.encoder.layer[-2:]:
#     for param in layer.parameters():
#         param.requires_grad = True

optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)
# TODO: df_valid is loaded but never tokenized; build a validation loader from
# it to monitor training.
# Evaluation metrics do not depend on sample order, so the test set is left
# unshuffled to keep evaluation runs repeatable.
test_dataloader = DataLoader(test_dataset, batch_size=4, shuffle=False, collate_fn=collate_fn)

### Define training loop

In [39]:
#def train_covid(model, optim, loss_fn, dataloader, epochs):
def train_covid(model, optim, dataloader, epochs):
    """Train ``model`` on ``dataloader`` for ``epochs`` full passes.

    Every batch is moved to the notebook-level ``device``, run through the
    model together with its labels, and optimised on the loss the model
    returns.

    Args:
        model: Module called as ``model(**inputs, labels=labels)`` whose
            output exposes a ``.loss`` tensor.
        optim: Optimizer providing ``zero_grad()`` and ``step()``.
        dataloader: Sized iterable yielding ``(inputs, labels)`` pairs, where
            ``inputs`` is a dict of tensors and ``labels`` is a tensor.
        epochs: Number of passes over ``dataloader``.

    Returns:
        A list with the mean per-batch loss of each epoch, in epoch order.
    """
    epoch_losses = []

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        # A progress bar replaces one printed line per batch, which would
        # otherwise flood the cell output over a long run.
        progress = tqdm(dataloader, desc=f"Epoch {epoch + 1}/{epochs}", leave=False)
        for claims, labels in progress:
            optim.zero_grad()

            # Move the input tensors and the labels to the training device.
            claims = {key: value.to(device) for key, value in claims.items()}
            labels = labels.to(device)

            # Forward pass; passing the labels makes the model return its loss.
            outputs = model(**claims, labels=labels)
            loss = outputs.loss

            # Backward pass and parameter update.
            loss.backward()
            optim.step()

            batch_loss = loss.item()
            total_loss += batch_loss
            progress.set_postfix(loss=f"{batch_loss:.4f}")

        # Guard the mean against an empty dataloader.
        num_batches = len(dataloader)
        mean_loss = total_loss / num_batches if num_batches else 0.0
        epoch_losses.append(mean_loss)

        print(f"Epoch {epoch + 1}, Loss: {total_loss} (mean per batch: {mean_loss:.4f})")

    return epoch_losses

In [None]:
# Sanity check: report how many output classes the loaded model is configured for.
print(model.config.num_labels)

### Train model for sequence classification

In [None]:
# Number of full passes over the training dataloader.
epochs = 64
# Train using the loss returned by the model itself (no separate loss function is passed).
train_covid(model, optimizer, train_dataloader, epochs)

### Evaluate Model

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_model(model, dataloader):
    """Evaluate ``model`` on ``dataloader`` and report metrics in percent.

    Args:
        model: Module called as ``model(**inputs)`` whose output exposes
            ``.logits`` with one row per sample.
        dataloader: Iterable yielding ``(inputs, labels)`` pairs, where
            ``inputs`` is a dict of tensors and ``labels`` is a tensor of
            class ids in ``0..5``.

    Returns:
        A tuple ``(accuracy, precision, recall, f1, per_class_accuracy)``.
        ``accuracy`` is a single number; ``precision``, ``recall`` and ``f1``
        are arrays with one entry per class id 0..5; ``per_class_accuracy``
        maps each class id to its accuracy. All values are percentages
        (0-100). A class with no samples gets ``nan`` as its per-class
        accuracy.
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = {key: value.to(device) for key, value in inputs.items()}

            # Only the logits are needed here, so the labels are not passed to
            # the model and no loss is computed.
            logits = model(**inputs).logits

            # The class with the highest logit is the prediction.
            preds = torch.argmax(logits, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    class_labels = [0, 1, 2, 3, 4, 5]

    # Convert once instead of rebuilding the arrays for every class.
    label_array = np.asarray(all_labels)
    pred_array = np.asarray(all_preds)

    per_class_accuracy = {}
    for class_label in class_labels:
        class_mask = label_array == class_label
        total_class_samples = int(class_mask.sum())

        if total_class_samples == 0:
            # No sample of this class was seen: accuracy is undefined.
            per_class_accuracy[class_label] = float('nan')
            continue

        correct_class_preds = int((pred_array[class_mask] == class_label).sum())
        per_class_accuracy[class_label] = (correct_class_preds / total_class_samples) * 100

    accuracy = 100*accuracy_score(all_labels, all_preds)
    precision = 100*precision_score(all_labels, all_preds, labels=class_labels, average=None, zero_division=0)
    recall = 100*recall_score(all_labels, all_preds, labels=class_labels, average=None, zero_division=0)
    # Scaled like the other metrics so every returned value is a percentage.
    f1 = 100*f1_score(all_labels, all_preds, labels=class_labels, average=None, zero_division=0)

    return accuracy, precision, recall, f1, per_class_accuracy


# Score the trained model on the (unaugmented) held-out test split.
accuracy, precision, recall, f1, per_class_accuracy = evaluate_model(model, test_dataloader)

# Report every metric on its own line.
print(
    f"Test Accuracy: {accuracy:.2f}%",
    f"Per Class Accuracy: {per_class_accuracy}",
    f"Precision: {precision}",
    f"Recall: {recall}",
    f"F1: {f1}",
    sep="\n",
)

### Save Model Weights

In [43]:
# torch.save does not create missing parent directories, so make sure the
# output folder exists before writing the checkpoint.
weights_path = Path('./models') / f'model_weights3-{accuracy:.1f}.pth'
weights_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), weights_path)