### 1. Importing Libraries

In [None]:
# importing libraries
# Modelling
import pickle
# Text cleaning
import re
import string
from collections import defaultdict
from typing import Any, List, Union

import matplotlib.pyplot as plt
import nltk
import numpy as np
import numpy.typing as npt
import pandas as pd
import seaborn as sns
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.tokenize import sent_tokenize, word_tokenize
from rouge import Rouge
from sklearn import metrics
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import (GridSearchCV, RandomizedSearchCV,
                                     StratifiedKFold, train_test_split)
from sklearn.naive_bayes import MultinomialNB
from xgboost import XGBClassifier

from text_analytics.config import (ACRONYMS, DATA_PATH, RANDOM_STATE,
                                   RAW_DATA_PATH, SENTIMENT_CLEANED_DATA_PATH,
                                   SUMMARISER_CLEANED_DATA_PATH)

# Do not truncate wide text columns when DataFrames are displayed, so whole
# review strings remain readable in cell output.
pd.set_option("display.max_colwidth", None)

## VADER

### 1. Reading in data

In [None]:
# Load the raw data from the parquet file at RAW_DATA_PATH for the VADER run.
vader_movies = pd.read_parquet(RAW_DATA_PATH)
# Print the frame summary as a quick sanity check of what was loaded.
vader_movies.info()

### 2. Preprocessing function helpers

In [None]:
def convert_lowercase(series: Union[pd.Series, str]) -> Union[pd.Series, str]:
    """Lower-case a single string or every string held in a Series.

    Args:
        series: Either one piece of text or a pandas Series of text.

    Returns:
        The lower-cased text, in the same container kind as the input.
    """
    return series.lower() if isinstance(series, str) else series.str.lower()


def remove_html_tags(series: Union[pd.Series, str]) -> Union[pd.Series, str]:
    """Strip anything that looks like an HTML tag from text.

    A tag is matched non-greedily as ``<`` ... ``>`` and replaced with the
    empty string.

    Args:
        series: Either one piece of text or a pandas Series of text.

    Returns:
        The text without tags, in the same container kind as the input.
    """
    tag_pattern = r"<.*?>"

    if not isinstance(series, str):
        return series.str.replace(pat=tag_pattern, repl="", regex=True)

    return re.sub(pattern=tag_pattern, repl="", string=series)


def remove_punctuation(word_arr: Union[pd.Series, str]) -> Union[pd.Series, str]:
    """Remove punctuation from a string or from tokenised text.

    Args:
        word_arr: Either a plain string, or a pandas Series whose elements
            are iterables of tokens (e.g. lists produced by a tokenizer).

    Returns:
        For a string: the same text with every ``string.punctuation``
        character deleted (all other characters, including whitespace, are
        kept as-is). For a Series: a Series of lists with the punctuation
        tokens dropped.
    """
    if isinstance(word_arr, str):
        # Delete punctuation characters in one pass. Iterating the string and
        # joining with " " would put a space between every remaining character.
        return word_arr.translate(str.maketrans("", "", string.punctuation))

    # Token-level filter: a token is dropped when it occurs as a substring of
    # string.punctuation (single punctuation marks, and the empty string).
    return word_arr.apply(
        lambda arr: [word for word in arr if word not in string.punctuation]
    )


def convert_abbreviations(series: Union[pd.Series, str]) -> Union[pd.Series, str]:
    """Replace whitespace-separated words found in ACRONYMS by their mapping.

    Words are obtained with ``str.split()`` and re-joined with single spaces,
    so runs of whitespace are collapsed in the output.

    Args:
        series: Either one piece of text or a pandas Series of text.

    Returns:
        The text with known acronyms substituted, in the same container kind
        as the input.
    """

    def _expand(sentence: str) -> str:
        # Fall back to the word itself when it is not a known acronym.
        return " ".join(ACRONYMS.get(token, token) for token in sentence.split())

    if isinstance(series, str):
        return _expand(series)

    return series.apply(_expand)


