In [2]:
!pip install emoji
!pip install transformers
!pip install nltk emoji==0.6.0
!git clone https://github.com/HarindraMavikumbure/Twitter_sentiment




fatal: destination path 'Twitter_sentiment' already exists and is not an empty directory.


In [4]:
import numpy as np
import pandas as pd
import configparser

#data processing
import re, string

from sklearn import preprocessing
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split

#Naive Bayes
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB

#transformers
from transformers import BertTokenizerFast
from transformers import TFBertModel
from transformers import RobertaTokenizerFast
from transformers import TFRobertaModel

#keras
import tensorflow as tf


#metrics
from sklearn.metrics import classification_report, confusion_matrix

from emoji import demojize

#set seed for reproducibility
seed=42
# Length (in tokens) that every tokenized tweet is padded to
MAX_LEN=128

# Defining `seed` alone seeds nothing: apply it to the global NumPy generator (used by
# pandas/scikit-learn/imbalanced-learn whenever random_state is None) and to TensorFlow.
np.random.seed(seed)
tf.random.set_seed(seed)


def create_model(bert_model, max_len=MAX_LEN, num_classes=5):
    """Build and compile a softmax classifier on top of a transformer encoder.

    Args:
        bert_model: Callable model invoked as ``bert_model([input_ids, attention_masks])``.
            Element ``[1]`` of its output feeds the classification layer
            (presumably the pooled output of TFBertModel/TFRobertaModel — TODO confirm).
        max_len: Length of the token-id and attention-mask input vectors.
        num_classes: Number of units in the softmax output layer. Defaults to 5,
            the value that used to be hard-coded.

    Returns:
        A compiled ``tf.keras`` model taking ``[input_ids, attention_masks]`` and
        producing ``num_classes`` probabilities per example.
    """
    # NOTE(review): `decay` is a legacy optimizer argument; newer TensorFlow releases
    # may reject it — confirm against the TF version this notebook runs on.
    opt = tf.keras.optimizers.Adam(learning_rate=1e-5, decay=1e-7)
    loss = tf.keras.losses.CategoricalCrossentropy()
    accuracy = tf.keras.metrics.CategoricalAccuracy()

    input_ids = tf.keras.Input(shape=(max_len,), dtype='int32')
    attention_masks = tf.keras.Input(shape=(max_len,), dtype='int32')

    embeddings = bert_model([input_ids, attention_masks])[1]
    output = tf.keras.layers.Dense(num_classes, activation="softmax")(embeddings)

    model = tf.keras.models.Model(inputs=[input_ids, attention_masks], outputs=output)

    # Keras documents `metrics` as a list of metrics, so pass it as one.
    model.compile(opt, loss=loss, metrics=[accuracy])

    return model

def tokenize(data, max_len=MAX_LEN):
    """Encode texts with the module-level ``tokenizer`` into fixed-length arrays.

    Args:
        data: Iterable of strings to encode.
        max_len: Number of tokens every sequence is padded or truncated to.

    Returns:
        Tuple ``(input_ids, attention_masks)`` of NumPy arrays with one row per text.
    """
    input_ids = []
    attention_masks = []
    # Iterate the values directly so any iterable works, not only positionally indexable ones.
    for text in data:
        encoded = tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=max_len,  # honour the argument (the global MAX_LEN was used before)
            padding='max_length',
            truncation=True,  # without this, longer texts yield rows of unequal length
            return_attention_mask=True
        )
        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])
    return np.array(input_ids), np.array(attention_masks)

def tokenize_roberta(data, max_len=MAX_LEN):
    """Encode texts with the module-level ``tokenizer_roberta`` into fixed-length arrays.

    Args:
        data: Iterable of strings to encode.
        max_len: Number of tokens every sequence is padded or truncated to.

    Returns:
        Tuple ``(input_ids, attention_masks)`` of NumPy arrays with one row per text.
    """
    input_ids = []
    attention_masks = []
    # Iterate the values directly so any iterable works, not only positionally indexable ones.
    for text in data:
        encoded = tokenizer_roberta.encode_plus(
            text,
            add_special_tokens=True,
            max_length=max_len,
            padding='max_length',
            truncation=True,  # without this, longer texts yield rows of unequal length
            return_attention_mask=True
        )
        input_ids.append(encoded['input_ids'])
        attention_masks.append(encoded['attention_mask'])
    return np.array(input_ids), np.array(attention_masks)

