In [245]:
""" 
    Basic feature extractor
"""
from operator import methodcaller
import string
from collections import Counter, defaultdict
import numpy as np

def tokenize(text):
    # TODO customize to your needs
    text = text.translate(str.maketrans({key: " {0} ".format(key) for key in string.punctuation}))
    return text.split()

class Features:

    def __init__(self, data_file):
        with open(data_file) as file:
            data = file.read().splitlines()

        data_split = map(methodcaller("rsplit", "\t", 1), data)
        texts, self.labels = map(list, zip(*data_split))

        self.tokenized_text = [tokenize(text) for text in texts]

        self.labelset = list(set(self.labels))

    @classmethod 
    def get_features(cls, tokenized, model):
        # TODO: implement this method by implementing different classes for different features 
        # Hint: try simple general lexical features first before moving to more resource intensive or dataset specific features 
        pass

In [246]:
################################
# Logistic Regression Features #
################################

class Features_LR(Features):

    def __init__(self, model_file, threshold=1):
        super(Features_LR, self).__init__(model_file)
        self.vocabulary = self.create_vocabulary(self.tokenized_text, threshold)
        self.word2index = {word: i for i, word in enumerate(self.vocabulary, start=0)}
        self.idf = None # Need to save IDF values for inference

    def read_inference_file(self, input_file):
        """Read inference file that is in the form: <text> i.e. a line
        of text that does not contain a tab.
        """
        with open(input_file) as file:
            data = file.read().splitlines()

        texts = data

        tokenized_text = [tokenize(text) for text in texts]
        return tokenized_text
    
    def create_vocabulary(self, tokenized_text, threshold):
        """Creat vocabulary from training set, considering only words
        that have an occurence > threshold.
        """
        # Append everything together in a dictionary
        flattened_list = [item for sublist in tokenized_text for item in sublist]
        flattened_list_count = Counter(flattened_list)

        # Sort the dictionary by values in descending order
        flattened_list_count = dict(sorted(flattened_list_count.items(), key=lambda item: item[1], reverse=True))

        # Considering only words that have an occurence > threshold.
        flattened_list_count_filter = [word for word, count in flattened_list_count.items() if count > threshold]

        return flattened_list_count_filter
    
    def get_features(self, tokenized_sentence, idf_array):
        """Convert sentence to TF-IDF space
        """
        size_vocabulary = len(self.vocabulary)
        n_documents = 1
        tf_array = np.zeros(size_vocabulary)
        words_per_document = 0
        # Compute Term-Frequency
        words_in_document = []
        for word in tokenized_sentence:
            index_word = self.word2index.get(word)
            if word in self.word2index.keys():
                tf_array[index_word] += 1
                words_per_document += 1
        tf = tf_array/words_per_document
        return tf*idf_array
        
    
    def tf_idf(self, tokenized_text): #max_features=1000):
        """Term frequency-inverse document frequency
        """
        size_vocabulary = len(self.vocabulary)
        n_documents = len(tokenized_text)
        tf_array = np.zeros((n_documents, size_vocabulary))
        idf_array = np.zeros(size_vocabulary) # Inverse Document Frequency
        words_per_document = np.zeros(n_documents)
        # Compute Term-Frequency
        for d_i, sentence in enumerate(tokenized_text, start=0):
            words_in_document = []
            for word in sentence:

                index_word = self.word2index.get(word)
                if word in self.word2index.keys():
                    tf_array[d_i][index_word] += 1
                    words_per_document[d_i] += 1
                    # Inverse Document Frequency
                    if word not in words_in_document: # does not count repeated words in the same document
                        words_in_document.append(word) 
                        idf_array[index_word] += 1 # number of documents containing the term
        tf = tf_array/words_per_document.reshape(-1, 1)
        # Smoothing: to avoid division by zero errors and to ensure that terms with zero document
        # frequency still get a non-zero IDF score
        idf = np.log((n_documents + 1)/(idf_array + 1)) + 1 # Smoothing

        self.idf = idf
        tf_idf = tf*idf
        return tf_idf # Shape (n_documents, vocabulary)

In [247]:
"""
 Refer to Chapter 5 for more details on how to implement a LogisticRegression
"""
from work.Model import *

