In [2]:
import pandas as pd
import os
import time

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics import accuracy_score

from gensim.scripts.glove2word2vec import glove2word2vec
from gensim.models import KeyedVectors, FastText
from nltk.tokenize import word_tokenize
import numpy as np

from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics import TextAnalyticsClient

import matplotlib.pyplot as plt
from kmeans import KMeansClustering

### Data processing

- Loading the data -> `text` and `sentiment/type`.
- Splitting the data into `train` and `test` samples.

In [3]:
def load_data(data_file: str) -> tuple[list, list, list]:
    cwd = os.getcwd()
    file_path = os.path.join(cwd, "data", data_file)
    
    data = pd.read_csv(file_path)
    
    inputs = [el for el in data.iloc[:, 0]]
    outputs = [el for el in data.iloc[:, 1]]
    labels = list(set(outputs))
    
    return inputs, outputs, labels


def load_go_emotions(data_file: str) -> tuple[list, list, list]:
    cwd = os.getcwd()
    file_path = os.path.join(cwd, "data", data_file)
    
    data = pd.read_csv(file_path)
    emotion_cols = data.columns[9:]
    
    def get_emotion_from_multi_label(row):
        for c in emotion_cols:
            if row[c] == 1:
                return c
        return "neutral"
    
    data['emotion'] = data.apply(get_emotion_from_multi_label, axis=1)
    inputs = [el for el in data.iloc[:, 0]]
    outputs = [el for el in data.iloc[:, -1]]
    labels = list(set(outputs))
    
    return inputs, outputs, labels
    

def split_data(inputs: list, outputs: list) -> tuple[list, list, list, list]:
    X_train, X_test, y_train, y_test = train_test_split(inputs, outputs, test_size=0.2, random_state=42, shuffle=True)
    
    return X_train, X_test, y_train, y_test

### Word vectorizing

Vectorizing the passed sentences into vectors using:

- `Bag of Words` -> by `CountVectorizer`.
- `TF-IDF` -> by `TfidfVectorizer`.
- `Word2Vec` -> by `GoogleNews model`.

In [4]:
def bag_of_words(train: list, test: list):
    vectorizer = CountVectorizer()
    
    train_features = vectorizer.fit_transform(train)
    test_features = vectorizer.transform(test)
    
    # print("Vocabulary size: ", vectorizer.vocabulary_, " words")
    # print("Train size: ", len(train), " emails")
    # print("Train shape: ", train_features.shape)
    
    return train_features, test_features

def tf_idf(train: list, test: list):
    vectorizer = TfidfVectorizer(max_features=50)
    
    train_features = vectorizer.fit_transform(train)
    test_features = vectorizer.transform(test)
    
    # print("Vocabulary size: ", vectorizer.vocabulary_, " words")
    # print("Train size: ", len(train), " emails")
    # print("Train shape: ", train_features.shape)
    
    return train_features, test_features

def sentence_to_vec(sentence: str, model: KeyedVectors) -> np.array:
    words = word_tokenize(sentence)
    feature_vec = np.zeros((model.vector_size,), dtype="float32")
    no_words = 0
    
    for word in words:
        if word in model:
            no_words += 1
            feature_vec = np.add(feature_vec, model[word])
    
    if no_words > 0:
        feature_vec = np.divide(feature_vec, no_words)
    
    return feature_vec

def word2vec(train: list, test: list, model: KeyedVectors) -> tuple[list, list]:
    train_features = [sentence_to_vec(s, model) for s in train]
    test_features = [sentence_to_vec(s, model) for s in test]
    
    return train_features, test_features

def fast_text(train: list, test: list, model: FastText) -> tuple[list, list]:
    tr_norm = [model.wv[s] for s in train]
    ts_norm = [model.wv[s] for s in test]
    
    return tr_norm, ts_norm

### Classifying

Classifying sentences contained in a given file, along with the initial classification.

