# ModelSelection.ipynb

### This notebook contains a K-Fold Cross Validation algorithm to test various models and hyperparameters for the supervised machine learning model used to label Tweets.

Author: Erik Puijk <br>
Date  : March 28, 2022

In [305]:
pip install -U scikit-learn

Requirement already up-to-date: scikit-learn in /home/erik/anaconda3/lib/python3.8/site-packages (1.1.1)
Note: you may need to restart the kernel to use updated packages.


In [306]:
import numpy as np
import json, csv, math, random
from sklearn.model_selection import train_test_split, KFold
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score, classification_report
from sklearn import svm
import pyperclip as pc
import matplotlib.pyplot as plt

In [307]:
# Shared, module-level label encoder. test_configurations() uses it to turn the class labels into
# integer codes and to map those codes back to class names for the classification report.
Encoder = LabelEncoder()

In [308]:
def read_tweets(path):
    """ Read the Tweets from a given text file and return the parsed JSON content.

        The file is decoded explicitly as UTF-8 so that non-ASCII characters in the Tweets are read
        the same way on every platform, independent of the locale's default encoding.

        Parameters:
        - path: location of the JSON file to read

        Returns:
        - the decoded JSON document, or an empty string when the file could not be opened or read
          (in that case "I/O error" is printed)

        Raises:
        - ValueError (json.JSONDecodeError / UnicodeDecodeError) when the file content is not valid
          UTF-8 encoded JSON; these errors are deliberately not swallowed """

    content = ""

    try:
        with open(path, 'r', encoding='utf-8') as f:
            # Parse straight from the file handle instead of reading into a string first
            content = json.load(f)
    except IOError:
        # Best effort: report the problem and fall through to return the empty default
        print("I/O error")

    return content

In [310]:
def calc_avg_scores(scores, cats, k):
    """ Calculate an average score given a list of scores.

        Parameters:
        - scores: either a list of lists (one list of per-label scores per run) or a flat list of
                  scores
        - cats:   the labels; only its length is used, as the number of per-label scores to average
        - k:      the number of inner lists in `scores` to average over

        Returns:
        - an empty list when `scores` is empty
        - for a list of lists: a list with, per label, the mean over the first k inner lists
        - for a flat list: that same list object, unchanged """

    # Nothing to average; avoids an IndexError on scores[0] below
    if len(scores) == 0:
        return []

    # If a flat list of (f1) scores is given, just return that list
    if not isinstance(scores[0], list):
        return scores

    # If a list of lists of scores is given, calculate the average per label
    return [sum(scores[j][i] for j in range(k)) / k for i in range(len(cats))]

In [311]:
def keysort(x):
    """ Sort key for score entries: the mean of the F1 scores stored at index 1 of the entry. """

    f1_values = x[1]
    return sum(f1_values) / len(f1_values)

In [312]:
def ml(X_train, y_train, X_test, v_type, alg, ngram_min, ngram_max, min_df, max_df, max_features):
    """ Turn the training and test texts into feature vectors, train an SVM on the training part and
        return the labels it predicts for the test part.

        v_type selects the vectorizer:
        - 'count'       -> CountVectorizer (sklearn), word analyzer
        - anything else -> TfidfVectorizer (sklearn), sublinear tf

        alg selects the SVM:
        - 'svc'         -> SVC with a linear kernel (sklearn)
        - anything else -> LinearSVC (sklearn)

        The remaining parameters (ngram_min, ngram_max, min_df, max_df, max_features) are passed on
        to the selected vectorizer. """

    # Settings that both vectorizers have in common
    shared = dict(ngram_range=(ngram_min, ngram_max), min_df=min_df, max_df=max_df,
                  max_features=max_features)

    if v_type == 'count':
        vectorizer = CountVectorizer(analyzer='word', **shared)
    else:
        vectorizer = TfidfVectorizer(sublinear_tf=True, **shared)

    # The vocabulary is learned from the training texts only; the test texts are projected onto it
    train_vectors = vectorizer.fit_transform(X_train)
    test_vectors = vectorizer.transform(X_test)

    # Pick the requested SVM flavour
    if alg == 'svc':
        model = svm.SVC(kernel='linear', C=0.6, class_weight='balanced')
    else:
        model = svm.LinearSVC(C=0.5, class_weight='balanced', random_state=0)

    # Train on the training vectors, then label the test vectors
    model.fit(train_vectors, y_train)
    #plot_coefficients(model, vectorizer.get_feature_names_out())

    return model.predict(test_vectors)

