In [None]:
# System and generic utilities
import os
import sys
import random
import re
import string
import unicodedata
import pickle
from time import time
from pprint import pprint
from collections import Counter

# Pandas, numpy, matplotlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Scikit-learn - import feature engineering and classification learners
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC, SVC
from sklearn.ensemble import RandomForestClassifier

# Scikit-learn - import utilities
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, roc_curve, classification_report, auc, precision_recall_curve
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, log_loss, f1_score
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay, PrecisionRecallDisplay

# Optional - LightGBM
import lightgbm as lgb

In [None]:
# Where the training CSV lives: directory and file name are kept separate
# so that either one can be changed on its own.
INDIR, INDATA = './sentiment140/', 'training.1600000.processed.noemoticon.csv'

# Full path that the loading cell passes to pandas.
infile = os.path.join(INDIR, INDATA)

## Read data and inspect label distribution

In [None]:
# The file has no header row, so column names are attached after loading.
df = pd.read_csv(
    infile,
    encoding='ISO-8859-1',
    header=None,
    sep=',',
)
df.columns = ["target", "ids", "date", "flag", "user", "text"]
print("Dataset size:", df.shape[0])

In [None]:
# Number of rows per raw label value
df["target"].value_counts()

In [None]:
# Map target label 4 to 1 for binary classification.
# .loc is used (rather than chained indexing) so the assignment is written
# to df itself and not to a temporary copy.
df.loc[df.target == 4, 'target'] = 1

# Rows per label; keys are converted to str so the bar chart treats the
# labels as categories instead of positions on a numeric axis.
target_cnt = {str(label): count for label, count in Counter(df.target).items()}

plt.figure(figsize=(8, 4))
plt.bar(target_cnt.keys(), target_cnt.values())
plt.title("Dataset labels distribution")

## Extract a sample for experimentation

In [None]:
# Get a 10% random sample.
# The seed is fixed (same value as the train/test split further down) so that
# a fresh "Restart & Run All" draws the same rows and reproduces every metric.
sample_size = 0.1
df2 = df.sample(frac=sample_size, random_state=42)
print("Sample size:", len(df2))
df2.target.value_counts()

## Explore the data sample

In [None]:
# First ten rows of the sample
df2.head(n=10)

In [None]:
# Print pandas' summary of the sampled frame (df2)
df2.info()

In [None]:
# Pretty-print the text of 25 randomly drawn tweets from the sample
pprint(df2["text"].sample(n=25).values)

In [None]:
from wordcloud import WordCloud

# Word cloud of the negative tweets (target == 0): all tweets are joined
# into one space-separated string, which is what generate() consumes.
data_neg = df2.loc[df2.target == 0, 'text']
negative_corpus = " ".join(data_neg)
wc = WordCloud(collocations=False, width=1600, height=800,
               max_words=1000).generate(negative_corpus)
plt.figure(figsize=(20, 20))
plt.imshow(wc)

In [None]:
# Word cloud of the positive tweets (target == 1): all tweets are joined
# into one space-separated string, which is what generate() consumes.
data_pos = df2.loc[df2.target == 1, 'text']
positive_corpus = " ".join(data_pos)
wc = WordCloud(collocations=False, width=1600, height=800,
               max_words=1000).generate(positive_corpus)
plt.figure(figsize=(20, 20))
plt.imshow(wc)

## Helper functions to clean text

In [None]:
# Helper functions to clean text
def deaccent(text):
    """Return ``text`` with combining accent marks removed.

    The string is decomposed (NFD) so that each accent becomes its own
    code point, every code point in Unicode category ``Mn`` is dropped,
    and the remainder is recomposed (NFC).
    """
    decomposed = unicodedata.normalize("NFD", text)
    base_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return unicodedata.normalize("NFC", ''.join(base_chars))

# Compiled once and reused by every clean_text() call.
# Raw strings keep "\w" / "\s" as regex escapes instead of (invalid) Python
# string escapes.
_MENTION_RE = re.compile(r'@\w+ *')       # "@user" plus any spaces right after it
_WHITESPACE_RE = re.compile(r'\s+')       # any whitespace run, including \r and \n
# Maps every character of string.punctuation to a single space.
_PUNCT_TO_SPACE = str.maketrans({ch: ' ' for ch in string.punctuation})


