Sentiment Analyzer - Source Code - Emily Mech

This is the source code for the baseline and improved sentiment analyzers for the Computational Linguistics term project.

In [1]:
import numpy as np
import pandas as pd
import string
import spacy

from pathlib import Path
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import KFold, cross_validate
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

In [3]:
class Preprocess:
    """Locate the review corpus and turn it into a feature DataFrame.

    ``get_paths`` fills in the corpus/lexicon locations relative to the
    parent of the current working directory, and ``make_df`` reads every
    review, cleans it and stores one row per review in ``self.df``.
    """

    # Lazily-filled cache of NLTK's English stop words, shared by all
    # instances so the list is not re-read for every document.
    _stop_words = None

    def __init__(self):
        self.cwd = None           # current working directory
        self.src = None           # parent of ``cwd``; root for Data/ and Results/
        self.data = None
        self.neg = None           # directory holding the negative reviews
        self.pos = None           # directory holding the positive reviews
        self.path_list = None     # [neg, pos], iterated by ``make_df``
        self.valence_path = None  # tab-separated "word<TAB>rating" lexicon file
        self.df = None            # DataFrame built by ``make_df``

    def get_paths(self):
        """Populate the path attributes relative to the working directory."""
        self.cwd = Path.cwd()
        self.src = self.cwd.parent
        polarity_dir = self.src / "Data" / "review_polarity" / "txt_sentoken"
        self.neg = polarity_dir / "neg"
        self.pos = polarity_dir / "pos"
        self.path_list = [self.neg, self.pos]
        self.valence_path = (
            self.src / "Data"
            / "NRC-Sentiment-Emotion-Lexicons" / "NRC-Sentiment-Emotion-Lexicons"
            / "NRC-VAD-Lexicon" / "NRC-VAD-Lexicon" / "v-scores.txt"
        )

    def read_data(self, path):
        """Return the full text content of the file at ``path``."""
        with open(path, "r") as f:
            return f.read()

    def get_label(self, category):
        """Return 0 when the directory ``category`` is named "neg", else 1."""
        return 0 if category.name == "neg" else 1

    def clean_tokens(self, file, stop=True, digit=True, punc=True, dash=True, stem=True):
        """Tokenise and normalise the text of one review.

        The text is split on single spaces, newlines are stripped from the
        token edges and everything is lower-cased.  Each flag then enables
        one further step:

        * ``dash``  - strip leading/trailing ``-`` characters from tokens
        * ``digit`` - drop purely numeric tokens
        * ``punc``  - drop tokens that are a single punctuation character
        * ``stop``  - drop English stop words (keeping the negations
          "not", "n't" and "no") and every one-character token
        * ``stem``  - apply the Porter stemmer

        Returns:
            A ``(tokens, joined)`` tuple: the list of non-empty tokens and
            the same tokens joined with single spaces.
        """
        tokens = [tok.strip('\n').lower() for tok in file.split(' ')]

        if dash:
            # str.strip takes a set of characters, so this removes any run
            # of '-' from both ends (same effect as the former strip('--')).
            tokens = [tok.strip('-') for tok in tokens]
        if digit:
            tokens = [tok for tok in tokens if not tok.isnumeric()]
        if punc:
            punct = set(string.punctuation)
            tokens = [tok for tok in tokens if tok not in punct]
        if stop:
            if Preprocess._stop_words is None:
                Preprocess._stop_words = frozenset(stopwords.words('english'))
            stop_words = Preprocess._stop_words
            whitelist = ('not', "n't", "no")
            tokens = [
                w for w in tokens
                if (w not in stop_words or w in whitelist) and len(w) > 1
            ]
        if stem:
            porter = PorterStemmer()
            tokens = [porter.stem(word) for word in tokens]

        tokens = [tok for tok in tokens if tok]
        return tokens, ' '.join(tokens)

    def make_df(self, stop, digit, punc, dash, stem):
        """Build ``self.df`` with one row per review file.

        The boolean arguments are forwarded to ``clean_tokens``.  Columns:

        * ``label``      - 0/1 from ``get_label``
        * ``text``       - raw file content
        * ``clean_text`` - space-joined cleaned tokens
        * ``pos_count``  - number of lexicon rows matching a token with
          rating > 0.5 (stored as ``str``)
        * ``neg_count``  - same for rating < 0.5 (stored as ``str``)
        * ``avg_rating`` - mean rating of the matched lexicon rows (stored
          as ``str``; "nan" when nothing matched)

        NOTE(review): the lexicon is matched against the *cleaned* tokens,
        so with ``stem=True`` only words equal to their own stem can match,
        and each lexicon row counts at most once per review regardless of
        how often the word occurs - confirm this is intended.
        """
        columns = ["label", "text", "clean_text", "pos_count", "neg_count", "avg_rating"]
        v_scores = pd.read_csv(self.valence_path, delimiter="\t", names=["word", "rating"])

        rows = []
        for path_category in self.path_list:
            label = self.get_label(path_category)
            # Skip directories (the recursive glob yields them too) and sort
            # so that row order does not depend on filesystem order.
            review_files = sorted(p for p in path_category.glob("**/*") if p.is_file())
            for review_file in review_files:
                text = self.read_data(review_file)
                tokens, joined_tokens = self.clean_tokens(text, stop, digit, punc, dash, stem)
                ratings = v_scores.loc[v_scores["word"].isin(tokens), "rating"]
                # Count rows directly; summing the mixed str/float frame
                # row-wise only to take its length fails on recent pandas.
                pos_count = int((ratings > .5).sum())
                neg_count = int((ratings < .5).sum())
                rows.append({
                    "label": label,
                    "text": text,
                    "clean_text": joined_tokens,
                    "pos_count": str(pos_count),
                    "neg_count": str(neg_count),
                    "avg_rating": str(ratings.mean()),
                })

        # Built once after all categories are read; explicit columns keep the
        # schema stable even when no review file was found.
        self.df = pd.DataFrame(rows, columns=columns)
            
            
