In [15]:
from os import listdir
import time
import random
import string
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize 
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer, TfidfVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.naive_bayes import MultinomialNB
from sklearn.ensemble import RandomForestClassifier
from sklearn import svm
import pandas as pd

nltk.download('stopwords')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\isacm\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\isacm\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [16]:
exclude = set(string.punctuation)
negation = set(["aren't", "isn't", "wasn't", "weren't", "can't", "couldn't", 
    "mustn't", "shouldn't", "won't", "wouldn't", "didn't", "doesn't", "don't", 
    "hasn't", "haven't", "hadn't", "not"])
stop_words = set(stopwords.words('english'))

def remove_punctuation(str_to_alter):
    """
    Takes a string as input and removes punctuation
    """
    global exclude
    new_string = ''.join(ch for ch in str_to_alter if ch not in exclude)

    return new_string

def has_digits(s):
    """
    Returns true if string has digits.
    """
    return any(ch.isdigit() for ch in s)

def is_negation(s):
    """
    Returns True if word will negate coming word 
    """
    global negation

    return s in negation

def is_stopword(s):
    """
    Returns True if the word is in NLTK's list of stop words.
    """
    global stop_words

    return s in stop_words

class Sample:
    """
    Class for holding samples from dataset
    """
    def __init__(self, body, target_class, preprocess=True):
        """
        Inits and preprocesses samples
        """
        self.original = body
        self.body = body
        self.length = len(body)
        self.target_class = int(target_class)
        self.exclamation_point_count = self.body.count('!')
        self.question_mark_count = self.body.count('?')
        self.tokens = list()

        word_tokens = word_tokenize(self.body) 
        
        if preprocess:

            ps =PorterStemmer()
            negation = False
            for w in word_tokens:
                if 'http' in w or '//' in w or '.com' in w:
                    negation = not negation
                    continue
                if '&' in w or '#' in w:
                    negation = not negation
                    continue
                root_word=ps.stem(w)
                if root_word == "n't" or root_word == 'not':
                    negation = not negation
                    continue
                root_word = remove_punctuation(root_word)
                if root_word == 'br':
                    negation = False
                    continue
                elif len(root_word) < 2:
                    negation = False
                    continue
                elif has_digits(root_word):
                    negation = False
                    continue
                elif is_stopword(root_word):
                    negation = False
                    continue
                if negation: 
                    root_word = "".join(('!',root_word))
                    negation = False
                self.tokens.append(root_word)

            self.body = " ".join(self.tokens)


    def __str__(self) -> str:
        """
        Prints first 100 characters of the body if object is printed
        """
        return self.body if len(self.body) < 100 else self.body[:100]

In [17]:
# Path to folder with dataset
path = "./data/processed/"

# Lists all files in folder 
files = listdir(path)[:3]

df = []

for file in files:
    # Read dataset into dataframe and append to list
    df.append((file, pd.read_csv(f"{path}{file}", sep="\t")))

# For holding dataset with preprocessing
file_content = []
# For holding dataset without preprocessing
file_content_original = []
# For training
ds = []
# For testing
ts = []

random.seed(0)

test_set_probability = .25

for file, dataset in df:
    y_train = []
    y_test = []
    x_train_raw = []
    x_test_raw = []
    for b, s in zip(dataset['review_body'], dataset['star_rating']):
        if random.random() <= test_set_probability:
            # if true add to test set
            train = True
        else:
            train=False
        # Create object with preprocessing from review
        x = Sample(b, 1 if int(s) == 5 else 0)
        if train:
            ds.append(x)
            x_train_raw.append(x.body.strip())
            y_train.append(x.target_class)
        else:
            ts.append(x)
            x_test_raw.append(x.body.strip())
            y_test.append(x.target_class)
    # Append to list
    file_content.append((file, ds, x_train_raw, y_train, ts, x_test_raw, y_test))
    y_train = []
    y_test = []
    x_train_raw = []
    x_test_raw = []
    for b, s in zip(dataset['review_body'], dataset['star_rating']):
        if random.random() <= .25:
            train = True
        else:
            train=False
        # Create object without preprocessing from review
        x = Sample(b, 1 if int(s) == 5 else 0, False)
        if train:
            x_train_raw.append(x.original.strip())
            y_train.append(x.target_class)
        else:
            x_test_raw.append(x.original.strip())
            y_test.append(x.target_class)
    file_content_original.append((file, ds, x_train_raw, y_train, ts, x_test_raw, y_test))

In [18]:
# Read tweets into dataframe
df = pd.read_csv(f"./data/tweets.csv")

x_tweets = []
y_tweets = []

random.seed(0)
# Create dataset for testing
for b, s in zip(df['text'], df['sentiment']):
    if not isinstance(b, str):
        break
    if s == 'neutral':
        continue
    x = Sample(b, 1 if s == 'positive' else 0)
    x_tweets.append(x.body.strip())
    y_tweets.append(x.target_class)

    