class LogisticRegression(Model):
    def __init__(self, model_file, learning_rate=0.01, epochs=100):
        super(LogisticRegression, self).__init__(model_file)
        self.weights = None
        self.bias = None
        self.loss = []
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.Y_to_categorical = None # Map Y label to numerical

    def initialize_weights(self, num_features, num_labels):
        self.weights = np.zeros((num_features, num_labels))
        self.bias = np.zeros(num_labels)
        # np.random.seed(0)
        # self.weights = np.random.rand(num_features, num_labels)
        # self.bias = np.random.rand(num_labels)

    def softmax(self, Z):
        """Softmax function: normalizing logit scores
        :param Z([num_documents, num_labels])
        : return e^Z/sum_{i=0}^{k}{e^{Z}}
        """
        return np.exp(Z - np.max(Z, axis=1, keepdims=True))/np.sum(np.exp(Z), axis=1, keepdims=True)
        
    def predict_prob(self, X, weights, bias):
        """Return prediction of shape [num_documents, num_labels]
        """
        # z[num_documents, num_labels] = X[num_documents, num_features]*W[num_features, num_labels] + bias[num_labels]
        Z = np.dot(X, weights) + bias

        # Apply Softmax
        S = self.softmax(Z)
        return S

    def cross_entropy_loss(self, S, target):
        """Calculate the cross-entropy
        L = -1/n*_sum_{i=0}^{n}{y_i*log(s_i)} 
        y label is a vector containing K classes where yc = 1 if c is the correct class and the remaining elements will be 0.

        :param S[num_documents, num_labels]: probabilities of features after softmax
        :target [num_documents, num_labels]: target one hot encoded
        """
        return -np.mean(np.log(S)*target)

    def OneHot(self, targets, num_labels):
        """Convert arrary of targets to One Hot 
        :param targets([num_documents,])
        :param num_labels(int)
        :return Y[num_documents, num_labels]
        """
        Y_onehot = np.zeros((len(targets), num_labels))
        Y_onehot[np.arange(len(targets)), targets] = 1
        return Y_onehot
    
    def predict(self, X, weights, bias):
        """Return prediction of X with the categorical values]
        """
        # z[num_documents, num_labels] = X[num_documents, num_features]*W[num_features, num_labels] + bias[num_labels]
        Z = np.dot(X, weights) + bias

        # Apply Softmax
        S = self.softmax(Z)

        # Rows with highest probability
        S_max = np.argmax(S, axis=1)

        return S_max
    

    def train(self, input_file, verbose=False):
        """
        This method is used to train your models and generated for a given input_file a trained model
        :param input_file: path to training file with a text and a label per each line
        :return: model: trained model 
        """
        # Read dataset and create vocabulary
        features_lr_class = Features_LR(input_file)

        # Transform dataset to TF-IDF space
        # Return features with format (n_documents, size_vocabulary)
        X = features_lr_class.tf_idf(features_lr_class.tokenized_text)
        
        # Y
        Y_mapping = {label: index for index, label in enumerate(np.unique(features_lr_class.labels))}
        self.Y_to_categorical = {index: label for label, index in Y_mapping.items()} # dictionary to convert back y's to categorical
        Y = [Y_mapping[y] for y in features_lr_class.labels]

        # Initialize Weights
        sample_size = len(features_lr_class.tokenized_text)
        n_features = len(features_lr_class.vocabulary)
        num_labels = len(features_lr_class.labelset)
        self.initialize_weights(n_features, num_labels)

        # One Hot encoded Y
        Y_onehot = self.OneHot(Y, num_labels)

        for i in range(self.epochs):
            # Z = softmax(X*W + b)
            prob = self.predict_prob(X, self.weights, self.bias)
            
            # dL/dW
            grad_w = (1/sample_size)*np.dot(X.T, prob - Y_onehot)
            grad_b =  (1/sample_size)*np.sum(prob - Y_onehot, axis=0)

            # Updating weights and bias
            self.weights = self.weights - (self.learning_rate*grad_w)
            self.bias = self.bias - (self.learning_rate*grad_b)

            # Computing cross-entropy loss
            loss = self.cross_entropy_loss(prob, Y_onehot)
            self.loss.append(loss)

            if verbose:
                print(f"Epoch: {i+1} - Loss: {loss}")

        model = {
            "feature_weights": {
                "weights": self.weights,
                "bias": self.bias,
                "Y_to_categorical": self.Y_to_categorical
            },
            "Feature": features_lr_class
        }
        ## Save the model
        self.save_model(model)
        return X, Y_onehot, prob


    def classify(self, input_file, model):
        """
        This method will be called by us for the validation stage and or you can call it for evaluating your code 
        on your own splits on top of the training sets seen to you
        :param input_fixle: path to input file with a text per line without labels
        :param model: the pretrained model
        :return: predictions list
        """
        ## TODO write your code here (and change return)

        feature_weights = model["feature_weights"]
        Feature_LR_class = model["Feature"]

        # Read Input File
        tokenized_text = Feature_LR_class.read_inference_file(input_file)
        X = []

        # Get features from inference file
        for sentence in tokenized_text:
            # Transform dataset to TF-IDF space
            # Return features with format (1, size_vocabulary)
            X_sentence = Feature_LR_class.get_features(sentence, Feature_LR_class.idf)

            # Concatenate A and B vertically
            X.append(X_sentence)

        X = np.vstack(X)

        # Prediction
        preds_numerical = self.predict(X, feature_weights['weights'], feature_weights['bias'])
        # Map indexes to Categorical space
        preds_label = []
        for y in preds_numerical:
            tmp = feature_weights['Y_to_categorical'][y]
            preds_label.append(tmp)
        
        return preds_label