def clean_text(text, lowercase=True, nopunct=True, deacc=True, nomention=True):
    """Normalize a tweet into a compact, single-line string.

    Steps are applied in this order, each one only if its flag is set:

    Args:
        text: The raw string to clean.
        lowercase: Convert the text to lower case.
        nopunct: Replace every character of ``string.punctuation`` with a space.
        deacc: Strip accents via ``deaccent``.
        nomention: Replace ``@mention`` tokens with a space. This runs before
            punctuation removal, otherwise the ``@`` marker would already be gone.

    Returns:
        The cleaned text with every run of whitespace (newlines included)
        collapsed to one space and leading/trailing whitespace removed.
    """
    if lowercase:
        text = text.lower()

    if deacc:
        text = deaccent(text)

    if nomention:
        text = _MENTION_RE.sub(' ', text)

    if nopunct:
        # One pass over the string instead of one str.replace per punctuation char
        text = text.translate(_PUNCT_TO_SPACE)

    # "\s+" also matches \r and \n, so a single substitution both removes
    # newlines and compacts repeated spaces.
    return _WHITESPACE_RE.sub(' ', text).strip()

In [None]:
# Quick sanity check of clean_text on a tweet with mentions, punctuation and mixed case
sample_tweet = "@nny24 Yeah!!! ^^ I got good news... but not GREAT  I hope to tell you the great news tonight @any123_456 =D. And... I'm sooo motivated girl... @whatever123"
clean_text(sample_tweet)

## Split data (train/test)

In [None]:
# Hold out 20% of the sample for testing (fixed seed), then clean each
# partition with clean_text's default options.
x_all, y_all = df2.text, df2.target

x_train, x_test, y_train, y_test = train_test_split(
    x_all, y_all, test_size=0.2, random_state=42
)
x_train_clean = x_train.apply(clean_text)
x_test_clean = x_test.apply(clean_text)

## Featurization: Explore word ngrams

In [None]:
def explore_features(vectorizer, documents, n_top=10):
    """Fit ``vectorizer`` on ``documents`` and print both ends of its n-gram ranking.

    N-grams are ranked by the column sums of the matrix returned by
    ``fit_transform`` (largest first). The vocabulary size is printed,
    followed by the ``n_top`` highest-ranked and the ``n_top``
    lowest-ranked feature names.

    Note that the vectorizer is fitted as a side effect. Returns None.
    """
    matrix = vectorizer.fit_transform(documents)
    vocab_size = len(vectorizer.vocabulary_)
    print('# of ngrams = %d\n' % vocab_size)

    # Per-column totals flattened to 1-D, then ordered from largest to smallest
    column_totals = np.asarray(matrix.sum(axis=0)).ravel()
    ranking = np.argsort(column_totals)[::-1]
    names = vectorizer.get_feature_names_out()

    head_idx, tail_idx = ranking[:n_top], ranking[-n_top:]

    print('Most frequent ngrams in the vocabulary:')
    pprint(names[head_idx])
    print()
    print('Least frequent ngrams in the vocabulary:')
    pprint(names[tail_idx])

In [None]:
# Unigram TF-IDF on the uncleaned training text, without stop-word filtering
word_vect = TfidfVectorizer(
    analyzer="word",
    ngram_range=(1, 1),
    min_df=3,
    max_df=1.0,
    stop_words=None,
    binary=False,
    sublinear_tf=False,
)
explore_features(word_vect, x_train, n_top=100)

In [None]:
# Same exploration with English stop words removed and ASCII accent stripping,
# but keeping the original letter case (lowercase=False)
word_vect = TfidfVectorizer(
    analyzer="word",
    ngram_range=(1, 1),
    min_df=3,
    max_df=1.0,
    stop_words='english',
    strip_accents='ascii',
    lowercase=False,
    binary=False,
    sublinear_tf=False,
)
explore_features(word_vect, x_train, n_top=100)

In [None]:
# Same exploration again, this time letting the vectorizer lowercase the text
word_vect = TfidfVectorizer(
    analyzer="word",
    ngram_range=(1, 1),
    min_df=3,
    max_df=1.0,
    stop_words='english',
    strip_accents='ascii',
    lowercase=True,
    binary=False,
    sublinear_tf=False,
)
explore_features(word_vect, x_train, n_top=100)

## Featurization: TFIDF using word and character ngrams

In [None]:
# Combine two TF-IDF views of the cleaned text in a single FeatureUnion:
# word 1-2 grams (English stop words removed, min_df=3) and character
# 1-3 grams produced by the "char_wb" analyzer.
word_vect = TfidfVectorizer(
    analyzer="word",
    ngram_range=(1, 2),
    min_df=3,
    max_df=1.0,
    stop_words='english',
    binary=False,
    sublinear_tf=False,
)
char_vect = TfidfVectorizer(
    analyzer="char_wb",
    ngram_range=(1, 3),
    binary=False,
    sublinear_tf=False,
)
combined_vect = FeatureUnion([("word", word_vect), ("char", char_vect)])