In [313]:
def cross_val(v_type, X, y, ngram_min, ngram_max, min_df, max_df, max_features, cats, alg, k=3):
    """ Run k-fold cross validation (3-fold by default) on the training set, splitting it into a
        training part and a validation part per fold, to test different vectorization and modelling
        parameters.

        Parameters:
        - v_type, alg:  vectorizer and SVM selection, passed on to ml()
        - X, y:         the training texts and their labels; both are accessed by integer position
        - ngram_min, ngram_max, min_df, max_df, max_features: vectorizer settings, passed on to ml()
        - cats:         the labels, passed on to calc_avg_scores()
        - k:            number of folds (defaults to 3, the value that used to be hard-coded)

        Returns:
        - a two-element list [configuration, scores], where configuration is
          [ngram_min, ngram_max, min_df, max_df, max_features] and scores is the result of
          calc_avg_scores() on the per-fold micro-F1 scores """

    scores = []

    # Run cross-validation and calculate the micro-F1 score of every fold
    kf = KFold(n_splits=k)
    for train, valid in kf.split(X, y):
        X_train_t = [X[i] for i in train]
        y_train_t = [y[i] for i in train]
        X_valid_t = [X[i] for i in valid]
        y_valid_t = [y[i] for i in valid]

        pred = ml(X_train_t, y_train_t, X_valid_t, v_type, alg, ngram_min, ngram_max, min_df, max_df, max_features)

        scores.append(f1_score(y_valid_t, pred, average='micro', zero_division=0))

    # Return the configuration of the model together with the scores
    return [[ngram_min, ngram_max, min_df, max_df, max_features], calc_avg_scores(scores, cats, k)]

In [314]:
def plot_coefficients(classifier, feature_names, top_features=20):
    """ This function can be used to gain insight in the significance of the features to understand why
        certain model architectures work better than others.
        Code was obtained from https://aneesha.medium.com/3454ab18a14d

        Parameters:
        - classifier:    a fitted classifier exposing `coef_`; only its first row is plotted
        - feature_names: the feature names, in the same order as the coefficients
        - top_features:  how many of the most negative and most positive coefficients to show

        Shows a bar chart (red = negative, blue = positive coefficient) and returns nothing. When the
        model has fewer than 2 * top_features coefficients, the two selections overlap and some
        features are shown twice. """

    # Select category ([0] is 'CAM' for content type)
    coef = classifier.coef_[0].ravel()

    # Sort once; the lowest indices come first, the highest last
    order = np.argsort(coef)
    top_positive_coefficients = order[-top_features:]
    top_negative_coefficients = order[:top_features]
    top_coefficients = np.hstack([top_negative_coefficients, top_positive_coefficients])

    # One bar position per selected coefficient. Deriving the positions from the selection keeps
    # the x and height arrays the same length, also when fewer coefficients exist than requested.
    positions = np.arange(len(top_coefficients))

    # Create plot
    plt.figure(figsize=(15, 5))
    colors = ['red' if c < 0 else 'blue' for c in coef[top_coefficients]]
    plt.bar(positions, coef[top_coefficients], color=colors)
    feature_names = np.array(feature_names)

    # Bars are centered on their position, so the ticks go on the same positions; an offset of one
    # would put every label under the neighbouring bar.
    plt.xticks(positions, feature_names[top_coefficients], rotation=60, ha='right')
    plt.show()