##CUSTOM DEFINED FUNCTIONS TO CLEAN THE TWEETS

#Replace emojis in the text with their textual names
def strip_emoji(text):
    """Return ``text`` after passing it through ``emoji.demojize``.

    Despite the function name, emojis are not deleted here: ``demojize``
    converts each one to its text alias.
    """
    demojized = demojize(text)
    return demojized

#Remove punctuation, links, mentions and \r\n new line characters
def strip_all_entities(text):
    """Lowercase ``text`` and strip line breaks, mentions, links, non-ASCII and punctuation.

    Carriage returns are deleted, newlines become single spaces, and every
    whitespace-delimited token starting with ``@``, ``http://`` or ``https://``
    is removed before non-ASCII characters and punctuation are dropped.
    """
    # Flatten to one lowercase line first so the link pattern also catches upper-case schemes.
    flattened = text.replace('\r', '').replace('\n', ' ').lower()
    # Mentions and links, up to the next whitespace.
    without_refs = re.sub(r"(?:\@|https?\://)\S+", "", flattened)
    # Anything outside the 7-bit ASCII range.
    ascii_only = re.sub(r'[^\x00-\x7f]', r'', without_refs)
    banned_list = string.punctuation + 'Ã'+'±'+'ã'+'¼'+'â'+'»'+'§'
    return ascii_only.translate(str.maketrans('', '', banned_list))

#clean hashtags at the end of the sentence, and keep those in the middle of the sentence by removing just the # symbol
def clean_hashtags(tweet):
    """Drop trailing hashtags and un-tag the remaining ones.

    A run of hashtags at the very end of ``tweet`` is removed entirely, except
    for the literal tag ``#hashtag``, which the pattern explicitly excludes.
    Any ``#`` or ``_`` left in the text is then replaced by a word break.

    Args:
        tweet: Text to clean.

    Returns:
        The cleaned text, with the pieces around each removed match stripped
        and joined by single spaces.
    """
    # Raw strings are required here: in a normal literal "\b" is a backspace character,
    # not a word boundary, which silently disabled the "#hashtag" exclusion, and
    # "\w" / "\s" are invalid escape sequences.
    trailing_hashtags = r'#(?!(?:hashtag)\b)[\w-]+(?=(?:\s+#[\w-]+)*\s*$)'
    new_tweet = " ".join(word.strip() for word in re.split(trailing_hashtags, tweet)) #remove last hashtags
    new_tweet2 = " ".join(word.strip() for word in re.split(r'#|_', new_tweet)) #remove hashtags symbol from words in the middle of the sentence
    return new_tweet2

#Blank out every word that contains the special characters & or $
def filter_chars(a):
    """Replace each space-separated word containing ``$`` or ``&`` with an empty string.

    The word count is preserved, so removed words leave consecutive spaces behind.
    """
    kept = ['' if ('$' in token or '&' in token) else token for token in a.split(' ')]
    return ' '.join(kept)

def remove_mult_spaces(text): # remove multiple spaces
    """Collapse every run of two or more whitespace characters into one space.

    A single whitespace character on its own is left unchanged.
    """
    # Raw string: "\s" in a normal literal is an invalid escape sequence.
    return re.sub(r"\s\s+", " ", text)

#INI Stuff

# config section, variables from config.ini
CONFIG_PATH = 'Twitter_sentiment/src/config.ini'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of files it
# parsed, so fail here with a clear message instead of a KeyError on the first lookup.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Could not read configuration file: {CONFIG_PATH}")

# INI file variables, descriptions in INI file
model_type = int(config["MODEL"]['MODEL_TYPE'])  # 0, 1 or 2: selects the Naive Bayes, BERT or RoBERTa section
batch_size = int(config["MODEL"]['BATCH_SIZE'])
epochs = int(config["MODEL"]['EPOCHS'])
category = int(config["CATEGORY"]['TARGET'])  # index into train_list
test_type = int(config["CATEGORY"]['TEST'])  # index into test_list


csv_path = config["CSV"]['PATH']
train_list = ["individuals_train_dev.csv", "groups_train_dev.csv", "events_train_dev.csv"]
test_list = ["individuals_test.csv", "groups_test.csv", "events_test.csv", "all_test.csv"]