In [248]:
train_file = "work/datasets/4dim/train.txt"
pred_file = "work/datasets/4dim/val.test"
pred_true_labels = "work/datasets/4dim/val.txt"
model_file_name = "logreg.4dim.model"

In [249]:
model_LR = LogisticRegression("logreg.4dim.model")

In [250]:
X, Y, prob  = model_LR.train(train_file, verbose=True)

Epoch: 1 - Loss: 0.3465735902799726
Epoch: 2 - Loss: 0.3466025681359213
Epoch: 3 - Loss: 0.34663154967254756
Epoch: 4 - Loss: 0.3466605350724734
Epoch: 5 - Loss: 0.34668952451836266
Epoch: 6 - Loss: 0.3467185181929219
Epoch: 7 - Loss: 0.34674751627890105
Epoch: 8 - Loss: 0.34677651895909456
Epoch: 9 - Loss: 0.3468055264163416
Epoch: 10 - Loss: 0.34683453883352744
Epoch: 11 - Loss: 0.3468635563935838
Epoch: 12 - Loss: 0.34689257927948985
Epoch: 13 - Loss: 0.34692160767427294
Epoch: 14 - Loss: 0.34695064176100954
Epoch: 15 - Loss: 0.3469796817228256
Epoch: 16 - Loss: 0.3470087277428979
Epoch: 17 - Loss: 0.3470377800044545
Epoch: 18 - Loss: 0.3470668386907755
Epoch: 19 - Loss: 0.3470959039851938
Epoch: 20 - Loss: 0.34712497607109644
Epoch: 21 - Loss: 0.34715405513192477
Epoch: 22 - Loss: 0.34718314135117534
Epoch: 23 - Loss: 0.3472122349124008
Epoch: 24 - Loss: 0.34724133599921103
Epoch: 25 - Loss: 0.34727044479527325
Epoch: 26 - Loss: 0.34729956148431346
Epoch: 27 - Loss: 0.3473286862501

In [242]:
X.shape

(1248, 5253)

In [243]:
model_LR_loaded = model_LR.load_model()

In [244]:
model_LR_loaded['feature_weights']['weights']

array([[-1.15005661e-03,  5.14989670e-04, -1.30262516e-03,
         2.28707418e-03],
       [ 1.06360747e-03,  4.99647357e-05, -7.92836111e-04,
        -3.01790746e-05],
       [ 5.18194221e-04,  6.28149116e-05, -6.25831051e-04,
         2.53237055e-04],
       ...,
       [ 3.25096888e-05, -1.07517348e-05, -1.07111154e-05,
        -1.07776991e-05],
       [-3.73634824e-05, -7.44952571e-06,  8.28999616e-05,
        -3.72105150e-05],
       [-6.70726298e-06,  2.01832519e-05, -6.63441166e-06,
        -6.67601232e-06]])

In [211]:
preds = model_LR.classify(pred_file + ".txt", model_LR_loaded)

In [212]:
## Save the predictions: one label prediction per line
with open(pred_file + ".pred.txt", "w") as file:
    for pred in preds:
        file.write(pred+"\n")

# Evaluation

In [213]:
import pandas as pd

In [214]:
true_dataset = pd.read_csv(pred_true_labels, sep='\t', header=None, names=['text', 'true_label'])
pred_dataset = pd.read_csv(pred_file + ".pred.txt", sep='\t', header=None, names=['pred'])

In [215]:
# Check if the columns have the same name; adjust as needed
column_name = 'true_label'  # Change to the actual column name
pred_column_name = 'pred'  # Change to the actual predicted column name

# Merge the two DataFrames on a common index or key if available
merged_df = true_dataset.merge(pred_dataset, left_index=True, right_index=True)

# Calculate the accuracy by comparing the two columns
accuracy = (merged_df[column_name] == merged_df[pred_column_name]).mean()

# Print the accuracy as a percentage
print(f'Accuracy: {accuracy * 100:.2f}%')

Accuracy: 19.23%
