<a href="https://colab.research.google.com/github/arzoozehra/CIND820/blob/main/models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Import libraries**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import time
#!pip install contractions
import contractions
import nltk
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
#!pip install pyspellchecker
#from spellchecker import SpellChecker
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer, WordNetLemmatizer
from collections import Counter
import plotly.express as px
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.svm import SVC, LinearSVC
from sklearn.model_selection import cross_validate
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay
from xgboost import XGBClassifier
from tensorflow.python.keras import models
from tensorflow.python.keras.layers import Dense, Dropout
from tensorflow.python.keras.callbacks import EarlyStopping
from tensorflow.python.keras.optimizer_v2.adam import Adam

**Load data**

In [None]:
# Training split: fetched from the project repository, then stripped of
# any row that has a missing value.
url = 'https://raw.githubusercontent.com/arzoozehra/CIND820/main/data/train.csv'
train = pd.read_csv(url).dropna()

# Test split: loaded as published.
test = pd.read_csv('https://raw.githubusercontent.com/arzoozehra/CIND820/main/data/test.csv')

In [None]:
# Show the first and last ten raw training texts.
for excerpt in (train["text"].head(10), train["text"].tail(10)):
    print(excerpt)

**Clean training data**

In [None]:
# Convert text to lowercase
train['text'] = train['text'].str.lower()

# Expand contractions e.g "gonna" to "going to" and "i've" to "i have"
train['text'].replace( {r"`": "'"}, inplace= True, regex = True)
train['text'] = train['text'].apply(contractions.fix)

