In [None]:
!pip install transformers
!pip install datasets

In [None]:
from datasets import load_dataset,DatasetDict
from transformers import AutoTokenizer,TFAutoModelForSequenceClassification
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# Path of the CSV file that is read into `pandas_df` and later passed to `load_dataset`.
DATA_PATH = "/content/LTI_Dataset.csv" 

In [None]:
# Read the raw CSV into a DataFrame and preview its first five rows.
pandas_df = pd.read_csv(filepath_or_buffer=DATA_PATH)
pandas_df.head(n=5)

In [None]:

# Discard the first 2058 rows, then renumber the remaining rows from zero.
# reset_index() without drop=True keeps the previous labels in an 'index'
# column; that column is removed again before tokenization.
pandas_df = pandas_df.iloc[2058:].reset_index()

In [None]:
# Relabel intent 2 as 0.
# A single .loc assignment with a boolean mask replaces the per-row chained
# indexing (`df['intent'][i] = 0`), which triggers SettingWithCopyWarning and
# is not guaranteed to write into the frame at all.
pandas_df.loc[pandas_df['intent'] == 2, 'intent'] = 0

In [None]:
# Display the frame to inspect the relabelled `intent` column.
pandas_df

In [None]:
# Keep only the rows that actually carry an intent label.
# The surviving rows keep their original index labels (no reset here).
pandas_df = pandas_df.dropna(subset=['intent'])

In [None]:
# Parse the intent labels as numbers, downcasting to an integer dtype where possible.
intent_numeric = pd.to_numeric(pandas_df['intent'], downcast='integer')
pandas_df['intent'] = intent_numeric

In [None]:
# for index in pandas_df.index:
#   if pandas_df['target'][index]==4:
#     pandas_df.drop(index)
# pandas_df = pandas_df.reset_index()

# pandas_df = pandas_df[pandas_df.target != 4]

In [None]:
# Class balance check: number of rows per intent label.
pandas_df['intent'].value_counts()

In [None]:
import re
# All patterns are compiled once here instead of being rebuilt for every tweet.
_URL_PATTERN = re.compile(r"http\S+")
_MENTION_PATTERN = re.compile(r"@\S+")
# The dot is escaped so only a literal ".com" suffix is stripped; an unescaped
# dot would also eat the tail of ordinary words such as "telecom".
_DOT_COM_SUFFIX = re.compile(r"\.com$")
_NON_ALNUM_RUN = re.compile(r"[^A-Za-z0-9]+")
emoji_pattern = re.compile("["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags=re.UNICODE)


def _clean_tweet(raw_text):
    """Return ``raw_text`` reduced to space-separated ASCII letters and digits.

    Steps, in order: drop URLs, drop @mentions, drop a trailing ".com",
    drop any remaining "@", drop emoji, and finally collapse every run of
    non-alphanumeric characters into a single space.

    Assumes ``raw_text`` is a ``str``; a missing value (NaN) raises TypeError.
    """
    text = _URL_PATTERN.sub("", raw_text)
    text = _MENTION_PATTERN.sub("", text)
    text = _DOT_COM_SUFFIX.sub("", text)
    text = text.replace("@", "")
    # Emoji are removed (not replaced) before the final pass so that they do
    # not turn into separating spaces.
    text = emoji_pattern.sub("", text)
    return _NON_ALNUM_RUN.sub(" ", text)


# One cleaned string per row of pandas_df, in row order.
Tweet = [_clean_tweet(tweettext) for tweettext in pandas_df["text"]]

In [None]:
import nltk
# Fetch the NLTK resources needed for the vocabulary.
for resource_name in ('words', 'omw-1.4', 'wordnet'):
    nltk.download(resource_name)

wn = nltk.corpus.wordnet
wn_lemmas = set(wn.all_lemma_names())

# Accepted vocabulary = NLTK word list + hand-picked domain terms + WordNet lemma names.
# NOTE(review): the filter that consumes `words` compares `w.lower()`, so the
# mixed-case 'US' entry below can never match on its own — confirm intent.
words = set(nltk.corpus.words.words())
words |= {'vladimir', 'putin', 'zelenskyy', 'zelensky', 'russia', 'ukraine', 'trump', 'biden', 'joe', 'US', 'usa', 'nukes', 'kyiv', 'kiev'}
words |= wn_lemmas