def remove_stopwords(
    series: Union[pd.Series, str],
    stop_words: Union[nltk.corpus.reader.wordlist.WordListCorpusReader, List] = None,
) -> Union[pd.Series, str]:
    """Drop stop words from whitespace-separated text.

    Args:
        series: Either one piece of text or a pandas Series of text.
        stop_words: Collection of words to remove. When omitted, the NLTK
            English stop-word list is used.

    Returns:
        The text with stop words removed and the remaining words joined by
        single spaces, in the same container kind as the input.
    """
    if stop_words is None:
        stop_words = set(stopwords.words("english"))

    def _strip(sentence: str) -> str:
        kept = [word for word in sentence.split() if word not in stop_words]
        return " ".join(kept)

    return _strip(series) if isinstance(series, str) else series.apply(_strip)


def tokenize_words(text: str, tokenizer: str = "word") -> npt.ArrayLike:
    """Split text into word tokens or sentence tokens using NLTK.

    Args:
        text: The text to tokenise.
        tokenizer: ``"word"`` to use ``word_tokenize`` or ``"sentence"`` to
            use ``sent_tokenize``.

    Returns:
        The list of tokens produced by the selected NLTK tokenizer.

    Raises:
        ValueError: If ``tokenizer`` is neither ``"word"`` nor ``"sentence"``.
    """
    if tokenizer not in ("word", "sentence"):
        raise ValueError(f"{tokenizer} must be one of (word, sentence)")

    # Run only the tokenizer that was asked for; building both results up
    # front would tokenise every text twice and throw one result away.
    if tokenizer == "word":
        return word_tokenize(text)
    return sent_tokenize(text)


def remove_non_alnum(word_arr: Union[pd.Series, str]) -> Union[pd.Series, str]:
    """Keep only the words/tokens that are purely alphanumeric.

    Args:
        word_arr: Either a plain string (split on whitespace), or a pandas
            Series whose elements are iterables of tokens.

    Returns:
        For a string: the alphanumeric words re-joined with single spaces.
        For a Series: a Series of lists holding the alphanumeric tokens.
    """
    if not isinstance(word_arr, str):
        return word_arr.apply(
            lambda tokens: [token for token in tokens if token.isalnum()]
        )

    kept_words = [word for word in word_arr.split() if word.isalnum()]
    return " ".join(kept_words)


def stemming(word_arr: npt.ArrayLike, stemmer: Any = None) -> npt.ArrayLike:
    """Stem every word in a sequence.

    Args:
        word_arr: Iterable of words to stem.
        stemmer: Object exposing ``stem(word)``. A ``PorterStemmer`` is
            created when omitted.

    Returns:
        A list with the stem of each input word, in order.

    Raises:
        Whatever the stemmer raises; the error is printed and re-raised.
    """
    active_stemmer = PorterStemmer() if stemmer is None else stemmer

    try:
        stems = [active_stemmer.stem(token) for token in word_arr]
    except BaseException as err:
        print(f"Unexpected err: {err}, Type: {type(err)}")
        raise
    else:
        return stems


def lemmatizer(word_arr: npt.ArrayLike, lemmatizer: Any = None) -> npt.ArrayLike:
    """Lemmatise every word in a sequence.

    Args:
        word_arr: Iterable of words to lemmatise.
        lemmatizer: Object exposing ``lemmatize(word)``. A
            ``WordNetLemmatizer`` is created when omitted.

    Returns:
        A list with the lemma of each input word, in order.

    Raises:
        Whatever the lemmatiser raises; the error is printed and re-raised.
    """
    active_lemmatizer = WordNetLemmatizer() if lemmatizer is None else lemmatizer

    try:
        lemmas = [active_lemmatizer.lemmatize(token) for token in word_arr]
    except BaseException as err:
        print(f"Unexpected err: {err}, Type: {type(err)}")
        raise
    else:
        return lemmas

In [None]:
# Clean the review text for VADER: strip HTML tags, lower-case, expand
# acronyms, then drop stop words - applied in exactly that order.
vader_movies["review"] = (
    vader_movies["review"]
    .pipe(remove_html_tags)
    .pipe(convert_lowercase)
    .pipe(convert_abbreviations)
    .pipe(remove_stopwords)
)
vader_movies.head(1)