In [19]:


class QuestionMarkExtractor(BaseEstimator, TransformerMixin):
    """Takes in dataframe, extracts number of question marks and outputs a new dataframe"""

    def __init__(self):
        pass

    def transform(self, df, y=None):
        """The workhorse of this feature extractor"""
        return pd.DataFrame([x.count('?') for x in df])

    def fit(self, df, y=None):
        """Returns `self` unless something different happens in train and test"""
        return self

In [20]:
def eval_result(predicted, y):
    """
    Takes predicted class and target class and returns a dict containing 
    evaluation statistics"""
    
    tp, tn, fp, fn = 0,0,0,0
    stat = dict()

    for y_hat, y_ in zip(predicted, y):
        if y_hat == 1:
            if y_ == 1:
                tp +=1
            else:
                fp += 1
        else:
            if y_ == 1:
                fn += 1
            else:
                tn +=1
    # (TP+TN)/(TP+TN+FP+FN)
    accuracy = (tp+tn)/(tp+fp+fn+tn)
    # TP/(TN+FN)
    precision = tp/(tp+fp)
    # TP/(TP+FN)
    recall = tp/(tp+fn)
    # 2*recall*precision/(recall+precision)
    f1 = 2*(recall * precision) / (recall + precision)

    stat["Accuracy"] = accuracy
    stat["Precision"] = precision
    stat["Recall"] = recall
    stat["F1-Score"] = f1
    stat["TP"] = tp
    stat["TN"] = tn
    stat["FP"] = fp
    stat["FN"] = fn

    return stat

In [21]:
def get_model(model, vectorizer, qm, binary, stop_words, additional_settings, ngrams):
    """
    Takes arguments on what model, vectorizer and settings should be used. 
    Outputs a pipeline with feature extractors and ml algorithm 
    """

    # ML model to use
    if model == 'Naïve Bayes':
        m = MultinomialNB()
    elif model == "Support Vector Machine":
        if additional_settings:
            m = svm.SVC(kernel='linear', verbose=False, C=0.67)
        else:
            m = svm.SVC()
    elif model == 'Random Forest':
        if additional_settings:
            m = RandomForestClassifier(n_estimators=500 ,criterion='entropy', random_state=0, verbose=0, n_jobs=8)
        else:
            m = RandomForestClassifier(random_state=0, verbose=0, n_jobs=8)
    
    # Feature Extractor
    if vectorizer == 'CountVectorizer':
        vec = CountVectorizer(stop_words=stop_words, binary=binary, ngram_range=ngrams)
    elif vectorizer == "HashingVectorizer":
        vec = HashingVectorizer(stop_words=stop_words, binary=binary, ngram_range=ngrams)
    elif vectorizer == "TfidfVectorizer":
        if additional_settings:
            vec = TfidfVectorizer(stop_words=stop_words, binary=binary, smooth_idf=False, ngram_range=ngrams)
        else:
            vec = TfidfVectorizer(stop_words=stop_words, binary=binary, ngram_range=ngrams)

    feature_union = [('vec', vec)]
    # If question mark count should be considered as a feature.
    if qm:
        feature_union.append(('qm', QuestionMarkExtractor()))

    ppl = Pipeline([
        ('feats', FeatureUnion(feature_union)),
        ('model', m)
    ]) 

    return ppl

models = ['Support Vector Machine' , 'Naïve Bayes' , 'Random Forest']
vectorizers = ["TfidfVectorizer", 'CountVectorizer', "HashingVectorizer"]

try:
    # Getting the pipeline
    model = get_model("Support Vector Machine", "TfidfVectorizer", True, True, None, True, [1,2])
    t0 = time.time()
    # Fitting
    model.fit(file_content[2][2], file_content[2][3])
    t1 = time.time()
    # Predictions on testset with standard distribution (12.9% negative)
    y_hat = model.predict(file_content[0][5])
    t2 = time.time()
    fitting_time = t1-t0
    result = eval_result(y_hat, file_content[0][6])
    print(f"Accuracy with tests on standard distribution: {result['Accuracy']}")
    # Predictions on testset with same distribution like it has been fitted with.
    t1 = time.time()
    y_hat = model.predict(file_content[2][5])
    t2 = time.time()
    result = eval_result(y_hat, file_content[2][6])
    print(f"Accuracy with tests on distribution 87.1/12.9: {result['Accuracy']}")
    # Predictions on Twitter dataset
    t1 = time.time()
    y_hat = model.predict(x_tweets)
    t2 = time.time()
    result = eval_result(y_hat, y_tweets)
    print(f"Accuracy with tests on tweets: {result['Accuracy']}")
except Exception as e:
    print(e)

Accuracy with tests on standard distribution: 0.9403457952385263
Accuracy with tests on distribution 87.1/12.9: 0.8898235347901619
Accuracy with tests on tweets: 0.651615969581749
