In [None]:
# Imports
import os
import sys
import math
import time
import nltk
import numpy as np
from tqdm import tqdm
from joblib import dump
from typing import Callable
from nltk.tokenize import word_tokenize
from sklearn.naive_bayes import MultinomialNB
from nltk.sentiment.util import mark_negation
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_validate, StratifiedKFold
# Local Imports
from src.utils import mr2str, get_movie_reviews_dataset, vconcat

### 3. Vocabulary Extraction

In [None]:
# PARAMS
# Universal POS tags; a tag's position in this list is the integer code that
# part_of_speech_features emits for it.
TAGSET_LIST = [
    'VERB', 'NOUN', 'PRON', 'ADJ', 'ADV', 'ADP',
    'CONJ', 'DET', 'NUM', 'PRT', 'X', '.',
]
# Threshold handed to classify_sentence for the sentence-level decision.
SUBJ_THRESH = 0.5

# PATHS
# Directory passed to load_rotten_imdb.
PATH_TO_IMDB = 'data/rotten_imdb'

In [None]:
# Functions
def freq_list(corpus: list[str], return_vocab: bool = False):
    """Counts how often each token occurs across the whole corpus.

    Every sentence of :param corpus: is split with ``word_tokenize`` and the
    occurrences are accumulated in a dictionary mapping token -> frequency.

    If :param return_vocab: is True, a ``(frequencies, vocabulary)`` tuple is returned,
    where the vocabulary is the set of distinct tokens; otherwise only the frequency
    dictionary is returned."""
    counts = {}
    for sentence in corpus:
        for word in word_tokenize(sentence):
            counts[word] = counts.get(word, 0) + 1
    if not return_vocab:
        return counts
    return counts, set(counts)


def doc_freq_list(corpus: list[str]) -> dict:
    """Counts, for every token, the number of documents of :param corpus: it appears in.

    Each document is split with ``word_tokenize``. The returned dictionary maps
    token -> document frequency; a token repeated inside one document still counts once
    for that document."""
    doc_counts = {}
    for sentence in corpus:
        # dict.fromkeys drops repeated tokens while keeping first-occurrence order,
        # so each document adds at most one to a token's count.
        for word in dict.fromkeys(word_tokenize(sentence)):
            doc_counts[word] = doc_counts.get(word, 0) + 1
    return doc_counts


def load_rotten_imdb(path):
    """Reads the two sentence files found in the directory :param path:.

    Returns a ``(subj, obj)`` tuple of lists holding the whitespace-stripped lines of
    ``quote.tok.gt9.5000`` (subjective sentences) and ``plot.tok.gt9.5000``
    (objective sentences), in file order."""
    def read_stripped_lines(filename):
        # Files are opened with the platform default text encoding.
        with open(os.path.join(path, filename), 'r') as handle:
            return [line.strip() for line in handle.readlines()]

    subj = read_stripped_lines("quote.tok.gt9.5000")
    obj = read_stripped_lines("plot.tok.gt9.5000")
    return subj, obj


def filter_dict(fl, vocab):
    '''Returns a new dict holding only the entries of the frequency list :param fl:
    whose key is contained in :param vocab:. :param fl: itself is left untouched.'''
    return {key: value for key, value in fl.items() if key in vocab}


def tf(token, freq_list):
    """Term frequency: the value stored for :param token: in the :param freq_list: mapping.

    Raises KeyError when the token has no entry."""
    term_frequency = freq_list[token]
    return term_frequency


def idf(token, doc_freq_list, ndocs):
    """Smoothed inverse document frequency of :param token:.

    Computes ``ln((1 + ndocs) / (1 + df)) + 1``, where ``df`` is
    ``doc_freq_list[token]``. Rarer tokens therefore get larger values.

    - :param doc_freq_list: mapping token -> number of documents containing it;
    - :param ndocs: total number of documents in the corpus the document frequencies
      were counted on. With ``df <= ndocs`` the result is always >= 1.

    Raises KeyError when the token has no entry in :param doc_freq_list:.
    """
    doc_freq = doc_freq_list[token]
    # Both numerator and denominator are smoothed by one; the parentheses around the
    # denominator are essential, otherwise the document frequency is added, not divided.
    return math.log((1 + ndocs) / (1 + doc_freq)) + 1


def tfidf(token, freq_list, doc_freq_list, ndocs):
    """Tf-idf weight of :param token:: its term frequency (``tf``) multiplied by its
    inverse document frequency (``idf``) computed with :param ndocs: documents."""
    term_frequency = tf(token, freq_list)
    inverse_doc_frequency = idf(token, doc_freq_list, ndocs)
    return term_frequency * inverse_doc_frequency


def normalize(vector):
    """Normalizes :param vector: with Euclidean Normalization.

    The list is scaled in place so that its L2 norm becomes 1, and the same list object
    is returned. A vector whose norm is zero (empty or all zeros) cannot be scaled and is
    returned unchanged instead of raising ZeroDivisionError."""
    norm = math.sqrt(sum(item ** 2 for item in vector))
    if norm == 0:
        return vector
    for i in range(len(vector)):
        vector[i] /= norm
    return vector