# Reject out-of-range values explicitly; a negative index would otherwise silently pick a
# file from the end of the list.
if not 0 <= category < len(train_list):
    raise ValueError(f"CATEGORY.TARGET must be between 0 and {len(train_list) - 1}, got {category}")
if not 0 <= test_type < len(test_list):
    raise ValueError(f"CATEGORY.TEST must be between 0 and {len(test_list) - 1}, got {test_type}")

# The file name is appended directly, so csv_path must end with a path separator.
train_string = csv_path + train_list[category]
test_string = csv_path + test_list[test_type]

# Prepare Dataframe

df_train = pd.read_csv(train_string, encoding='utf-8')
df_test = pd.read_csv(test_string, encoding='utf-8')

# Keep only the text and label columns. .copy() makes these independent frames, so the
# column assignments below do not raise SettingWithCopyWarning.
df = df_train[['clean_text', 'category']].copy()
df_test = df_test[['clean_text', 'category']].copy()


def clean_tweet(tweet):
    """Run one raw tweet through the full cleaning pipeline, in the original order."""
    return remove_mult_spaces(filter_chars(clean_hashtags(strip_all_entities(strip_emoji(tweet)))))


# Same cleaning for the training and the test tweets
texts_new = [clean_tweet(t) for t in df.clean_text]
texts_new_test = [clean_tweet(t) for t in df_test.clean_text]

df['text_clean'] = texts_new
df_test['text_clean'] = texts_new_test

# Number of whitespace-separated words in every cleaned tweet
text_len = [len(text.split()) for text in df.text_clean]
df['text_len'] = text_len

text_len_test = [len(text.split()) for text in df_test.text_clean]
df_test['text_len'] = text_len_test

#Tokenize

tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

# Record the BERT token count of every cleaned test tweet and print those above 80 tokens
token_lens_test = []

for i, txt in enumerate(df_test['text_clean'].values):
    tokens = tokenizer.encode(txt, max_length=512, truncation=True)
    token_lens_test.append(len(tokens))
    if len(tokens) > 80:
        print(f"INDEX: {i}, TEXT: {txt}")

# Ready Dataframes for every type of model

df_test['token_lens'] = token_lens_test
df_test = df_test.sort_values(by='token_lens', ascending=False)
# NOTE(review): this discards the 5 test tweets with the most tokens no matter how long
# they actually are, which changes the evaluation set — confirm this is intended.
df_test = df_test.iloc[5:]
# Shuffle with the fixed seed so the row order is identical on every run
df_test = df_test.sample(frac=1, random_state=seed).reset_index(drop=True)


# Balance the training classes by randomly re-sampling the minority classes; the seed
# makes the drawn duplicates identical on every run.
# NOTE(review): oversampling happens before the train/validation split below, so copies of
# one tweet can end up in both sets and inflate validation scores — consider splitting first.
ros = RandomOverSampler(random_state=seed)
train_x, train_y = ros.fit_resample(np.array(df['text_clean']).reshape(-1, 1), np.array(df['category']).reshape(-1, 1))
train_os = pd.DataFrame(list(zip([x[0] for x in train_x], train_y)), columns=['text_clean', 'category'])


X = train_os['text_clean'].values
y = train_os['category'].values

# Use seed to ensure the train/dev split is the same each run
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.1, stratify=y, random_state=seed)

X_test = df_test['text_clean'].values
y_test = df_test['category'].values

# Used for Naive Bayes: copies of the labels taken before they are one-hot encoded below
y_train_le = y_train.copy()
y_valid_le = y_valid.copy()
y_test_le = y_test.copy()

#Avoid fit transform for smaller sets, as they may be reshaped if extreme values are missing
ohe = preprocessing.OneHotEncoder()
y_train = ohe.fit_transform(np.array(y_train).reshape(-1, 1)).toarray()
y_valid = ohe.transform(np.array(y_valid).reshape(-1, 1)).toarray()
y_test = ohe.transform(np.array(y_test).reshape(-1, 1)).toarray()