# Training split: fit the vectorizers and transform in one step
print("Extracting features from the training data using a word+char ngrams vectorizer")
t0 = time()
X_train = combined_vect.fit_transform(x_train_clean)
duration = time() - t0
print("done in %fs" % duration)
print("n_samples: %d, n_features: %d" % X_train.shape)
print()

# Test split: transform only, reusing what was fitted on the training split
print("Extracting features from the test data using the same vectorizer")
t0 = time()
X_test = combined_vect.transform(x_test_clean)
duration = time() - t0
print("done in %fs" % duration)
print("n_samples: %d, n_features: %d" % X_test.shape)
print()

## Explore the combined vocabulary

In [None]:
# Explore word ngrams: vocabulary size, then the first 100 vocabulary_ items
word_vocab = word_vect.vocabulary_
print('# of word ngrams = %d' % len(word_vocab))

pprint(list(word_vocab.items())[:100])

In [None]:
# Explore character ngrams: vocabulary size, then the first 100 vocabulary_ items
char_vocab = char_vect.vocabulary_
print('# of character ngrams = %d' % len(char_vocab))

pprint(list(char_vocab.items())[:100])

## Training, evaluation and testing functions

In [None]:
def train(clf, train_x, train_y):
    """Fit ``clf`` on the given data, printing the estimator and the fit time.

    The estimator is fitted in place via ``clf.fit``; nothing is returned.
    """
    print("Training: ")
    print(clf)
    started = time()
    clf.fit(train_x, train_y)
    elapsed = time() - started
    print("train time: %0.3fs" % elapsed)

def evaluate(model, test_x, y_true):
    """Print metrics and draw diagnostic plots for a fitted binary classifier.

    Threshold-based metrics (classification report, accuracy, precision,
    recall, F1, confusion matrix) use the hard labels from ``model.predict``.

    Ranking-based metrics (AUC, ROC curve, precision-recall curve) need
    continuous scores: computed from hard 0/1 labels they collapse to a
    single operating point. Scores are therefore taken from, in order of
    preference, ``predict_proba`` (column 1), ``decision_function``, and
    only as a last resort the hard labels themselves.

    Log-loss is defined on probabilities, so it is reported only when the
    model provides ``predict_proba``.

    Args:
        model: Fitted classifier exposing ``predict`` and ``classes_``.
            Assumes a two-class problem whose positive class is
            ``classes_[1]``.
        test_x: Feature matrix to evaluate on.
        y_true: Ground-truth labels aligned with ``test_x``.

    Returns:
        None. Everything is printed or plotted.
    """
    t1     = time()
    y_pred = model.predict(test_x)
    t2     = time()
    duration = t2 - t1
    print('Evaluation time for %d comments = %.6f secs --> %.6f sec/tweet\n' %
                                (len(y_pred), duration, duration/len(y_pred)))

    # Continuous positive-class scores for the ranking-based metrics.
    y_proba = model.predict_proba(test_x) if hasattr(model, 'predict_proba') else None
    if y_proba is not None:
        y_score = y_proba[:, 1]
    elif hasattr(model, 'decision_function'):
        # e.g. LinearSVC: signed distance to the separating hyperplane
        y_score = model.decision_function(test_x)
    else:
        # No scores available: fall back to hard labels (degenerate curves)
        y_score = y_pred

    print("Classification report:")
    print(classification_report(y_true, y_pred))

    print('Performance metrics:')
    print('Accuracy   = %.6f' % accuracy_score(y_true, y_pred))
    print('AUC        = %.6f' % roc_auc_score(y_true, y_score))
    if y_proba is not None:
        print('Log-loss   = %.6f' % log_loss(y_true, y_proba, labels=model.classes_))
    else:
        print('Log-loss   = n/a (model does not provide predict_proba)')
    print('Precision  = %.6f' % precision_score(y_true, y_pred))
    print('Recall     = %.6f' % recall_score(y_true, y_pred))
    print('F1-Score   = %.6f' % f1_score(y_true, y_pred))

    print()
    print('Confusion matrix:')
    cm = confusion_matrix(y_true, y_pred, labels=model.classes_)
    cm_disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=model.classes_)
    cm_disp.plot()
    plt.show()

    print()
    print('ROC Curve:')
    fpr, tpr, _ = roc_curve(y_true, y_score)
    roc_auc = auc(fpr, tpr)
    roc_disp = RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc,
                                  estimator_name=type(model).__name__)
    roc_disp.plot()
    plt.show()

    print()
    print('Precision-Recall curve:')
    precision, recall, _ = precision_recall_curve(y_true, y_score)
    pr_disp = PrecisionRecallDisplay(precision=precision, recall=recall)
    pr_disp.plot()
    plt.show()
    return