In [None]:
# `Tweet` was built row by row from pandas_df["text"], so it is assigned
# positionally as a plain list. Wrapping it in pd.Series would align on a new
# 0..n-1 index; because rows were filtered earlier without resetting the index,
# texts would then be attached to the wrong rows (or turned into NaN).
pandas_df["tweet_cleaned"] = Tweet

text_cleaned = pandas_df["tweet_cleaned"]
texts_new = []

for sentence in text_cleaned:
    # Keep a token if its lowercase form is in the vocabulary, or if it is not
    # purely alphabetic (e.g. numbers); everything kept is lowercased.
    kept_tokens = [
        w.lower()
        for w in nltk.wordpunct_tokenize(sentence)
        if w.lower() in words or not w.isalpha()
    ]
    text_new = " ".join(kept_tokens)
    # Drop anything that is still non-ASCII.
    text_new = text_new.encode('ascii', errors='ignore').decode('ascii')
    texts_new.append(text_new)

# Positional assignment again, for the same index-alignment reason as above.
pandas_df["tweet_cleaned"] = texts_new
pandas_df

In [None]:
import gensim
from gensim.summarization import summarize
from nltk.tokenize import sent_tokenize

# Shorten every cleaned tweet longer than 512 characters: try an extractive
# summary of about 50 words and fall back to plain truncation if that fails.
# NOTE(review): `gensim.summarization` is not shipped with gensim 4.x — confirm
# the gensim version this notebook is expected to run against.
for index in pandas_df.index:
    text = pandas_df.at[index, 'tweet_cleaned']
    if len(text) <= 512:
        continue

    try:
        text = summarize(text, word_count = 50)
    except Exception:
        # Best-effort fallback. `Exception` (not a bare except) so that
        # KeyboardInterrupt / SystemExit still stop the cell.
        text = text[:512]

    # .at writes straight into the frame; chained `df[col][index] = ...` may
    # write into a temporary copy instead.
    pandas_df.at[index, 'tweet_cleaned'] = text

# The frame already has an 'index' column from the earlier reset_index(), so
# this call stores the old labels under 'level_0'; both helper columns are
# removed before tokenization.
pandas_df = pandas_df.reset_index()
pandas_df

In [None]:
# Whitespace-token count of every cleaned tweet, in row order.
words_len = [len(tweet.split()) for tweet in pandas_df['tweet_cleaned']]

In [None]:
from datasets import Dataset

# Wrap the prepared DataFrame in a Hugging Face `Dataset` and show its summary.
ds = Dataset.from_pandas(pandas_df)
ds

In [None]:
# Load the raw CSV directly through `datasets`.
# NOTE(review): `dataset` is reassigned in the split cell before this value is
# used anywhere in the visible notebook — this load looks redundant; confirm.
dataset = load_dataset('csv', data_files=DATA_PATH, split='train')

dataset

In [None]:
# Fixed seed so the train/valid/test partition, and therefore every metric
# reported further down, is reproducible after a kernel restart.
SPLIT_SEED = 42

# 80 % train, 20 % held out.
train_test_valid = ds.train_test_split(test_size=0.20, seed=SPLIT_SEED)

# Split the held-out part in half: 10 % validation, 10 % test.
test_valid = train_test_valid['test'].train_test_split(test_size=0.50, seed=SPLIT_SEED)

train_test_valid_dataset = DatasetDict({
    'train': train_test_valid['train'],
    'test': test_valid['test'],
    'valid': test_valid['train']
    })


# Drop the columns the model does not need, including the 'index'/'level_0'
# helper columns left behind by the two reset_index() calls.
dataset = train_test_valid_dataset.remove_columns(['level_0','text', 'label', 'target','filename','index'])
dataset

In [None]:
# Checkpoint name shared by the tokenizer and the classification model.
model_type =  "bert-base-uncased"
# use_fast=False requests the pure-Python ("slow") tokenizer implementation.
tokenizer = AutoTokenizer.from_pretrained(model_type, use_fast=False)