#Naive Bayes
if model_type == 0:

    # Word counts, with the vocabulary learned from the training tweets only
    clf = CountVectorizer()
    X_train_cv = clf.fit_transform(X_train)
    X_test_cv = clf.transform(X_test)

    # TF-IDF weighting fitted on the training counts
    tf_transformer = TfidfTransformer(use_idf=True)
    tf_transformer.fit(X_train_cv)
    X_train_tf = tf_transformer.transform(X_train_cv)
    X_test_tf = tf_transformer.transform(X_test_cv)

    # Train on the raw (not one-hot) labels and predict the test set
    nb_clf = MultinomialNB().fit(X_train_tf, y_train_le)
    nb_pred = nb_clf.predict(X_test_tf)

    # One-hot encode the predictions so the report lists all classes, not only the predicted ones
    nb_pred_all_classes = ohe.transform(nb_pred.reshape(-1, 1)).toarray()

    sentiment_labels = ['Strong Negative', 'Negative', 'Neutral', 'Positive', 'Strong Positive']
    nb_report = classification_report(y_test, nb_pred_all_classes, target_names=sentiment_labels)
    print('\tClassification Report for Naive Bayes:\n\n', nb_report)

# BERT MODEL

if model_type == 1:

    # Token ids and attention masks for the three splits
    (
        (train_input_ids, train_attention_masks),
        (val_input_ids, val_attention_masks),
        (test_input_ids, test_attention_masks),
    ) = (tokenize(split, MAX_LEN) for split in (X_train, X_valid, X_test))

    bert_model = TFBertModel.from_pretrained('bert-base-uncased')

    model = create_model(bert_model, MAX_LEN)
    model.summary()

    bert_train_inputs = [train_input_ids, train_attention_masks]
    bert_val_inputs = [val_input_ids, val_attention_masks]
    history_bert = model.fit(
        bert_train_inputs,
        y_train,
        validation_data=(bert_val_inputs, y_valid),
        epochs=epochs,
        batch_size=batch_size,
    )

    result_bert = model.predict([test_input_ids, test_attention_masks])

    # One-hot matrix with a 1 in the column of the highest score of each row
    y_pred_bert = np.eye(result_bert.shape[1], dtype=result_bert.dtype)[result_bert.argmax(1)]

    bert_report = classification_report(
        y_test,
        y_pred_bert,
        target_names=['Strong Negative', 'Negative', 'Neutral', 'Positive', 'Strong Positive'],
    )
    print('\tClassification Report for BERT:\n\n', bert_report)


# ROBERTA MODEL

if model_type == 2:
    tokenizer_roberta = RobertaTokenizerFast.from_pretrained("roberta-base")

    # Token count of every training tweet (capped at 512 by truncation)
    token_lens = [
        len(tokenizer_roberta.encode(txt, max_length=512, truncation=True))
        for txt in X_train
    ]
    max_length = np.max(token_lens)
    # A bare `max_length` expression inside this block displays nothing, so print the value
    # to make it visible next to the padding length actually used below.
    print(f"Longest training tweet: {max_length} RoBERTa tokens (inputs are padded to MAX_LEN={MAX_LEN})")


    train_input_ids, train_attention_masks = tokenize_roberta(X_train, MAX_LEN)
    val_input_ids, val_attention_masks = tokenize_roberta(X_valid, MAX_LEN)
    test_input_ids, test_attention_masks = tokenize_roberta(X_test, MAX_LEN)


    roberta_model = TFRobertaModel.from_pretrained('roberta-base')


    model = create_model(roberta_model, MAX_LEN)
    model.summary()

    history_2 = model.fit([train_input_ids,train_attention_masks], y_train, validation_data=([val_input_ids,val_attention_masks], y_valid), epochs=epochs, batch_size=batch_size)


    result_roberta = model.predict([test_input_ids,test_attention_masks])

    # One-hot matrix with a 1 in the column of the highest score of each row
    y_pred_roberta =  np.zeros_like(result_roberta)
    y_pred_roberta[np.arange(len(y_pred_roberta)), result_roberta.argmax(1)] = 1


    print('\tClassification Report for RoBERTa:\n\n',classification_report(y_test,y_pred_roberta, target_names=['Strong Negative', 'Negative', 'Neutral', 'Positive', "Strong Positive"]))

	Classification Report for Naive Bayes:

                  precision    recall  f1-score   support

Strong Negative       0.00      0.00      0.00         6
       Negative       0.32      0.56      0.41       112
        Neutral       0.62      0.26      0.36       256
       Positive       0.36      0.48      0.41       101
Strong Positive       0.04      0.33      0.07         3

      micro avg       0.37      0.37      0.37       478
      macro avg       0.27      0.33      0.25       478
   weighted avg       0.49      0.37      0.38       478
    samples avg       0.37      0.37      0.37       478