def test_predict(model, text, prob=True):
    """Classify one raw tweet and print the prediction.

    The text is echoed, cleaned with ``clean_text``, vectorized with the
    notebook-level ``combined_vect`` and passed to ``model.predict``.

    Args:
        model: Fitted classifier trained on ``combined_vect`` features.
        text: Raw, uncleaned tweet text.
        prob: Also print the class probabilities. Models without
            ``predict_proba`` (e.g. LinearSVC) get an "n/a" line instead of
            raising AttributeError.

    Returns:
        None. Results are printed.
    """
    print('%s\n' % text)
    text = clean_text(text)
    tst = combined_vect.transform([text])
    preds = model.predict(tst)
    print('Predicted: ' + str(preds[0]))
    if prob:
        if hasattr(model, 'predict_proba'):
            preds_proba = model.predict_proba(tst)
            print('Probas   : ' + str(preds_proba[0]))
        else:
            print('Probas   : n/a (%s has no predict_proba)' % type(model).__name__)
    return

def test_byind(model, ind=None, prob=True):
    """Run ``test_predict`` on one test-set tweet and print its true label.

    ``ind`` is a position into ``x_test.values`` / ``y_test.values``;
    when it is None a random position is drawn. The raw (uncleaned) tweet
    is handed to ``test_predict``, which does its own cleaning.
    """
    if ind is None:
        ind = int(random.random() * len(x_test))
    print('Test index: %d' % ind)
    tweet, truth = x_test.values[ind], y_test.values[ind]
    test_predict(model, tweet, prob=prob)
    print('Truth    : %d' % truth)

## Train and evaluate various learners

### Train a LogisticRegression classifier

In [None]:
# L2-penalised logistic regression (liblinear solver) on the combined features
lgr = LogisticRegression(
    solver='liblinear',
    penalty='l2',
    C=10.0,
    dual=False,
    tol=0.000001,
    max_iter=10000,
    fit_intercept=True,
    intercept_scaling=1,
    class_weight=None,
    random_state=None,
    warm_start=False,
    n_jobs=1,
    verbose=1,
)

train(lgr, X_train, y_train)
evaluate(lgr, X_test, y_test)

### Train a Linear SVM classifier

In [None]:
# Linear SVM with an L1 penalty and squared-hinge loss on the combined features
svm = LinearSVC(
    penalty='l1',
    loss='squared_hinge',
    dual=False,
    C=1.0,
    tol=0.001,
    max_iter=10000,
    multi_class='ovr',
    fit_intercept=True,
    intercept_scaling=1,
    class_weight=None,
    random_state=None,
    verbose=1,
)

train(svm, X_train, y_train)
evaluate(svm, X_test, y_test)

### Train a RandomForest classifier

In [None]:
# # Random forest training is very slow in this implementation
# rf = RandomForestClassifier(n_estimators=10, criterion='gini', max_depth=None, 
#                        min_samples_split=2, min_samples_leaf=1)

# train(rf, X_train, y_train)
# evaluate(rf, X_test, y_test)

### Train a LightGBM classifier

In [None]:
# LightGBM classifier with a binary objective on the combined features
gbm = lgb.LGBMClassifier(
    objective='binary',
    n_estimators=100,
    learning_rate=0.05,
    num_leaves=31,
)

train(gbm, X_train, y_train)
evaluate(gbm, X_test, y_test)

## Test a single tweet

In [None]:
# prob=False: probabilities are not requested for the linear SVM
test_predict(model=svm, text='I am so happy today', prob=False)

In [None]:
# Same sentence through the logistic-regression model, with probabilities
test_predict(model=lgr, text='I am so happy today')

In [None]:
# Same sentence through the LightGBM model, with probabilities
test_predict(model=gbm, text='I am so happy today')

In [None]:
# A clearly negative sentence through the logistic-regression model
test_predict(model=lgr, text='I am so miserable today')

In [None]:
# Test-set tweet at position 1, compared against its true label
test_byind(lgr, ind=1)

## Test random tweets from test set

In [None]:
# Twenty randomly chosen test-set tweets, each followed by a separator line
for _ in range(20):
    test_byind(lgr)
    print('--------------------------')

In [None]:
# Hand-written cases as [expected label, tweet text], keyed by their position
tst = dict(enumerate([
    [1, "@nny24 Yeah!!! ^^ I got good news... but not GREAT  I hope to tell you the great news tonight =D. And... I'm sooo motivated girl... "],
    [1, "@nissalomax hey nissa!  I'm ok...not GREAT...but not bad either "],
    [0, "I'm not happy"],
    [0, "I'm not happy at all"],
]))

In [None]:
# Run every hand-written case through the logistic-regression model,
# printing the expected label before the prediction
for truth, tweet in tst.values():
    print('Truth    : %d' % truth)
    test_predict(lgr, tweet, prob=True)
    print('--------------------------')