class Classify(Preprocess):
    """Train and evaluate the sentiment classifiers on the review DataFrame.

    Construction resolves the data paths and builds ``self.df`` through the
    ``Preprocess`` methods; ``run_classifiers`` then evaluates every
    combination of text representation, vectoriser, feature set and
    classifier with 5-fold cross-validation.
    """

    def __init__(self):
        super().__init__()
        self.get_paths()
        # to modify text representations, change any of the desired arguments to False
        self.make_df(stop=True, digit=True, punc=True, dash=True, stem=True)
        self.output_dict_list = []

    def vectorize_input(self, preprocess_type, vectorizer_type, feature_type):
        """Vectorise ``self.df`` into a feature matrix.

        Args:
            preprocess_type: "raw" (column ``text``) or "cleaned"
                (column ``clean_text``).
            vectorizer_type: "tfidf" or "count".
            feature_type: "improved" adds the ``pos_count``, ``neg_count``
                and ``avg_rating`` columns to the text features,
                "intermediate" adds only the two counts, and any other
                value uses the text features alone.

        Returns:
            The matrix produced by ``fit_transform`` on the whole DataFrame.

        Raises:
            ValueError: if ``preprocess_type`` or ``vectorizer_type`` is not
                one of the supported values.
        """
        vectorizers = {"tfidf": TfidfVectorizer, "count": CountVectorizer}
        text_columns = {"raw": "text", "cleaned": "clean_text"}
        if vectorizer_type not in vectorizers:
            raise ValueError(f"unknown vectorizer_type: {vectorizer_type!r}")
        if preprocess_type not in text_columns:
            raise ValueError(f"unknown preprocess_type: {preprocess_type!r}")

        vect = vectorizers[vectorizer_type]()
        col_name = text_columns[preprocess_type]

        lexicon_columns = {
            "improved": [("positive", "pos_count"),
                         ("negative", "neg_count"),
                         ("average", "avg_rating")],
            "intermediate": [("positive", "pos_count"),
                             ("negative", "neg_count")],
        }
        extra_columns = lexicon_columns.get(feature_type)
        if extra_columns is None:
            return vect.fit_transform(self.df[col_name])

        # NOTE(review): the lexicon columns hold numbers stored as strings and
        # are bag-of-words encoded here; CountVectorizer's default token
        # pattern presumably drops one-character tokens, so single-digit
        # counts would contribute no feature - confirm this is intended.
        transformers = [
            (name, CountVectorizer(lowercase=False), column)
            for name, column in extra_columns
        ]
        transformers.append(('text', vect, col_name))
        column_trans = ColumnTransformer(transformers, remainder='drop')
        return column_trans.fit_transform(self.df)

    @staticmethod
    def _to_array(matrix):
        """Return ``matrix`` as a dense ``ndarray`` (not ``np.matrix``)."""
        if hasattr(matrix, "toarray"):
            return matrix.toarray()
        return np.asarray(matrix)

    def buildClassifiers(self, clf, X_train, X_test, y_train, y_test, features):
        """Fit ``clf`` on one fold and score it on the held-out part.

        ``features`` is accepted for interface compatibility but not used.

        Returns:
            ``(f1, precision, recall, accuracy)``; the first three are
            macro-averaged with ``zero_division=0``.
        """
        # Densify with toarray(): todense() yields np.matrix, which recent
        # scikit-learn releases refuse as estimator input.
        clf.fit(self._to_array(X_train), y_train)
        y_pred = clf.predict(self._to_array(X_test))

        # Macro average for all three so the numbers match the printed labels.
        f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)
        precision = precision_score(y_test, y_pred, average="macro", zero_division=0)
        recall = recall_score(y_test, y_pred, average="macro", zero_division=0)
        accuracy = accuracy_score(y_test, y_pred)

        return f1, precision, recall, accuracy

    # Construct the classifiers at hand prior to folding the data through them
    def run_classifiers(self):
        """Evaluate every text/vectoriser/feature/classifier combination.

        Each combination is scored with shuffled 5-fold cross-validation and
        the fold means are printed and appended to ``self.output_dict_list``.

        NOTE(review): the vectorisers are fitted on the full DataFrame
        before the folds are split, so vocabulary/IDF statistics include
        the test rows.

        Returns:
            ``self.output_dict_list`` - one dict per combination with keys
            clf, text, vect, feature, f1, precision, recall, accuracy.
        """
        text_types = ['raw', 'cleaned']
        vect_types = ['tfidf', 'count']
        feature_types = ['baseline', 'improved', 'intermediate']

        names = ['Logistic_Regression', 'Decision_Tree', "Neural_Network"]
        classifiers = [LogisticRegression(random_state=0, max_iter=1000),
                       DecisionTreeClassifier(random_state=0),
                       MLPClassifier(solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(5, 2), random_state=1, max_iter=1000)]

        # Positional label array: safe to index with the fold indices
        # whatever the DataFrame's index looks like.
        labels = self.df['label'].to_numpy()

        for text_type in text_types:
            for vect_type in vect_types:
                for feature_type in feature_types:
                    # The features do not depend on the classifier, so build
                    # them once per configuration.
                    features = self.vectorize_input(text_type, vect_type, feature_type)

                    for name, clf in zip(names, classifiers):
                        f1_list = []
                        precision_list = []
                        recall_list = []
                        accuracy_list = []

                        print("Now classifying...")
                        print(f"classifier:{name}, text:{text_type}, vect:{vect_type}, feature:{feature_type}")

                        # Fold the data 5 times. Rows are grouped by class
                        # (one category directory after the other), so the
                        # folds must be shuffled to contain both classes.
                        kf = KFold(n_splits=5, shuffle=True, random_state=0)

                        for train_index, test_index in kf.split(features):
                            X_train, X_test = features[train_index], features[test_index]
                            y_train, y_test = labels[train_index], labels[test_index]

                            f1, precision, recall, accuracy = self.buildClassifiers(clf, X_train, X_test, y_train, y_test, features)
                            f1_list.append(f1)
                            precision_list.append(precision)
                            recall_list.append(recall)
                            accuracy_list.append(accuracy)

                        mean_f1 = np.mean(f1_list)
                        mean_precision = np.mean(precision_list)
                        mean_recall = np.mean(recall_list)
                        mean_accuracy = np.mean(accuracy_list)

                        print("\tAverage Macro F1 for {}:\t\t".format(name), mean_f1)
                        print("\tAverage Macro Precision for {}:\t".format(name), mean_precision)
                        print("\tAverage Macro Recall for {}:\t\t".format(name), mean_recall)

                        self.output_dict_list.append({
                            'clf': name,
                            'text': text_type,
                            'vect': vect_type,
                            'feature': feature_type,
                            'f1': mean_f1,
                            'precision': mean_precision,
                            'recall': mean_recall,
                            'accuracy': mean_accuracy,
                        })

        return self.output_dict_list

    def save_output(self, output_dict_list):
        """Write ``output_dict_list`` as CSV to ``<src>/Results/results.csv``."""
        output_df = pd.DataFrame(output_dict_list)
        result_path = self.src / "Results" / "results.csv"
        # Create the output directory on first use instead of failing.
        result_path.parent.mkdir(parents=True, exist_ok=True)
        output_df.to_csv(result_path)