### 3. Define a Vader model class

In [None]:
class VaderReviews:
    """Score review sentiment with NLTK's VADER analyser.

    The instance works either on a single review string or on a DataFrame
    that has a ``"review"`` column. Scoring happens in three stages, each
    stored on the instance:

    * ``polarity_scores`` - the raw VADER score dict(s),
    * ``compound_score``  - the ``"compound"`` entry of those dict(s),
    * ``prediction``      - ``"positive"`` when the compound score is > 0,
      otherwise ``"negative"``.
    """

    def __init__(self, data: Union[str, pd.DataFrame]) -> None:
        """Create the analyser and remember the data to score.

        Args:
            data: One review as a string, or a DataFrame with a ``"review"``
                column.
        """
        try:
            self.model = SentimentIntensityAnalyzer()
        except LookupError:
            # NLTK signals a missing lexicon resource with LookupError; fetch
            # it once and retry. Unrelated errors (and Ctrl-C) propagate.
            nltk.download("vader_lexicon")
            self.model = SentimentIntensityAnalyzer()

        self.data = data
        self.polarity_scores = self.compound_score = self.prediction = None

    def calculate_polarity_score(self) -> None:
        """Compute the VADER polarity dict for the string / each review row."""
        if isinstance(self.data, str):
            self.polarity_scores = self.model.polarity_scores(self.data)
        else:
            self.polarity_scores = self.data["review"].apply(
                self.model.polarity_scores
            )

    def extract_compound_score(self) -> None:
        """Pull the ``"compound"`` value out of the polarity score(s)."""
        if self.polarity_scores is None:
            # Allow calling this stage directly without the previous one.
            self.calculate_polarity_score()

        if isinstance(self.data, str):
            self.compound_score = self.polarity_scores.get("compound")
        else:
            self.compound_score = self.polarity_scores.apply(
                lambda score: score.get("compound")
            )

    def extract_prediction(self) -> None:
        """Turn the compound score(s) into ``"positive"``/``"negative"``."""
        if self.compound_score is None:
            # Allow calling this stage directly without the previous ones.
            self.extract_compound_score()

        if isinstance(self.data, str):
            self.prediction = "positive" if self.compound_score > 0 else "negative"
        else:
            self.prediction = self.compound_score.apply(
                lambda c_score: "positive" if c_score > 0 else "negative"
            )

    def return_vader_scores(self) -> Union[tuple, None]:
        """Run any stage that has not run yet and publish the result.

        Returns:
            For string data: the tuple ``(compound_score, prediction)``.
            For DataFrame data: ``None``; the combined frame is stored in
            ``self.result`` and its head is printed.
        """
        # Independent checks (not an elif chain): a fresh instance needs all
        # three stages, not just the first missing one.
        if self.polarity_scores is None:
            self.calculate_polarity_score()
        if self.compound_score is None:
            self.extract_compound_score()
        if self.prediction is None:
            self.extract_prediction()

        if isinstance(self.data, str):
            return (self.compound_score, self.prediction)

        self.result = pd.concat(
            [self.data, self.compound_score, self.prediction], axis="columns"
        )
        # The four names assume self.data has exactly two columns
        # ("review" and "sentiment"), followed by the two appended Series.
        self.result.columns = ["review", "sentiment", "compound_score", "prediction"]

        print(self.result.head())
        return None

### 4. Run predictions 

In [None]:
# Score the cleaned reviews with VADER.
vr = VaderReviews(data=vader_movies)

# Run the three stages explicitly, in dependency order:
# polarity dicts -> compound score -> positive/negative label.
vr.calculate_polarity_score()
vr.extract_compound_score()
vr.extract_prediction()
# Assemble vr.result (review, sentiment, compound_score, prediction) and
# print its head.
vr.return_vader_scores()

In [None]:
# Compare the "sentiment" column (passed as y_true) with VADER's "prediction"
# column (passed as y_pred).
print(confusion_matrix(vr.result["sentiment"], vr.result["prediction"]))
print(classification_report(vr.result["sentiment"], vr.result["prediction"]))

## Random Forest 