In [None]:
# Report the tokenizer's basic properties, one per line.
tokenizer_facts = (
    f"Vocab size is : {tokenizer.vocab_size}",
    f"Model max length is : {tokenizer.model_max_length}",
    f"Model input names are: {tokenizer.model_input_names}",
)
for fact in tokenizer_facts:
    print(fact)

In [None]:
def tokenize_function(train_dataset):
    """Tokenize the ``tweet_cleaned`` column of a batch of examples.

    Sequences are padded to the tokenizer's maximum length and longer ones
    are truncated, so every encoded example has the same length.

    Args:
        train_dataset: mapping with a ``'tweet_cleaned'`` entry holding the texts.

    Returns:
        Whatever the module-level ``tokenizer`` returns for those texts.
    """
    tweets = train_dataset['tweet_cleaned']
    encoded = tokenizer(tweets, padding='max_length', truncation=True)
    return encoded


# Tokenize every split, feeding the examples to tokenize_function in batches.
tokenized_dataset = dataset.map(tokenize_function, batched=True)

# Pull out the individual splits; 'valid' serves as the evaluation set.
train_dataset, eval_dataset, test_dataset = (
    tokenized_dataset[split_name] for split_name in ('train', 'valid', 'test')
)

In [None]:
# Inspect the tokenized training split.
train_dataset

In [None]:
# train_set = train_dataset.remove_columns(["tweet_cleaned",'level_0']).with_format('tensorflow')

# tf_eval_dataset = eval_dataset.remove_columns(["tweet_cleaned",'level_0']).with_format('tensorflow')

# tf_test_dataset = test_dataset.remove_columns(["tweet_cleaned",'level_0']).with_format('tensorflow')

def _to_tf_format(split):
    """Drop the raw-text column of ``split`` and expose it in TensorFlow format."""
    return split.remove_columns(["tweet_cleaned"]).with_format('tensorflow')


train_set = _to_tf_format(train_dataset)
tf_eval_dataset = _to_tf_format(eval_dataset)
tf_test_dataset = _to_tf_format(test_dataset)

In [None]:
# Inspect the TensorFlow-formatted training split.
train_set

In [None]:
def _model_inputs(split):
    """Collect the tokenizer's model-input columns of ``split`` into a dict."""
    return {input_name: split[input_name] for input_name in tokenizer.model_input_names}


# Training data: (features, intent) pairs, shuffled over the whole split, batches of 8.
train_features = _model_inputs(train_set)
train_set_for_final_model = (
    tf.data.Dataset.from_tensor_slices((train_features, train_set['intent']))
    .shuffle(len(train_set))
    .batch(8)
)

# Validation data: same layout, original order.
eval_features = _model_inputs(tf_eval_dataset)
val_set_for_final_model = (
    tf.data.Dataset.from_tensor_slices((eval_features, tf_eval_dataset["intent"]))
    .batch(8)
)

# Test data: same layout, original order (so predictions line up with the labels).
test_features = _model_inputs(tf_test_dataset)
test_set_for_final_model = (
    tf.data.Dataset.from_tensor_slices((test_features, tf_test_dataset["intent"]))
    .batch(8)
)

In [None]:
# Inspect the batched training pipeline.
train_set_for_final_model

In [None]:
# Inspect the cleaned tweet texts.
pandas_df["tweet_cleaned"]

In [None]:
pip install livelossplot

In [None]:
from livelossplot import PlotLossesKeras
from tensorflow.keras.layers import add, LSTM, Embedding, Dense
# List of Keras callbacks handed to model.fit (livelossplot's PlotLossesKeras).
callbacks = [PlotLossesKeras()]

In [None]:
# Load the pretrained checkpoint with a two-class classification head.
model = TFAutoModelForSequenceClassification.from_pretrained(model_type, num_labels=2)

adam = tf.keras.optimizers.Adam(learning_rate=5e-5)
# from_logits=True: the loss is fed raw logits rather than probabilities.
logit_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
accuracy = tf.metrics.SparseCategoricalAccuracy()