# Remove @, Unicode characters, punctuation, emojis, URLs, retweets, words with digits, and 1 or 2 letter words
train['text'].replace( {r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?|\w*\d\w*|\b\w{1,2}\b": " "}, inplace= True, regex = True)

# Remove extra whitespaces
train['text'].replace( {r" +": " "}, inplace= True, regex = True)
train['text'] = train['text'].str.strip()

# Correct spellings
#spell = SpellChecker()

#def correct_spellings(text):
#    corrected_text = []
#    misspelled_words = {}
#    words = text.split()
#    for w in spell.unknown(words):
#        corr = spell.correction(w)
#        if corr:
#            misspelled_words[w] = spell.correction(w) or w
#    corrected_text = [misspelled_words.get(w, w) for w in words]
#    return " ".join(corrected_text)

#train['text'] = train['text'].apply(lambda x : correct_spellings(x))

# Remove stopwords
stop = stopwords.words('english')
train['text'] = train['text'].apply(lambda text: " ".join([word for word in text.split() if word not in (stop)]))

# Stemming
stemmer = PorterStemmer()
train['text'] = train['text'].apply(lambda text: " ".join([stemmer.stem(word) for word in text.split()]))

# Lemmatizing
lemmatizer = WordNetLemmatizer()
train['text'] = train['text'].apply(lambda text: " ".join([lemmatizer.lemmatize(word) for word in text.split()]))


**Clean testing data**

In [None]:
# Convert text to lowercase
test['text'] = test['text'].str.lower()

# Expand contractions e.g "gonna" to "going to" and "i've" to "i have"
test['text'].replace( {r"`": "'"}, inplace= True, regex = True)
test['text'] = test['text'].apply(contractions.fix)

# Remove @, Unicode characters, punctuation, emojis, URLs, retweets, words with digits, and 1 or 2 letter words
test['text'].replace( {r"(@\[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|^rt|http.+?|\w*\d\w*|\b\w{1,2}\b": " "}, inplace= True, regex = True)

# Remove extra whitespaces
test['text'].replace( {r" +": " "}, inplace= True, regex = True)
test['text'] = test['text'].str.strip()

# Remove stopwords
stop = stopwords.words('english')
test['text'] = test['text'].apply(lambda text: " ".join([word for word in text.split() if word not in (stop)]))

# Stemming
stemmer = PorterStemmer()
test['text'] = test['text'].apply(lambda text: " ".join([stemmer.stem(word) for word in text.split()]))

# Lemmatizing
lemmatizer = WordNetLemmatizer()
test['text'] = test['text'].apply(lambda text: " ".join([lemmatizer.lemmatize(word) for word in text.split()]))


In [None]:
# Show the first and last ten cleaned training texts.
for excerpt in (train["text"].head(10), train["text"].tail(10)):
    print(excerpt)

In [None]:
# Show the first and last twenty cleaned test texts.
for excerpt in (test['text'].head(20), test['text'].tail(20)):
    print(excerpt)


**Feature Selection**

In [None]:
# Vectorization parameters
# Range (inclusive) of n-gram sizes for tokenizing text.
NGRAM_RANGE = (1, 2)  # Use 1-grams + 2-grams.

# Limit on the number of features. We use the top 20K features.
TOP_K = 20000

# Whether text should be split into word or character n-grams.
TOKEN_MODE = 'word' # Split text into word tokens.

# Minimum document frequency below which a token will be discarded.
MIN_DOCUMENT_FREQUENCY = 2

def ngram_vectorize(train_texts, train_labels, test_texts):
    """Vectorizes texts as tf-idf n-gram vectors and keeps the best features.

    1 text = 1 tf-idf vector over the unigram + bigram vocabulary learned from
    the training texts, reduced to at most TOP_K features ranked by the
    ANOVA F-value (`f_classif`) against the training labels.

    # Arguments
        train_texts: iterable of str, training text strings.
        train_labels: array-like, training labels (one per training text).
        test_texts: iterable of str, test text strings.

    # Returns
        train_vectors, test_vectors: dense float32 arrays with one row per
            text and one column per selected feature.
    """
    vectorizer = TfidfVectorizer(
        ngram_range=NGRAM_RANGE,
        analyzer=TOKEN_MODE,
        min_df=MIN_DOCUMENT_FREQUENCY,
        # Must be a real boolean: the string 'True' fails scikit-learn's
        # parameter validation.
        sublinear_tf=True,
    )

    # Learn vocabulary from training texts and vectorize training texts.
    train_vectors = vectorizer.fit_transform(train_texts)

    # Vectorize test texts with the vocabulary learned from training only.
    test_vectors = vectorizer.transform(test_texts)

    # Select top 'k' of the vectorized features (fewer if the vocabulary is smaller).
    selector = SelectKBest(f_classif, k=min(TOP_K, train_vectors.shape[1]))
    selector.fit(train_vectors, train_labels)

    # Densify as float32; the downstream models here are fed dense arrays.
    train_vectors = selector.transform(train_vectors).astype('float32').toarray()
    test_vectors = selector.transform(test_vectors).astype('float32').toarray()
    return train_vectors, test_vectors

# Build the tf-idf feature matrices for both splits from the cleaned text.
train_vectors, test_vectors = ngram_vectorize(
    train_texts=train['text'],
    train_labels=train['sentiment'],
    test_texts=test['text'],
)


# # Create feature vectors
# vectorizer = TfidfVectorizer(min_df = 5,
#                              max_df = 0.8,
#                              sublinear_tf = True,
#                              use_idf = True)
# train_vectors = vectorizer.fit_transform(train['text'])
# test_vectors = vectorizer.transform(test['text'])

**Supervised modelling**

In [None]:
supervised_models = [
    LinearSVC(),
    SVC(kernel='linear'),
    XGBClassifier(objective='multi:softmax'),
]

# 5-fold Cross-validation
k = 5

# Integer-encode the labels (codes follow alphabetical label order). Accuracy is
# unaffected by the encoding, and recent XGBoost releases reject string labels.
cv_labels, _ = pd.factorize(train['sentiment'], sort=True)

entries = []
for model in supervised_models:
    model_name = model.__class__.__name__
    # cross_validate returns a dict of arrays; the per-fold scores live under
    # 'test_score'. Iterating the dict itself would only yield its keys.
    cv_results = cross_validate(model, train_vectors, cv_labels, scoring='accuracy', cv=k)
    for fold_id, accuracy in enumerate(cv_results['test_score']):
        entries.append((model_name, fold_id, accuracy))

# One row per (model, fold) with that fold's accuracy.
cv_df = pd.DataFrame(entries, columns=['model_name', 'fold_id', 'accuracy'])

In [None]:
# Summarise cross-validation accuracy per model: mean and standard deviation
# across folds. Only accuracy is recorded in `cv_df`, so only accuracy is
# reported here.
accuracy_by_model = cv_df.groupby('model_name').accuracy
mean_accuracy = accuracy_by_model.mean()
std_accuracy = accuracy_by_model.std()

acc = pd.concat([mean_accuracy, std_accuracy], axis=1, ignore_index=True)
acc.columns = ['Accuracy', ' Std dev']
acc

In [None]:
# Fit a hinge-loss linear SVM on the full training set and score it on the test set.
model = LinearSVC(loss='hinge', max_iter=1000).fit(train_vectors, train['sentiment'])
prediction = model.predict(test_vectors)
print(f"Test set accuracy: {accuracy_score(test['sentiment'], prediction) * 100} %\n")

In [None]:
# Per-class precision / recall / F1 for the LinearSVC predictions.
svc_report = classification_report(
    test['sentiment'],
    prediction,
    target_names=['negative', 'neutral', 'positive'],
)
print('\tClassification Metrics - LinearSVC\n')
print(svc_report)

In [None]:
# Use one explicit, sorted label list for both the matrix and the tick labels so
# rows/columns and their captions cannot get out of sync (and the plot does not
# depend on whichever `model` object happens to be in scope).
cm_labels = np.unique(np.concatenate([np.asarray(test['sentiment']), np.asarray(prediction)]))
data = confusion_matrix(test['sentiment'], prediction, labels=cm_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=data, display_labels=cm_labels)
disp.plot(cmap="Blues")
plt.ylabel('ACTUAL')
plt.xlabel('\nPREDICTED')
plt.title("\nCONFUSION MATRIX - LinearSVC\n");
plt.show()

In [None]:
# Recent XGBoost releases require class labels to be integers 0..n_classes-1,
# so encode the string sentiments for fitting (codes follow alphabetical label
# order) and decode the predictions back to strings for reporting.
# Note: after this cell `model.classes_` holds the integer codes, while
# `prediction` holds the original string labels.
label_codes, label_names = pd.factorize(train['sentiment'], sort=True)

model = XGBClassifier(objective='multi:softmax')
model.fit(train_vectors, label_codes)

predicted_codes = np.asarray(model.predict(test_vectors)).astype(int)
prediction = np.asarray(label_names)[predicted_codes]
print(f"Test set accuracy: {accuracy_score(test['sentiment'], prediction) * 100} %\n")

In [None]:
# Classification report: per-class precision / recall / F1 for the XGBClassifier predictions.
print('\tCLASSIFICATION METRICS - XGBClassifier\n')
print(classification_report(test['sentiment'], prediction, target_names= ['negative', 'neutral', 'positive']))

In [None]:
# Use one explicit, sorted label list for both the matrix and the tick labels so
# rows/columns and their captions cannot get out of sync (and the plot does not
# depend on how the fitted `model` represents its classes).
cm_labels = np.unique(np.concatenate([np.asarray(test['sentiment']), np.asarray(prediction)]))
data = confusion_matrix(test['sentiment'], prediction, labels=cm_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=data, display_labels=cm_labels)
disp.plot(cmap="Blues")
plt.ylabel('ACTUAL')
plt.xlabel('\nPREDICTED')
plt.title("\nCONFUSION MATRIX - XGBClassifier\n");
plt.show()

**Neural network modelling (supervised MLP)**

In [None]:
def mlp_model(layers, units, op_units, op_activation, dropout_rate, input_shape, num_classes):
    """Builds a multi-layer perceptron as a Keras `Sequential` model.

    The network is: input Dropout, then `layers - 1` blocks of
    (Dense with ReLU, Dropout), then one output Dense layer.

    # Arguments
        layers: int, total number of `Dense` layers (hidden + output).
        units: int, output dimension of each hidden `Dense` layer.
        op_units: int, output dimension of the final `Dense` layer.
        op_activation: str, activation of the final `Dense` layer.
        dropout_rate: float, fraction of inputs dropped at each Dropout layer.
        input_shape: tuple, shape of a single input sample.
        num_classes: int, number of output classes (not read by this function).

    # Returns
        An uncompiled MLP model instance.
    """
    network = models.Sequential()

    # Dropout applied directly to the input features.
    network.add(Dropout(rate=dropout_rate, input_shape=input_shape))

    # Hidden stack: every Dense layer is followed by its own Dropout.
    hidden_layer_count = layers - 1
    for _hidden_index in range(hidden_layer_count):
        network.add(Dense(units=units, activation='relu'))
        network.add(Dropout(rate=dropout_rate))

    # Output layer.
    network.add(Dense(units=op_units, activation=op_activation))
    return network

**Train model**

In [None]:
def train_ngram_model(train_vectors, train_labels, test_vectors, test_labels,
                      num_classes,
                      learning_rate=1e-3,
                      epochs=1000,
                      batch_size=128,
                      layers=2,
                      units=64,
                      dropout_rate=0.2):
    """Trains an MLP on n-gram vectors and reports its score on the test data.

    The test data is passed to Keras as `validation_data`, so it also drives
    early stopping. The trained model is saved to 'Twitter_mlp_model.h5'.

    # Arguments
        train_vectors: array, vectorized training texts (one row per text).
        train_labels: array of int, training labels.
        test_vectors: array, vectorized test texts (one row per text).
        test_labels: array of int, test labels.
        num_classes: int, number of output classes.
        learning_rate: float, learning rate for training model.
        epochs: int, maximum number of epochs.
        batch_size: int, number of samples per batch.
        layers: int, number of `Dense` layers in the model.
        units: int, output dimension of Dense layers in the model.
        dropout_rate: float: percentage of input to drop at Dropout layers.

    # Returns
        (accuracy, loss) on the test data after the last epoch that was run.

    # Raises
        ValueError: If test data has label values which were not seen
            in the training data.
    """

    # Fail early, before building anything, if the test labels contain a
    # value the model is never trained on.
    unseen_labels = np.setdiff1d(np.asarray(test_labels), np.asarray(train_labels))
    if unseen_labels.size > 0:
        raise ValueError(
            'Unexpected label values found in the test data: {unseen}. '
            'Make sure the labels in the test set are also present in the '
            'training set.'.format(unseen=unseen_labels.tolist()))

    # Create model instance.
    model = mlp_model(layers=layers, units=units,
                      op_units=num_classes, op_activation='softmax',
                      dropout_rate=dropout_rate,
                      input_shape=train_vectors.shape[1:],
                      num_classes=num_classes)

    # Compile model with learning parameters.
    loss = 'sparse_categorical_crossentropy' # for multiclass, integer labels
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss=loss, metrics=['acc'])

    # Create callback for early stopping on validation loss. If the loss does
    # not decrease in two consecutive tries, stop training.
    callbacks = [EarlyStopping(monitor='val_loss', patience=2)]

    # Train and test model.
    history = model.fit(
            train_vectors,
            train_labels,
            epochs=epochs,
            callbacks=callbacks,
            validation_data=(test_vectors, test_labels),
            verbose=2,  # Logs once per epoch.
            batch_size=batch_size)

    # Print results (metrics of the last epoch that was run).
    history = history.history
    print('Test accuracy: {acc}, loss: {loss}'.format(
            acc=history['val_acc'][-1], loss=history['val_loss'][-1]))

    # Save model.
    model.save('Twitter_mlp_model.h5')
    return history['val_acc'][-1], history['val_loss'][-1]

In [None]:
# Integer ids fed to the neural network for each sentiment label.
SENTIMENT_TO_ID = {'neutral': 0, 'negative': 1, 'positive': 2}


def encode_sentiment(sentiments):
    """Maps sentiment strings to integer ids, one id per input row.

    # Arguments
        sentiments: pd.Series (or array-like) of sentiment strings.

    # Returns
        np.ndarray of int with the same length as the input.

    # Raises
        ValueError: If a value is not a key of SENTIMENT_TO_ID. Skipping such
            rows instead would silently misalign labels with feature rows.
    """
    sentiments = pd.Series(sentiments)
    codes = sentiments.map(SENTIMENT_TO_ID)
    unmapped = codes.isna()
    if unmapped.any():
        unknown = sorted(set(sentiments[unmapped].astype(str)))
        raise ValueError('Unknown sentiment label(s): {}'.format(unknown))
    return codes.to_numpy(dtype=int)


train_labels = encode_sentiment(train['sentiment'])
test_labels = encode_sentiment(test['sentiment'])

In [None]:
# Width of each hidden Dense layer in the MLP.
hidden_units = 64

# Train the MLP; the returned (accuracy, loss) pair is the cell's output.
train_ngram_model(
    train_vectors, train_labels, test_vectors, test_labels,
    num_classes=3,
    learning_rate=1e-3,
    epochs=1000,
    batch_size=128,
    layers=2,
    units=hidden_units,
    dropout_rate=0.2,
)