def tfidf_dict(freq_list, doc_freq_list, ndocs, norm=True):
    """Builds a dict mapping every word of :param freq_list: to its tfidf
    (Term-Frequency - Inverse-Document-Frequency) value.

    - :param freq_list: frequency list of the vocabulary (word -> frequency);
    - :param doc_freq_list: document-based frequency list of the vocabulary;
    - :param ndocs: total number of documents in the corpus;
    - :param norm: when True, the tfidf values are rescaled with Euclidean Normalization
      (computed over all words together) before being returned.
    """
    words = set(freq_list.keys())
    scores = {word: tfidf(word, freq_list, doc_freq_list, ndocs) for word in words}
    if not norm:
        return scores
    # `scores` was filled by iterating `words`, so its values line up with a second
    # pass over the same, unmodified set.
    return dict(zip(words, normalize(list(scores.values()))))

###################### EMBEDDING MATRIX ######################
## Functions below are inspired by laboratories of the NLU  ##
##  course, held by Evgeny A. Stepanov.                     ##
##############################################################

def position_features(sentence: str, tokenizer: Callable, vocab: set) -> list:
    """Encodes the relative position of the tokens in :param sentence: as follows:

    - 0 for the token at the beginning of the sentence;
    - 1 for tokens in the middle of the sentence;
    - 2 for the token at the end of the sentence.

    Tokens are extracted with the :param tokenizer: callable and filtered based on
    whether they appear in :param vocab: or not. Positions refer to the unfiltered
    token sequence, so a sentence whose first or last token is out of vocabulary yields
    no 0 or no 2 respectively. In a single-token sentence the token is encoded as 0.

    Returns one code per in-vocabulary token, in sentence order.
    """
    tokens = tokenizer(sentence)
    # Index of the final token; enumerate() never reaches len(tokens) itself.
    last_index = len(tokens) - 1
    out = []
    for i, token in enumerate(tokens):
        if token not in vocab:
            continue
        if i == 0:
            out.append(0)
        elif i == last_index:
            out.append(2)
        else:
            out.append(1)
    return out


def part_of_speech_features(sentence: str, tokenizer: Callable, vocab: set) -> list:
    """Encodes the part-of-speech tags of the tokens in :param sentence: as integers.

    The sentence is split with the :param tokenizer: callable and the whole token sequence
    is tagged with ``nltk.pos_tag`` using the universal tagset. Each tag is then mapped to
    its index in ``TAGSET_LIST``; only tokens found in :param vocab: contribute an entry.

    Raises ValueError if a tag is not listed in ``TAGSET_LIST``."""
    tagged = nltk.pos_tag(tokenizer(sentence), tagset="universal")
    return [TAGSET_LIST.index(pos) for word, pos in tagged if word in vocab]


def negation_feature(sent: str, tokenizer: Callable, vocab: set) -> list:
    """Encodes :param sent: as a vector of negation flags, one per in-vocabulary token.

    Tokens are extracted with the :param tokenizer: callable and those missing from
    :param vocab: are discarded first. The remaining tokens are passed to ``mark_negation``;
    a token that comes back with the ``_NEG`` suffix is encoded as 1, any other token as 0."""
    kept_tokens = [tok for tok in tokenizer(sent) if tok in vocab]
    flags = []
    for marked in mark_negation(kept_tokens):
        flags.append(int(marked.endswith("_NEG")))
    return flags


def embed_sentence(sent, tokenizer, vocabulary, tfidf_map):
    """Encodes a sentence extracting a subset of token-level features
    w.r.t. the ones proposed in https://arxiv.org/pdf/1312.6962.pdf.

    :param sent: is split with the :param tokenizer: callable and only tokens found in
    :param vocabulary: are embedded. The columns of the returned matrix are, in order:
    - the tfidf value looked up in :param tfidf_map: (None when the token has no entry);
    - the part-of-speech feature;
    - the positional feature;
    - the negation feature.

    Thus, a matrix of shape (N_tokens, 4) is returned, where N_tokens counts the
    in-vocabulary tokens only.
    """
    tokens = tokenizer(sent)
    position_column = position_features(sent, tokenizer, vocabulary)
    pos_tag_column = part_of_speech_features(sent, tokenizer, vocabulary)
    negation_column = negation_feature(sent, tokenizer, vocabulary)
    tfidf_column = [tfidf_map.get(tok) for tok in tokens if tok in vocabulary]

    # Each feature list becomes an (N_tokens, 1) column before being joined side by side.
    columns = (tfidf_column, pos_tag_column, position_column, negation_column)
    return np.concatenate(
        [np.array(column)[:, np.newaxis] for column in columns], axis=1)