model.compile(optimizer=adam, loss=logit_loss, metrics=accuracy)
model.summary()

In [None]:
# Fine-tune for 3 epochs, validating after each one.
# `callbacks` is already a list, so it is passed as-is instead of being
# wrapped in a second list.
history = model.fit(
    train_set_for_final_model,
    validation_data=val_set_for_final_model,
    epochs=3,
    callbacks=callbacks,
    verbose=1,
)

In [None]:
# Run inference on the test batches; the 'logits' entry of the result is read below.
preds = model.predict(test_set_for_final_model,verbose=1)

In [None]:
# model.save('/content/gdrive/MyDrive/WAR misinformation/bert_targets')

In [None]:
!pip install seqeval

In [None]:
import seqeval
from seqeval.metrics import precision_score, recall_score, f1_score, classification_report

In [None]:
# Fetch the label column once; indexing test_valid['test']['intent'] inside the
# loop would re-fetch the column for every single element.
test_intents = test_valid['test']['intent']

# Translate the numeric intents into the string labels used for reporting.
test_labels = []
for l in test_intents:
    if l == 1:
        test_labels.append("DIRECTED")
    elif l == 0:
        test_labels.append("UNDIRECTED")
    else:
        # Skipping the row silently would shift every later label relative to
        # its prediction, so fail loudly instead.
        raise ValueError(f"Unexpected intent label {l!r}; expected 0 or 1")
  

In [None]:
# Predicted class per test example = position of the largest logit in its row.
_class_names = {1: "DIRECTED", 0: "UNDIRECTED"}
predicted_ids = np.argmax(preds['logits'], axis=1)
preds_labels = [
    _class_names[int(class_id)]
    for class_id in predicted_ids
    if int(class_id) in _class_names
]


In [None]:
def extractDigits(lst):
    """Wrap every element of ``lst`` in its own one-item list.

    Args:
        lst: any iterable.

    Returns:
        A new list with one single-element list per input element, in order.
    """
    wrapped = []
    for el in lst:
        wrapped.append([el])
    return wrapped

In [None]:
# seqeval works on lists of label sequences, so each label becomes a one-item sequence.
# Run this cell once only: a second run would nest the lists one level deeper.
preds_labels, test_labels = extractDigits(preds_labels), extractDigits(test_labels)

In [None]:
# Sanity check: shape of the wrapped predictions (should match the labels' shape).
np.array(preds_labels).shape

In [None]:
# Sanity check: shape of the wrapped ground-truth labels.
np.array(test_labels).shape

In [None]:
# Ground truth goes first (y_true, y_pred), matching the f1/precision/recall
# calls in this notebook; the reverse order would exchange precision and recall.
# NOTE(review): seqeval targets tagged sequence labelling; for plain class
# labels sklearn.metrics.classification_report may be a better fit — confirm.
print(classification_report(test_labels, preds_labels))

In [None]:
# Macro-averaged F1 on the test split.
macro_f1 = f1_score(test_labels, preds_labels, average='macro')
print(macro_f1)

In [None]:
# Macro-averaged precision on the test split.
macro_precision = precision_score(test_labels, preds_labels, average='macro')
print(macro_precision)

In [None]:
# Macro-averaged recall on the test split.
macro_recall = recall_score(test_labels, preds_labels, average='macro')
print(macro_recall)

In [None]:
def _directed_flags(label_rows):
    """Map each wrapped label to 1 if it is ['DIRECTED'], otherwise 0."""
    return [1 if row == ['DIRECTED'] else 0 for row in label_rows]


# Integer (0/1) versions of the wrapped string labels, for the numeric metrics.
new_test = _directed_flags(test_labels)
new_pred = _directed_flags(preds_labels)

In [None]:
from imblearn.metrics import macro_averaged_mean_absolute_error 
# Macro-averaged mean absolute error between the 0/1 test labels and predictions.
macro_averaged_mean_absolute_error(new_test, new_pred)

In [None]:
from sklearn.metrics import accuracy_score
# Plain accuracy on the 0/1 test labels versus predictions.
accuracy_score(new_test,new_pred)