In [None]:
!pip install lime

In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lime.lime_text import LimeTextExplainer
from operator import itemgetter
from tqdm import tqdm

In [None]:
# Colab-only: mount Google Drive at /content/drive so that the dataset and the
# saved model referenced by later cells (paths under /content/drive/MyDrive)
# are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def load_data(data_file):
    """Read the tweets CSV and return parallel lists of texts and binary labels.

    Args:
        data_file: Path of a CSV file that has ``Tweet`` and ``Party`` columns.

    Returns:
        A ``(tweets, labels)`` tuple. ``tweets`` holds the ``Tweet`` column as a
        list, with missing values replaced by ``""``. ``labels`` holds 0 where
        ``Party`` equals ``"Democrat"`` and 1 for every other value (including
        the empty string that stands in for a missing party).
    """
    frame = pd.read_csv(data_file)

    # Missing cells become empty strings so downstream code only sees text
    frame = frame.fillna("")

    tweet_texts = frame['Tweet'].tolist()
    party_codes = [int(party != "Democrat") for party in frame['Party'].tolist()]

    return tweet_texts, party_codes

In [None]:
# Load the whole tweet corpus and its 0/1 party labels from Google Drive.
tweets, labels = load_data('/content/drive/MyDrive/Datasets/Tweets/ExtractedTweets.csv')

In [None]:
# Display how many tweets were loaded.
len(tweets)

In [None]:
# Work on a fixed window of the corpus and cut it into 17 equally sized chunks
# so the slow LIME pass can be run one chunk at a time.
new_tweets = tweets[73098:86460]
new_labels = labels[73098:86460]

# Chunk size. The window holds 13362 items (= 17 * 786) provided the corpus has
# at least 86460 rows, so the split below yields exactly 17 chunks in that case.
n = len(new_tweets) // 17

# Chunk the selected window itself. Slicing the full `tweets` / `labels` lists
# here would take the chunks from the start of the corpus instead of from
# rows 73098-86459.
x = [new_tweets[i:i + n] for i in range(0, len(new_tweets), n)]
y = [new_labels[i:i + n] for i in range(0, len(new_labels), n)]

# Tweet chunks (variable names kept as-is because later cells refer to them).
first_half_tweets = x[0]
second_half_tweets = x[1]
third_half_tweets = x[2]
fourth_half_tweets = x[3]
fifth_half_tweets = x[4]
sixth_half_tweets = x[5]
seventh_half_tweets = x[6]
eighth_half_tweets = x[7]
ninth_half_tweets = x[8]
tenth_half_tweets = x[9]
eleventh_half_tweets = x[10]
twelveth_half_tweets = x[11]
thirteenth_half_tweets = x[12]
fourteenth_half_tweets = x[13]
fifteen_half_tweets = x[14]
sixteen_half_tweets = x[15]
seventeen_half_tweets = x[16]

# Label chunks, index-aligned with the tweet chunks above. Every one of them
# must come from `y`; taking them from `x` would store tweet texts as labels.
first_half_labels = y[0]
second_half_labels = y[1]
third_half_labels = y[2]
fourth_half_labels = y[3]
fifth_half_labels = y[4]
sixth_half_labels = y[5]
seventh_half_labels = y[6]
eighth_half_labels = y[7]
ninth_half_labels = y[8]
tenth_half_labels = y[9]
eleventh_half_labels = y[10]
twelveth_half_labels = y[11]
thirteenth_half_labels = y[12]
fourteenth_half_labels = y[13]
fifteen_half_labels = y[14]
sixteen_half_labels = y[15]
seventeen_half_labels = y[16]