def token_count(ds, tokenizer, vocab):
    """Counts the tokens of the sentences in :param ds: that belong to :param vocab:.

    Every sentence is split with the :param tokenizer: callable; repeated tokens are
    counted each time they occur."""
    return sum(1 for sent in ds for tok in tokenizer(sent) if tok in vocab)


### CLASSIFICATION ###
def classify_sentence(clf, sent, subjectivity_thresh, tokenizer, vocabulary, tfidf_map):
    """Performs token-level subjectivity detection on the tokens in :param sent:, then aggregates
    the results for sentence-level classification.

    The sentence is embedded with ``embed_sentence`` and :param clf: predicts one label per
    embedded token (non-zero meaning subjective). If the fraction of subjective tokens is at
    least :param subjectivity_thresh:, 1 (subjective) is returned, otherwise 0 (objective).

    A sentence with no in-vocabulary tokens has nothing to classify and is reported as 0."""
    X = embed_sentence(sent, tokenizer, vocabulary, tfidf_map)
    if len(X) == 0:
        # No rows to predict on; avoid calling the classifier with an empty matrix.
        return 0
    y = clf.predict(X)
    # Compare the exact ratio: truncating len(y) * thresh to an int lowers the bar
    # (e.g. a one-token sentence would always be labelled subjective).
    subjective_ratio = np.count_nonzero(y) / len(y)
    return 1 if subjective_ratio >= subjectivity_thresh else 0

### 3.1. Movie Reviews Dataset
Here we load the Movie Reviews dataset and extract its vocabulary.

In [None]:
# Both splits returned by the loader go through mr2str, positive split first.
pos, neg = (mr2str(reviews) for reviews in get_movie_reviews_dataset(mark_negs=False))
movie_reviews_ds = neg + pos
# Only the vocabulary is kept; the frequency counts are discarded.
_, vocab = freq_list(movie_reviews_ds, return_vocab=True)
#len(vocab)

### 3.2. Rotten IMDB Dataset

In [None]:
subj, obj = load_rotten_imdb(PATH_TO_IMDB)
# Subjective sentences come first, so the first len(subj) entries of imdb_ds are subjective.
imdb_ds = [*subj, *obj]
#imdb_ds

Define Unigram Values

In [None]:
# Term and document frequencies are counted on the IMDB sentences and then restricted
# to the words of the movie-review vocabulary.
freqency_list = filter_dict(freq_list(imdb_ds), vocab)
document_freq_list = filter_dict(doc_freq_list(imdb_ds), vocab)
# The document frequencies above are counted over imdb_ds, so the idf document total
# must be the size of that same corpus (not of movie_reviews_ds).
n_docs = len(imdb_ds)
tfidf_map = tfidf_dict(freqency_list, document_freq_list, n_docs)
#tfidf_map

Build Embedding Matrix

In [None]:
# Stack the per-sentence (N_tokens, 4) matrices into one token-level feature matrix.
X = None
for sent in tqdm(imdb_ds):
    embedded_sent_matrix = embed_sentence(sent, word_tokenize, vocab, tfidf_map)
    X = embedded_sent_matrix if X is None else vconcat(X, embedded_sent_matrix)
# One label per in-vocabulary token: every token of a subjective sentence is labelled 1,
# every token of an objective sentence 0 (same order as imdb_ds = subj + obj).
labels = (
    [1] * token_count(subj, word_tokenize, vocab)
    + [0] * token_count(obj, word_tokenize, vocab)
)
#len(labels)

### 4. N-Fold Cross Validation
- In this step we run 10-fold stratified cross-validation and keep the estimator fitted on the best-scoring fold.

In [None]:
clf = MultinomialNB()
scores = cross_validate(
    clf, X, labels,
    cv = StratifiedKFold(n_splits=10),
    scoring = ['f1_micro'],
    return_estimator = True,
    n_jobs = -1)

average = sum(scores['test_f1_micro']) / len(scores['test_f1_micro'])
print("F1 Score: {:.3f}".format(average))

# scores['test_f1_micro'] only holds the per-fold scores; the fitted models requested with
# return_estimator=True live under scores['estimator'], in the same fold order.
best_fold = np.argmax(np.array(scores['test_f1_micro']))
estimator = scores['estimator'][best_fold]


- Get sentence-level predictions using the best model.

In [None]:
# Sentence-level gold labels, in the order of imdb_ds (subjective first, then objective).
y = [1] * len(subj) + [0] * len(obj)
y_hat = []
for sent in imdb_ds:
    y_hat.append(
        classify_sentence(estimator, sent, SUBJ_THRESH, word_tokenize, vocab, tfidf_map))
print(classification_report(y, y_hat))

- Save the best model to the `tmp/models` folder.

In [None]:
# save estimator
path_to_estimator = 'tmp/models/token_level_subj_det.joblib'
# exist_ok=True creates the folder only when it is missing, without a separate
# existence check that could race with another process.
os.makedirs(os.path.dirname(path_to_estimator), exist_ok=True)
print("Saving estimator at: ", path_to_estimator)
dump(estimator, path_to_estimator)