### 1. Reading in data

In [None]:
# Reload the raw data from RAW_DATA_PATH so this section starts from
# untouched text rather than the VADER-cleaned frame.
movies_raw = pd.read_parquet(RAW_DATA_PATH)
movies_raw.info()

### 2. Preprocessing

In [None]:
def sentiment_text_processing(series: pd.Series) -> pd.Series:
    """Run the full cleaning pipeline used for the sentiment classifier.

    Steps, in order: strip HTML tags, lower-case, expand acronyms, drop stop
    words, delete the substrings "film"/"movie" and digit runs, word-tokenise,
    drop punctuation tokens, lemmatise, and finally keep only alphanumeric
    tokens.

    Args:
        series: Series of raw review strings.

    Returns:
        A Series whose elements are lists of cleaned tokens.
    """
    # --- string-level cleaning -------------------------------------------
    text = (
        series.pipe(remove_html_tags)
        .pipe(convert_lowercase)
        .pipe(convert_abbreviations)
        .pipe(remove_stopwords)
    )
    text = text.str.replace(pat=r"film|movie|[0-9]+", repl="", regex=True)

    # --- token-level cleaning --------------------------------------------
    tokens = text.apply(lambda sentence: tokenize_words(sentence, tokenizer="word"))
    tokens = remove_punctuation(tokens)
    tokens = tokens.apply(lemmatizer)

    return remove_non_alnum(tokens)

In [None]:
# Work on a deep copy so movies_raw stays untouched, and drop exact duplicate
# rows before any feature is derived.
movies_cleaned = movies_raw.copy(deep=True)
movies_cleaned.drop_duplicates(inplace=True)
# Token lists produced by the cleaning pipeline, one list per review.
movies_cleaned["preprocessed_review"] = sentiment_text_processing(
    series=movies_cleaned["review"]
)
# NOTE(review): astype(str) on a column of lists appears to store each list's
# string representation (brackets, quotes, commas) rather than space-joined
# text - confirm this is what downstream vectorisers expect.
movies_cleaned["preprocessed_review"] = movies_cleaned["preprocessed_review"].astype(
    str
)
# Character length of the stringified review (not a token count).
movies_cleaned["length"] = movies_cleaned["preprocessed_review"].apply(len)
# Binary target: 1 for "positive", 0 for every other sentiment value.
movies_cleaned["class"] = np.where(movies_cleaned["sentiment"] == "positive", 1, 0)
# The raw text and string label are no longer needed once features exist.
movies_cleaned.drop(columns=["review", "sentiment"], inplace=True)

In [None]:
# 80/20 split, stratified on the binary "class" column, with the seed taken
# from RANDOM_STATE.
train, test = train_test_split(
    movies_cleaned,
    test_size=0.2,
    stratify=movies_cleaned["class"],
    random_state=RANDOM_STATE,
)

### 3.  

## Extractive Text Summariser 

### 1. Reading in data

In [None]:
# Load the data stored at SUMMARISER_CLEANED_DATA_PATH for the summariser.
movie_reviews = pd.read_parquet(SUMMARISER_CLEANED_DATA_PATH)
movie_reviews.head()

### 2. Define an ExtractiveTextSummarizer class