In [5]:
def kmeans_tool(train: list, test: list, labels: list) -> tuple[list, list]:
    classifier = KMeans(n_clusters=len(labels), init='k-means++', random_state=42)
    
    # Fitting the data.
    classifier.fit(train)
    
    # Predicting the outputs based on the test features.
    # Also showing the prediction for the train features.
    predictedIndexesTrain = classifier.predict(train)
    predictedIndexesTest = classifier.predict(test)
    predicted_train = [labels[value] for value in predictedIndexesTrain]
    predicted_test = [labels[value] for value in predictedIndexesTest]
    
    return predicted_train, predicted_test

def agglomerative_tool(train: list, test: list, labels: list) -> tuple[list, list]:
    classifier = AgglomerativeClustering(n_clusters=len(labels))
    
    # Fitting the data.
    classifier.fit(train)
    
    # Predicting the outputs based on the test features.
    # Also showing the prediction for the train features.
    predictedIndexesTrain = classifier.labels_
    predictedIndexesTest = classifier.fit_predict(test)
    predicted_train = [labels[value] for value in predictedIndexesTrain]
    predicted_test = [labels[value] for value in predictedIndexesTest]
    
    return predicted_train, predicted_test

def dbscan_tool(train: list, test: list, labels: list) -> tuple[list, list]:
    # Create an instance of HDBSCAN
    classifier = DBSCAN(eps=0.5, min_samples=5)
    
    # Fitting the data.
    classifier.fit(train)
    
    # Predicting the outputs based on the test features.
    # Also showing the prediction for the train features.
    predictedIndexesTrain = classifier.labels_
    predictedIndexesTest = classifier.fit_predict(test)
    predicted_train = [labels[value] for value in predictedIndexesTrain]
    predicted_test = [labels[value] for value in predictedIndexesTest]
    
    return predicted_train, predicted_test

def my_kmeans(train: list, test: list, labels: list) -> tuple[list, list]:
    classifier = KMeansClustering(k=len(labels))
    
    # Fitting the data.
    classifier.fit(train)
    
    # Predicting the outputs based on the test features.
    # Also showing the prediction for the train features.
    predictedIndexesTrain = classifier.predict(train)
    predictedIndexesTest = classifier.predict(test)
    predicted_train = [labels[value] for value in predictedIndexesTrain]
    predicted_test = [labels[value] for value in predictedIndexesTest]
    
    return predicted_train, predicted_test

### Visualizing and Scoring

Visualizing the predicted outputs with respect to the actual test outputs.
Scoring the predicted outputs using `sklearn`.

In [6]:
def visualize(test: list, predicted: list, actual: list) -> None:
    for i, sentence in enumerate(test):
        print(f"Sentence: {sentence}")
        print(f"Predicted: {predicted[i]} | Actual: {actual[i]}\n")
        
def score(predicted: list, actual: list):
    return accuracy_score(actual, predicted)

def plot_clusters(X: list, labels: list, kmeans: KMeansClustering) -> None:
    plt.scatter(X[:, 0], X[:, 1], c=labels)
    plt.scatter(kmeans.centroids[:, 0], kmeans.centroids[:, 1], c=range(len(kmeans.centroids)),
            marker="*", s=200)

    plt.show()

### Azure Language Model

Using the `Azure` language model to classify text emotions.

In [7]:
def get_client() -> TextAnalyticsClient:
    endpoint = os.environ["LANGUAGE_ENDPOINT"]
    key = os.environ["LANGUAGE_KEY"]
    
    credential = AzureKeyCredential(key)
    
    client = TextAnalyticsClient(endpoint=endpoint, credential=credential)
    return client