In [315]:
def test_configurations(v_type, test_validation, tweets, labels, alg, rs):
    """ Test different configurations for different models and compare the micro-f1 scores to select the best
        model.

        Parameters:
        - v_type:          vectorizer selection, passed on to cross_val() / ml()
        - test_validation: when true, the best configuration(s) found by cross validation are
                           re-trained on the full training set and a classification report on the
                           held-out test set is printed; when false only an empty line is printed
        - tweets:          the texts
        - labels:          the label of each text, in the same order as `tweets`
        - alg:             SVM selection, passed on to cross_val() / ml()
        - rs:              random state for the train/test split

        Prints the result and returns nothing. The module-level `Encoder` is (re)fitted as a side
        effect. """

    cats = list(set(labels))

    # Define most promising model parameters for vectorization according to categorization type
    # (both grids currently hold the same values)
    if len(cats) == 3:
        # Content
        ngram_mins = [1]
        ngram_maxs = [2, 3]
        min_dfs = [1]
        max_dfs = [0.1, 0.2, 0.4, 0.6, 0.8, 1.0]
        max_features = [450, 500, 1000, 2000, 3000, None]
    else:
        # Activation
        ngram_mins = [1]
        ngram_maxs = [2, 3]
        min_dfs = [1]
        max_dfs = [0.1, 0.2, 0.4, 0.6, 0.8, 1.0]
        max_features = [450, 500, 1000, 2000, 3000, None]

    # Split data set into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(tweets, labels, test_size=0.3, random_state=rs)

    # Encode labels. The encoder is fitted once on the complete label set, so the training and the
    # test labels are guaranteed to share one label -> code mapping, also when a class happens to be
    # absent from one of the two splits. Fitting it separately per split can shift the codes.
    Encoder.fit(labels)
    y_train = Encoder.transform(y_train)
    y_test = Encoder.transform(y_test)

    scores = []

    # Combine each parameter and run the model with cross validation
    for ngram_min in ngram_mins:
        for ngram_max in ngram_maxs:
            for min_df in min_dfs:
                for max_df in max_dfs:
                    for max_feature in max_features:
                        # Call the function for the model and append the score
                        scores.append(cross_val(v_type, X_train, y_train, ngram_min, ngram_max, min_df, \
                                                max_df, max_feature, cats, alg))

    # Sort the scores descending (best average F1 first)
    scores.sort(key=keysort, reverse=True)

    prnt = ''

    if test_validation:
        # Select the best configurations (all configurations that tie with the top score)
        best_score = keysort(scores[0])
        best_configs = [score for score in scores if keysort(score) == best_score]

        # Report on the classes that occur in the test set, in ascending code order. Passing the
        # codes explicitly as `labels` keeps them aligned with `target_names`, also when the model
        # predicts a class that is absent from the test set.
        present_codes = sorted(set(y_test))
        present_names = Encoder.inverse_transform(present_codes)

        # Use the best configurations to validate model on the test set
        for config, score in best_configs:
            prnt += (str([config, score]) + '\n')

            pred = ml(X_train, y_train, X_test, v_type, alg, config[0], config[1], config[2], config[3], config[4])

            prnt += (classification_report(y_test, pred, \
                                           labels=present_codes, \
                                           target_names=present_names, \
                                           zero_division=0))

    print(prnt)

In [None]:
# Load the complete collection of preprocessed Tweets
tweets = read_tweets('source/tweets_all_preprocessed_exc_stopwords.txt')

# Keep only the Tweets that are marked as gold standard (labeled)
tweets_gs = list(filter(lambda entry: entry['memo'] == 'gold_standard', tweets))

# Pull the text and both label sets (cat_con / cat_act) out of the gold-standard Tweets
tweets_text = [entry['text'] for entry in tweets_gs]
labels_con = [entry['cat_con'] for entry in tweets_gs]
labels_act = [entry['cat_act'] for entry in tweets_gs]

# Number of different random states (train/test splits) to test the parameters on
i_max = 10

# Every random state gives a different train/test split; for each one, search the parameter grid
# with K-fold cross validation and report the best settings
for i in range(i_max):
    print('RS=%s:' % i)
    test_configurations('count', True, tweets_text, labels_con, 'linear_svc', i)
    #test_configurations('tfidf', True, tweets_text, labels_act, 'svc', i)