In [None]:
class ExtractiveTextSummarizer:
    """Frequency-based extractive summariser for a single article.

    Words are stemmed and counted (stop words excluded); each sentence is
    scored by the average count of the table words that occur in it, and the
    sentences scoring at or above a fraction of the mean score form the
    summary.
    """

    def __init__(self, article: Union[str, pd.DataFrame]) -> None:
        """Store the article text and start with an empty frequency table.

        Args:
            article: The text to summarise. The tokenisers used by the other
                methods are called on this value directly.
        """
        self.article = article
        self.frequency_table = defaultdict(int)

    def _create_dictionary_table(self, stemmer: Any = None) -> dict:
        """Build the stem -> occurrence-count table for the article.

        Args:
            stemmer: Object exposing ``stem(word)``. A ``PorterStemmer`` is
                created when omitted.

        Returns:
            The frequency table (also stored on ``self.frequency_table``).
        """
        stop_words = set(stopwords.words("english"))

        if stemmer is None:
            stemmer = PorterStemmer()

        # Rebuild from scratch so running the summariser twice on the same
        # instance does not double every count.
        self.frequency_table = defaultdict(int)
        for token in word_tokenize(self.article):
            stem = stemmer.stem(token)
            if stem not in stop_words:
                self.frequency_table[stem] += 1

        return self.frequency_table

    def _calculate_sentence_scores(self, sentences: npt.ArrayLike) -> dict:
        """Score each sentence by the mean weight of the table words it contains.

        A table word "occurs" in a sentence when it is a substring of the
        lower-cased sentence.

        Args:
            sentences: Iterable of sentence strings.

        Returns:
            Mapping of sentence text to its score. Sentences containing none
            of the table words score 0.
        """
        sentence_weights = defaultdict(int)

        for sentence in sentences:
            lowered = sentence.lower()
            # Only words that actually appear in the sentence contribute.
            matched_weights = [
                weight
                for word, weight in self.frequency_table.items()
                if word in lowered
            ]
            # Key by the full sentence: a short prefix would merge different
            # sentences that happen to start the same way. Guard the division
            # for sentences with no matching word.
            sentence_weights[sentence] = (
                sum(matched_weights) / len(matched_weights) if matched_weights else 0
            )

        return sentence_weights

    def _calculate_threshold_score(self, sentence_weight: dict) -> float:
        """Return the mean sentence score, or 0.0 when there are no sentences."""
        if not sentence_weight:
            return 0.0
        return float(np.mean(list(sentence_weight.values())))

    def _get_article_summary(
        self, sentences: npt.ArrayLike, sentence_weights: dict, threshold: float
    ) -> str:
        """Join, in original order, the sentences scoring at least ``threshold``.

        Args:
            sentences: Iterable of sentence strings.
            sentence_weights: Mapping of sentence text to score.
            threshold: Minimum score a sentence needs to be kept.

        Returns:
            The selected sentences separated by single spaces.
        """
        article_summary = [
            sentence
            for sentence in sentences
            if sentence in sentence_weights
            and sentence_weights[sentence] >= threshold
        ]

        return " ".join(article_summary)

    def run_article_summary(self, threshold_factor: float = 0.95) -> str:
        """Summarise the stored article.

        Args:
            threshold_factor: Multiplier applied to the mean sentence score to
                obtain the cut-off; lower values keep more sentences.

        Returns:
            The extractive summary as one string.
        """
        # word-frequency table for the whole article
        self._create_dictionary_table()

        # split into sentences and score each one
        sentences = sent_tokenize(self.article)
        sentence_scores = self._calculate_sentence_scores(sentences)

        # cut-off relative to the average sentence score
        threshold = self._calculate_threshold_score(sentence_scores)

        return self._get_article_summary(
            sentences, sentence_scores, threshold_factor * threshold
        )

    def get_rouge_score(
        self, hypothesis_text: str, reference_text: str
    ) -> npt.ArrayLike:
        """Return the ROUGE scores of ``hypothesis_text`` against ``reference_text``."""
        rouge = Rouge()
        scores = rouge.get_scores(hypothesis_text, reference_text)
        return scores

### 3. Extract summary

In [None]:
# Collect the generated summaries so they can be inspected after the loop.
summary_results = []
# Three randomly sampled reviews (no seed is set, so each run picks new ones).
articles = movie_reviews.loc[:, "cleaned_reviews"].sample(3).values

for review in articles:
    print(f"Original Review: \n{review}")
    print("-" * 200)
    # One summariser instance per review: the frequency table is per-article.
    extractive_summarizer = ExtractiveTextSummarizer(article=review)
    review_summary = extractive_summarizer.run_article_summary()
    summary_results.append(review_summary)

    print(f"Summarised Review: \n{review_summary}")
    print("-" * 200)

    # this line is POC for now since we don't have the reference text
    # (the summary is compared with itself, so the scores carry no signal)
    print(
        extractive_summarizer.get_rouge_score(
            hypothesis_text=review_summary, reference_text=review_summary
        )
    )