In [8]:
def classify_bag_of_words(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    Xtr_norm, Xts_norm = bag_of_words(Xtr, Xts)
    
    start_time = time.time()
    predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("Bag of Words\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_tf_idf(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    Xtr_norm, Xts_norm = tf_idf(Xtr, Xts)
    
    start_time = time.time()
    predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("TF-IDF\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_word2vec(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    # Building the word2vec model.
    cwd = os.getcwd()
    modelPath = os.path.join(cwd, "models", "GoogleNews-vectors-negative300.bin")
    word2vec300Model = KeyedVectors.load_word2vec_format(modelPath, binary=True)
    
    Xtr_norm, Xts_norm = word2vec(Xtr, Xts, word2vec300Model)
    
    start_time = time.time()
    #predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    predicted_train, predicted_test = my_kmeans(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("Word2Vec\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_azure(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)

    client = get_client()
    
    result_tr = client.analyze_sentiment(Xtr[:10], show_opinion_mining=True)
    docs_tr = [doc for doc in result_tr if not doc.is_error]
    
    # Showing train sentiments.
    for i, doc in enumerate(docs_tr):
        print(f"Text: {Xtr[i]}")
        print(f"Predicted: {doc.sentiment} | Actual: {ytr[i]}\n")
        
    
    result_ts = client.analyze_sentiment(Xts[:10], show_opinion_mining=True)
    docs_ts = [doc for doc in result_ts if not doc.is_error]
    
    # Showing test sentiments.
    for i, doc in enumerate(docs_ts):
        print(f"Text: {Xts[i]}")
        print(f"Predicted: {doc.sentiment} | Actual: {yts[i]}\n")
        
def classify_go_emotions(file_name: str) -> None:
    inputs, outputs, labels = load_go_emotions(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    Xtr_norm, Xts_norm = tf_idf(Xtr, Xts)
    
    start_time = time.time()
    predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("TF-IDF\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_fast_text(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    # Building the FastText model.
    tokenized_sentences = [word_tokenize(s) for s in inputs]
    model = FastText(tokenized_sentences, vector_size=100, min_count=1, window=5, workers=8)
    
    Xtr_norm, Xts_norm = fast_text(Xtr, Xts, model)
    
    #predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    start_time = time.time()
    predicted_train, predicted_test = my_kmeans(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("FastText\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_glove(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    # Building the word2vec model.
    cwd = os.getcwd()
    modelPath = os.path.join(cwd, "models", "glove.6B.300d.txt")
    word2vec_output = 'glove.6B.300d.txt.word2vec'
    
    glove2word2vec(modelPath, word2vec_output)
    word2vecModel = KeyedVectors.load_word2vec_format(word2vec_output, binary=True)
    
    Xtr_norm, Xts_norm = word2vec(Xtr, Xts, word2vecModel)
    
    start_time = time.time()
    #predicted_train, predicted_test = kmeans_tool(Xtr_norm, Xts_norm, labels)
    predicted_train, predicted_test = my_kmeans(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("GloVe\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_agglomerative(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    Xtr_norm, Xts_norm = bag_of_words(Xtr, Xts)
    
    start_time = time.time()
    predicted_train, predicted_test = agglomerative_tool(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("Agglomerative Clustering\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")
    
def classify_dbscan(file_name: str) -> None:
    inputs, outputs, labels = load_data(file_name)
    
    Xtr, Xts, ytr, yts = split_data(inputs, outputs)
    
    Xtr_norm, Xts_norm = bag_of_words(Xtr, Xts)
    
    start_time = time.time()
    predicted_train, predicted_test = dbscan_tool(Xtr_norm, Xts_norm, labels)
    end_time = time.time()
    
    print("DBSCAN\n\n")
    # visualize(Xtr, predicted_train, ytr)
    # print(f"Accuracy: {score(predicted_train, ytr)} on {len(Xtr)} train samples")
    
    visualize(Xts, predicted_test, yts)
    print(f"Accuracy: {score(predicted_test, yts)} on {len(Xts)} test samples in {end_time - start_time} ms")

In [11]:
file_spam = "spam.csv"
file_review = "reviews_mixed.csv"
file_go_emotions = "goemotions_3.csv"

#classify_bag_of_words(file_review)
#classify_tf_idf(file_spam)
#classify_word2vec(file_review)
#classify_azure(file_review)
#classify_go_emotions(file_go_emotions)
#classify_fast_text(file_review)
classify_glove(file_review)
#classify_agglomerative(file_review)
#classify_dbscan(file_review)

  glove2word2vec(modelPath, word2vec_output)