In [None]:
class TweetDataset(Dataset):
    """Map-style dataset that tokenizes one tweet per item.

    Args:
        tweets: Indexable collection of tweet texts (each item is passed
            through ``str`` before tokenization).
        labels: Indexable collection of integer class labels, aligned with
            ``tweets``.
        tokenizer: Object exposing ``encode_plus`` that returns
            ``input_ids`` and ``attention_mask`` tensors.
        max_length: Fixed token length every encoded tweet is padded or
            truncated to.
    """

    def __init__(self, tweets, labels, tokenizer, max_length):
        self.tweets = tweets
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of tweets in the dataset."""
        return len(self.tweets)

    def __getitem__(self, idx):
        """Return the raw text, encoded tensors and label tensor for item ``idx``."""
        tweet = str(self.tweets[idx])
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            tweet,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding='max_length',
            # Without truncation a tweet longer than max_length tokens yields a
            # longer tensor than its neighbours and the DataLoader cannot
            # stack the batch.
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'tweet_text': tweet,
            # encode_plus returns tensors with a leading batch axis of 1;
            # flatten drops it so the DataLoader can add its own batch axis.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'label': torch.tensor(label, dtype=torch.long)
        }


In [None]:
def create_data_loader(tweets, labels, tokenizer, max_length, batch_size, num_workers=4):
    """Wrap tweets and labels in a ``TweetDataset`` and return a ``DataLoader`` over it.

    Args:
        tweets: Tweet texts.
        labels: Integer class labels aligned with ``tweets``.
        tokenizer: Tokenizer handed to ``TweetDataset``.
        max_length: Token length handed to ``TweetDataset``.
        batch_size: Number of items per batch.
        num_workers: Number of loader worker processes. Defaults to 4, the
            value this function previously hard-coded; pass 0 to load in the
            main process.

    Returns:
        A ``DataLoader`` that iterates the dataset in its original order
        (no shuffling is requested).
    """
    dataset = TweetDataset(tweets, labels, tokenizer, max_length)
    return DataLoader(dataset, batch_size=batch_size, num_workers=num_workers)

In [None]:
# Directory on Google Drive from which the saved model and tokenizer are loaded.
output_dir = '/content/drive/MyDrive/Datasets/Tweets/Saved Model/'

In [None]:
# Pick the GPU when one is available, otherwise stay on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Restore the saved tokenizer and classifier, placing the model on `device`
tokenizer = BertTokenizer.from_pretrained(output_dir)
model = BertForSequenceClassification.from_pretrained(output_dir).to(device)

print('Model and tokenizer loaded')


In [None]:
def evaluate_model(model, data_loader, device):
    """Run inference over ``data_loader`` and gather texts, predictions and labels.

    Args:
        model: Callable model exposing ``eval()``; it is invoked with
            ``input_ids`` and ``attention_mask`` keyword arguments and must
            return an object with a ``logits`` attribute.
        data_loader: Iterable of batches, each a mapping with the keys
            ``input_ids``, ``attention_mask``, ``label`` and ``tweet_text``.
        device: Device the input tensors are moved to before the forward pass.

    Returns:
        ``(tweets, predictions, true_labels)`` as three lists in loader order;
        predictions are the index of the largest logit per row.
    """
    model = model.eval()
    seen_texts, predicted, expected = [], [], []

    # Inference only: no gradients are needed
    with torch.no_grad():
        for batch in data_loader:
            batch_labels = batch["label"].to(device)
            result = model(
                input_ids=batch["input_ids"].to(device),
                attention_mask=batch["attention_mask"].to(device),
            )

            # Class index with the highest logit for every row of the batch
            batch_preds = result.logits.max(dim=1).indices

            seen_texts.extend(batch["tweet_text"])
            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    return seen_texts, predicted, expected

def calculate_metrics(true_labels, predictions):
    """Return ``(accuracy, f1)`` for the predictions, with F1 averaged as 'weighted'."""
    return (
        accuracy_score(true_labels, predictions),
        f1_score(true_labels, predictions, average='weighted'),
    )


In [None]:
# Batch size handed to create_data_loader.
BATCH_SIZE = 4
# Maximum token length (`max_length`) handed to the tokenizer throughout the notebook.
MAX_LENGTH = 128

In [None]:
# Data loader over the complete corpus (all loaded tweets, not one chunk).
# Its only visible consumer is the commented-out evaluation cell.
val_data_loader = create_data_loader(tweets, labels, tokenizer, MAX_LENGTH, BATCH_SIZE)

In [None]:
# # Evaluate model
# tweets, predictions, true_labels = evaluate_model(model, val_data_loader, device)

# # Calculate metrics
# accuracy, f1 = calculate_metrics(true_labels, predictions)
# print(f'Accuracy: {accuracy}')
# print(f'F1 Score: {f1}')

In [None]:
# Release cached GPU memory that PyTorch is holding but not using.
torch.cuda.empty_cache()

from torch.cuda.amp import autocast, GradScaler

# Use autocast and GradScaler for mixed precision
# NOTE(review): `scaler` is not referenced by any other visible cell, and no
# training loop is visible in this notebook - confirm whether it is still needed.
scaler = GradScaler()

In [None]:
# Define the prediction function
def predict_proba(texts, batch_size=64):
    """Return class probabilities for ``texts`` as a NumPy array.

    This is the classifier function handed to LIME, which calls it once with
    every perturbed sample of an explanation (several thousand strings by
    default). The texts are therefore scored in mini-batches rather than in a
    single forward pass, which keeps peak memory bounded.

    Args:
        texts: A single string or a sequence of strings.
        batch_size: Number of texts per forward pass.

    Returns:
        Array of shape ``(len(texts), num_classes)`` holding the softmax of the
        model logits, one row per input text in input order.
    """
    # A bare string is treated as one text, not as a sequence of characters
    if isinstance(texts, str):
        texts = [texts]
    texts = list(texts)

    if not texts:
        return np.empty((0, model.config.num_labels), dtype=np.float32)

    model.eval()

    # Mixed precision only pays off (and is only enabled) on CUDA
    use_amp = device.type == "cuda"

    prob_chunks = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            inputs = tokenizer(
                texts[start:start + batch_size],
                return_tensors='pt',
                padding=True,
                truncation=True,
                max_length=MAX_LENGTH,
            ).to(device)

            with torch.autocast(device_type=device.type, enabled=use_amp):
                outputs = model(**inputs)
                probs = torch.nn.functional.softmax(outputs.logits, dim=1)

            # Move each chunk off the GPU straight away to free device memory
            prob_chunks.append(probs.float().cpu())

    return torch.cat(prob_chunks).numpy()

In [None]:
# Initialize the LIME text explainer.
# LIME draws random perturbations of each text; fixing random_state makes the
# sequence of explanations reproducible across kernel restarts.
explainer = LimeTextExplainer(class_names=['Democrat', 'Republican'], random_state=42)


In [None]:
# Function to explain predictions
def explain_prediction(tweet, num_features=10):
    """Return the LIME explanation of the classifier's output for ``tweet``.

    Args:
        tweet: Text to explain.
        num_features: Maximum number of words LIME reports in the explanation.
            Defaults to 10, the value this function previously hard-coded.

    Returns:
        The explanation object produced by ``explainer.explain_instance`` with
        ``predict_proba`` as the classifier function.
    """
    return explainer.explain_instance(tweet, predict_proba, num_features=num_features)


In [None]:
# # Example tweet to explain
# sample_tweet = tweets[0]

# # Explain the prediction
# explanation = explain_prediction(sample_tweet)

# # Show the explanation
# explanation.show_in_notebook(text=True)


In [None]:
def predict_tweet(model, tweet, tokenizer, max_length, device):
    """Classify a single tweet and return its party name.

    Args:
        model: Sequence-classification model; called with ``input_ids`` and
            ``attention_mask`` and expected to return an object with ``logits``.
        tweet: Text to classify.
        tokenizer: Tokenizer exposing ``encode_plus``.
        max_length: Token length the tweet is padded or truncated to.
        device: Device on which the forward pass runs.

    Returns:
        ``'Democrat'`` when the largest logit is at index 0, otherwise
        ``'Republican'``.
    """
    # Tokenize and encode the tweet
    encoding = tokenizer.encode_plus(
        tweet,
        add_special_tokens=True,
        max_length=max_length,
        return_token_type_ids=False,
        padding='max_length',
        # Truncate like predict_proba does, so the predicted label and the
        # LIME explanation are computed from the same (clipped) input.
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )

    # Move inputs to the appropriate device
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    # Set model to evaluation mode and make prediction
    model.eval()
    with torch.no_grad():
        output = model(input_ids=input_ids, attention_mask=attention_mask)
        _, prediction = torch.max(output.logits, dim=1)

    # Map the prediction to the class label
    class_label = 'Democrat' if prediction.item() == 0 else 'Republican'
    return class_label


In [None]:
def sort_tuples_array_by_second_item(tuples):
    """Return a new list of ``tuples`` in ascending order of each tuple's item at index 1.

    The input is left untouched and entries with equal keys keep their
    relative order.
    """
    ordered = list(tuples)
    ordered.sort(key=lambda entry: entry[1])
    return ordered

In [None]:
def _record_strong_word(word_info, label):
    """Store one ``(word, score, feature_id)`` LIME entry in the global accumulators.

    A ``(word, label)`` pair seen for the first time gets a new entry in
    ``words`` and one row in ``wordsForCSV``. A pair that is already known has
    the new score appended to its ``lime_score`` list and its ``position``
    overwritten; no additional CSV row is written for it.
    """
    word, score, feature_id = word_info
    key = (word, label)

    if key in words:
        words[key]['lime_score'].append(score)
        words[key]['position'] = feature_id
    else:
        words[key] = {'lime_score': [score], 'position': feature_id}
        wordsForCSV.append([word, label, score])


def get_max_explained_words(txt):
    """Record the (up to) two words that most support the predicted party of ``txt``.

    The tweet is classified with ``predict_tweet`` and explained with
    ``explain_prediction``. LIME weights are taken for class index 1
    (Republican), so a negative weight pushes towards Democrat and a positive
    weight pushes towards Republican. Depending on the predicted party, the two
    most negative (Democrat, label 0) or the two most positive (Republican,
    label 1) words are recorded, strongest first.

    Args:
        txt: Tweet text.

    Returns:
        The global ``(words, wordsForCSV)`` accumulators, which this function
        mutates in place.
    """
    prediction = predict_tweet(model, txt, tokenizer, MAX_LENGTH, device)
    exp = explain_prediction(txt)

    # Pair every (feature_id, weight) entry with its (word, weight) twin; both
    # sequences describe class 1 and are zipped position by position.
    exp_list = [
        (word, score, feature_id)
        for (feature_id, _), (word, score) in zip(exp.local_exp[1], exp.as_list())
    ]

    if prediction == "Democrat":
        label = 0
        # Ascending order puts the most negative (strongest Democrat) words first
        candidates = sort_tuples_array_by_second_item(
            [entry for entry in exp_list if entry[1] < 0]
        )
    else:
        label = 1
        # Reversed ascending order puts the most positive (strongest Republican) words first
        candidates = sort_tuples_array_by_second_item(
            [entry for entry in exp_list if entry[1] > 0]
        )[::-1]

    for word_info in candidates[:2]:
        _record_strong_word(word_info, label)

    return words, wordsForCSV

In [None]:
# Accumulators filled by get_max_explained_words:
# words maps (word, label) -> {'lime_score': [scores], 'position': LIME feature id}
words = {}
# wordsForCSV holds one [word, label, lime score] row per exported word
wordsForCSV = []

In [None]:
# Explain every tweet of the first chunk only; each call updates and returns
# the shared `words` / `wordsForCSV` accumulators.
for tweet in tqdm(first_half_tweets, total = len(first_half_tweets)):
  words, wordsForCSV = get_max_explained_words(tweet)

In [None]:
import csv

# Column names of the exported table
header = ["word", "label", "limescore"]

# Write the header row followed by one row per recorded word
with open('/content/drive/MyDrive/Datasets/Tweets/24_extracted_strong_words_by_bert_base_uncased.csv', 'w', encoding='UTF8', newline='') as csv_file:
    csv.writer(csv_file).writerows([header, *wordsForCSV])


In [None]:
# print(words)
# print(wordsForCSV)