# Restaurant Reviews NLP implementation
This is my first project on Databricks (AWS) using PySpark, and my first NLP problem in ML.

## Objective
Categorize customer sentiment using the provided reviews. The provided dataset has

# Familiarizing
There are several practices in NLP that need just a bit of exploration. In order:
* Tokenization
  * Splitting sentences into individual words for further processing; this seems to be one of the first steps in NLP data preparation.
* Stop-Word Removal
  * Removal of unnecessary words which do not provide any contextual meaning to the sentences. Words like 'the', 'of', 'to', etc.
* N-Grams
  * Grouping consecutive words into sequences of n words. This can be useful for identifying particular n-grams that carry important information, such as 'tasted bad' or 'great price'.
* Term Frequency
  * Counting how often particular terms occur, which may or may not correlate with the outputs. Inverse document frequency (IDF) is also commonly used, on the principle that words appearing in a great many documents carry less unique, actionable information than less frequent words. Compare 'stellar' with 'okay', for example.

In [None]:
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, BooleanType, ArrayType, TimestampType

In [None]:
df = sqlContext.sql("Select * FROM nlp_restaurant_reviews")
df.show(5)

In [None]:
df.printSchema()

In [None]:
df.count()

## Tokenization
Splitting of sentences into individual words for processing

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import udf

In [None]:
tokenizer = Tokenizer(inputCol='Review', outputCol='words') # Lowercases, then splits on whitespace (punctuation stays attached to words)

regexTokenizer = RegexTokenizer(inputCol='Review', outputCol='words', pattern='\\W') # Lowercases, then splits on non-word characters, dropping punctuation

countTokens = udf(lambda p: len(p), IntegerType()) # udf applies the lambda to the value in each row of the column it is called on
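To see how the two tokenizers differ, here is a plain-Python approximation of their documented behaviour (an assumption: it ignores edge cases such as repeated whitespace). `Tokenizer` lowercases and splits on whitespace; `RegexTokenizer` with its defaults (`gaps=True`, `toLowercase=True`, `minTokenLength=1`) treats the pattern as a separator and drops tokens shorter than the minimum length. The function names are hypothetical.

```python
import re

def whitespace_tokenize(text):
    # Approximates pyspark.ml.feature.Tokenizer: lowercase, then split on whitespace.
    return text.lower().split()

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    # Approximates RegexTokenizer(pattern='\\W') with default settings: the pattern
    # matches separators, and tokens shorter than min_token_length are dropped.
    return [t for t in re.split(pattern, text.lower()) if len(t) >= min_token_length]

review = "Wow... Loved this place."
print(whitespace_tokenize(review))         # ['wow...', 'loved', 'this', 'place.']
print(regex_tokenize(review))              # ['wow', 'loved', 'this', 'place']
print(regex_tokenize("I didn't like it"))  # ['i', 'didn', 't', 'like', 'it']
```

The last line shows a side effect of splitting on `\W`: contractions are broken apart at the apostrophe.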

In [None]:
tokenized = tokenizer.transform(df)
tokenized.withColumn('tokens', countTokens(col('words'))).show()

## Stop Word Removal

Removal of common words which add little to no meaning to the sentence

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
df.show(5, truncate=False)

In [None]:
remover = StopWordsRemover(inputCol='words', outputCol='cleaned')
remover.transform(tokenized).select('cleaned').show(5, truncate=False)

## n-grams

In [None]:
from pyspark.ml.feature import NGram

In [None]:
bigram = NGram(n=2, inputCol='words', outputCol='bigrams')
bigram_df = bigram.transform(tokenized).select('bigrams')
bigram_df.show(5, truncate=False)  # show() returns None, so keep the DataFrame and display it separately
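`NGram` slides a window of `n` consecutive tokens over each word array and joins each window with a space; an array shorter than `n` yields no n-grams. A minimal plain-Python equivalent, for illustration only (the `ngrams` helper is hypothetical):

```python
def ngrams(tokens, n=2):
    # Consecutive windows of n tokens joined by a space;
    # fewer than n tokens gives an empty list.
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

print(ngrams(["crust", "is", "not", "good"]))  # ['crust is', 'is not', 'not good']
print(ngrams(["good"]))                        # []
```

Note how the bigram 'not good' keeps the negation attached to the adjective, which single words lose.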

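## Term Frequency-Inverse Document Frequency (TF-IDF)
TF-IDF weighs how important a word is to a document. Generally, the more documents a word appears in, the less distinguishing meaning it carries within a corpus, so each term count is scaled by the inverse of the term's document frequency.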
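Spark's `IDF` documentation gives the formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing t; the TF-IDF weight is the term's count in a document multiplied by idf(t). A small plain-Python sketch of that formula (not Spark's implementation, and without `HashingTF`'s hashing) on three made-up, already tokenized reviews:

```python
import math
from collections import Counter

def tf_idf(docs):
    # docs: list of token lists; returns one {term: weight} dict per document.
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))  # documents containing each term
    idf = {t: math.log((m + 1) / (d + 1)) for t, d in doc_freq.items()}
    return [{t: count * idf[t] for t, count in Counter(doc).items()} for doc in docs]

docs = [["food", "was", "great"], ["service", "was", "slow"], ["great", "great", "price"]]
for weights in tf_idf(docs):
    print(weights)
```

A term that appears in every document gets idf = log(1) = 0, so it contributes nothing to the features.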

In [None]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer

In [None]:
tokenizer = RegexTokenizer(inputCol='Review', outputCol='words', pattern='\\W')
words = tokenizer.transform(df)
words.show(5, truncate=False)

In [None]:
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20) # 20 buckets is tiny: many different words will collide on the same index
featurized = hashingTF.transform(words)
featurized.show(5)
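`HashingTF` never builds a vocabulary: each term is hashed to an index modulo `numFeatures`, and counts that land on the same index are summed. The sketch below illustrates that hashing trick in plain Python; it uses `zlib.crc32` as a deterministic stand-in, whereas Spark uses MurmurHash3, so the actual indices will differ. The helper name and the generated terms are hypothetical. With `numFeatures=20`, any set of more than 20 distinct words is guaranteed to produce collisions.

```python
import zlib

def hashed_term_counts(tokens, num_features=20):
    # Hashing trick: index = hash(term) mod num_features; colliding terms share a bucket.
    # zlib.crc32 is only a stand-in for Spark's MurmurHash3.
    counts = [0] * num_features
    for token in tokens:
        counts[zlib.crc32(token.encode("utf-8")) % num_features] += 1
    return counts

tokens = [f"word{i}" for i in range(21)]  # 21 distinct hypothetical terms
counts = hashed_term_counts(tokens)
print(len(counts), sum(counts), max(counts))  # 20 buckets, 21 counts, some bucket holds >= 2
```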

In [None]:
idf = IDF(inputCol='rawFeatures', outputCol='features')
idf_model = idf.fit(featurized)
rescale = idf_model.transform(featurized)
rescale.select('Liked', 'features').show(5)

# Prototype Model
From what I understand, an NLP classification task follows this 'simple' procedure:
1) Process text data
2) Make features numeric (i.e. vectorization)
3) Train using existing ML models (such as a random forest classifier)

In my research, I found that the Spark NLP ('sparknlp') library provides more refined tools for NLP tasks, so from here on I will be working with it. My original dataframe from above is still intact, so I will start from there.

In [None]:
import sparknlp
from sparknlp.base import * # Change once all used modules are known
from sparknlp.annotator import * # Change once all used modules are known
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, DecisionTreeClassifier, LinearSVC, LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [None]:
spark = sparknlp.start()

## Annotation

The text needs to be formatted and cleaned as much as possible prior to the vectorization of the data. The steps I wish to implement, to some degree, are as follows. For all of these points, I will modify my approach as new information becomes available to me.
1) **Tokenization**
    * Splitting of reviews into individual words. This may or may not involve multiple steps, a custom RegEx, or more.
2) **Spell-checking**
    * All words should be spell-checked before any later step, so that misspellings are not treated as separate, rare words.
3) **Stop-words**
    * Removal of unnecessary words that do not contribute significant meaning to the sentence. Removal of these words places more emphasis on the words that matter during training.
4) **Lemmatization**
    * Words should be reduced to their lemmas so that inflected forms map to a single token (e.g. Doing, Does, Did -> Do). In theory, this should strengthen the patterns between certain words and their corresponding meaning as interpreted by the algorithm.
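As its name suggests, `NorvigSweetingModel` builds on Peter Norvig's spelling corrector. The plain-Python sketch below follows Norvig's basic idea (generate every string one edit away and pick the most frequent known word); it is not Spark NLP's implementation, and the word counts are made up. It also shows one way an error like 'Loved' -> 'moved' could arise: if the vocabulary is lowercase, the capital 'L' makes 'Loved' unknown, and both 'loved' and 'moved' are a single replacement away.

```python
from collections import Counter

LETTERS = "abcdefghijklmnopqrstuvwxyz"

def edits1(word):
    # Every string one delete, transpose, replace or insert away from `word`.
    splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]
    deletes = [a + b[1:] for a, b in splits if b]
    transposes = [a + b[1] + b[0] + b[2:] for a, b in splits if len(b) > 1]
    replaces = [a + c + b[1:] for a, b in splits if b for c in LETTERS]
    inserts = [a + c + b for a, b in splits for c in LETTERS]
    return set(deletes + transposes + replaces + inserts)

def correct(word, counts):
    # Known words are kept; otherwise choose the most frequent known word one edit away.
    if word in counts:
        return word
    candidates = [w for w in edits1(word) if w in counts]
    return max(candidates, key=counts.get) if candidates else word

# Hypothetical lowercase word counts standing in for a real corpus
counts = Counter({"taste": 5, "moved": 3, "loved": 1})
print(correct("tatse", counts))  # taste
print(correct("Loved", counts))  # moved: 'Loved' is unknown, and 'moved' outnumbers 'loved'
print(correct("loved", counts))  # loved
```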

In [None]:
document = DocumentAssembler()\
                .setInputCol('Review')\
                .setOutputCol('document')

sentence = SentenceDetector()\
                .setInputCols('document')\
                .setOutputCol('sentence')

tokenizer = Tokenizer()\
                .setInputCols('sentence')\
                .setOutputCol('token')

checker = NorvigSweetingModel()\
                .pretrained()\
                .setInputCols('token')\
                .setOutputCol('checked')

stopwords = StopWordsCleaner()\
                .pretrained('stopwords_en', 'en')\
                .setInputCols('checked')\
                .setOutputCol('cleaned')

lemmatizer = LemmatizerModel()\
                .pretrained()\
                .setInputCols('cleaned')\
                .setOutputCol('lemma')

In [None]:
pipeline = Pipeline().setStages([document, sentence, tokenizer, checker, stopwords, lemmatizer])
model = pipeline.fit(df)
result = model.transform(df)

In [None]:
result.select('Review', 'lemma.result').show(truncate=False)

There is an issue in the first row: the spell checker changes 'Loved' to 'moved', which is then lemmatized to 'move'. Perhaps a different tokenizer without punctuation would help. Important negators like 'not' are being removed by the stop-words filter, and this is a big problem: a review like 'Crust is not good' becomes 'Crust good', which clearly flips the sentiment of the review. I will need to look into this.

I want to create a custom RegEx that excludes almost all punctuation except apostrophes and hyphens. As I have zero experience with RegEx, I used a website that lets me test different RegEx expressions against sample text. With it, I arrived at the character class r"[a-zA-Z0-9\'\-]", which should include every character I need in a token and exclude the rest. Because the tokenizer's pattern matches the separators rather than the tokens, the tokenizer uses the negated form of the class ([^...]), so every character outside it splits the text. Since this is applied after a sentence detector, I do not need to worry about single periods.
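The effect of using a negated character class as the separator can be checked in plain Python with `re.split`. This sketch assumes the class described in the paragraph above (letters, digits, apostrophes, hyphens) and simply drops empty strings, which may not match every setting of Spark NLP's `RegexTokenizer`; `tokenize` is a hypothetical helper.

```python
import re

# Characters a token may contain: letters, digits, apostrophes and hyphens.
TOKEN_CHARS = r"a-zA-Z0-9'\-"
# Negated class: every other character acts as a separator.
SEPARATOR = re.compile(rf"[^{TOKEN_CHARS}]")

def tokenize(sentence):
    # Split on separators and discard the empty strings left between adjacent separators.
    return [t for t in SEPARATOR.split(sentence) if t]

print(tokenize("I didn't love the over-priced ravioli!"))
# ['I', "didn't", 'love', 'the', 'over-priced', 'ravioli']
```

Unlike splitting on `\W`, this keeps contractions such as "didn't" and hyphenated words in one piece.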

In [None]:
tokenizer = RegexTokenizer()\
                .setInputCols('sentence')\
                .setOutputCol('token')\
                .setPattern(r"[^a-zA-Z0-9_'\-]")  # split on anything except letters, digits, _, ' and -

checker = ContextSpellCheckerModel\
                .pretrained()\
                .setInputCols("token")\
                .setOutputCol("checked")

lemmatizer = LemmatizerModel()\
                .pretrained()\
                .setInputCols('checked')\
                .setOutputCol('lemma')

stopwords = StopWordsCleaner()\
                .pretrained('stopwords_iso', 'en')\
                .setInputCols('lemma')\
                .setOutputCol('cleaned')

finisher = Finisher()\
                .setInputCols('lemma')\
                .setOutputCols('result')

In [None]:
pipeline = Pipeline().setStages([document, sentence, tokenizer, checker, lemmatizer, finisher])
model = pipeline.fit(df)
result = model.transform(df)

In [None]:
result.show(truncate=False)

A new spell checker, as well as a custom RegEx, fixed my issues so far. There are still some errors in the first few rows (e.g. ravoli -> revolt; overpriced -> override), but these are hard to fix and I have to accept some level of error, especially on my first ever NLP project. I still want to explore using a stop-word filter, but I'm not yet sure how to keep contextual descriptors like 'not'. I've tried both pretrained stop-word models available in the spark-nlp library. These are built on pyspark.ml.feature.StopWordsRemover, so switching to that directly wouldn't help either. My last, very rudimentary, option is a custom stop-word list.

I'm simply going to look through the first 25 rows and pick out any words I don't believe are necessary. Later on I may use a count vectorizer to find the most frequent terms; however, this will take a long time to compute, and for now I just want to get a prototype working.
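An alternative to hand-picking words is to start from a standard list and subtract the negators. The plain-Python sketch below does this with a short, hypothetical stop-word list; in PySpark, the default list can be loaded with `StopWordsRemover.loadDefaultStopWords('english')`, filtered the same way, and passed back in as `StopWordsRemover(stopWords=...)`. The case-insensitive comparison mirrors `StopWordsRemover`'s default `caseSensitive=False`. The negator set and helper name are assumptions for illustration.

```python
# Hypothetical stop-word list for illustration only
DEFAULT_STOP_WORDS = ["the", "is", "a", "of", "to", "not", "no", "nor"]
# Words that carry sentiment-flipping meaning and should survive filtering
NEGATORS = {"not", "no", "nor", "never"}

def remove_stop_words(tokens, stop_words, keep=NEGATORS):
    # Case-insensitive filtering with the negators removed from the stop list.
    stops = {w.lower() for w in stop_words} - keep
    return [t for t in tokens if t.lower() not in stops]

tokens = ["Crust", "is", "not", "good"]
print(remove_stop_words(tokens, DEFAULT_STOP_WORDS))            # ['Crust', 'not', 'good']
print(remove_stop_words(tokens, DEFAULT_STOP_WORDS, keep=set()))  # ['Crust', 'good']
```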

## Vectorization

The text now needs a numeric format that the ML classifier can use. To my knowledge, there are two paradigms for this: term-frequency based and pretrained-embedding based. For this first project, I will use term-frequency based vectorization: a count vectorizer followed by an IDF.

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover

stop_words = ['the', 'be', 'now', 'get', 'on', 'and', 'so', 'want', 'I', 'it', 'that', 'honesty', 'you', 'they', 'them', 'this', 'by', 'during', 'holiday', 'off', 'back', 'what',
             'say', 'to', 'still', 'end', 'up', 'way', 'little', 'in', 'let', 'alone', 'at', 'all', 'because', 'oh', 'stuff', 'red', 'that\'s', 'a', 'of', 'some']

finisher = Finisher()\
                .setInputCols('lemma')\
                .setOutputCols('finished')

remover = StopWordsRemover(stopWords = stop_words)\
            .setInputCol('finished')\
            .setOutputCol('cleaned')

# Vectorize the stop-word-filtered tokens, not the raw 'finished' column
countVec = CountVectorizer()\
            .setInputCol('cleaned')\
            .setOutputCol('rawFeatures')

idf = IDF()\
            .setInputCol('rawFeatures')\
            .setOutputCol('features')

pipeline = Pipeline().setStages([document, sentence, tokenizer, checker, lemmatizer, finisher, remover, countVec, idf])
model = pipeline.fit(df)
result = model.transform(df)

result.show()

On second thought, once the model is trained, it would in fact be useful to have a CountVectorizer vocabulary rather than hashed features. Being able to see which buzz-words are associated with positive and negative reviews is quite useful, and it is cumbersome to do this with hashed counts, since a hash index cannot be mapped back to a single word. It only takes around 4-5 minutes to run on this small dataset, so it's not bad at all, actually. I really want to get some predictions going.
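The interpretability argument can be sketched in plain Python: a count vectorizer keeps an explicit vocabulary, so feature index i always maps back to one word, which is what lets model weights be read as per-word scores. Spark's `CountVectorizer` likewise orders its vocabulary by term frequency across the corpus, and a fitted `CountVectorizerModel` exposes it as `.vocabulary`; pairing that with `LogisticRegressionModel.coefficients` (IDF rescales values but keeps indices) is one way to list buzz-words. The helpers, documents and weights below are hypothetical, and Spark may break frequency ties differently.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size=None):
    # Most frequent terms first (ties keep first-seen order here).
    totals = Counter(term for doc in docs for term in doc)
    return [term for term, _ in totals.most_common(vocab_size)]

def count_vector(doc, vocabulary):
    # Dense count vector; terms outside the vocabulary are ignored.
    index = {term: i for i, term in enumerate(vocabulary)}
    vec = [0] * len(vocabulary)
    for term in doc:
        if term in index:
            vec[index[term]] += 1
    return vec

docs = [["great", "food"], ["great", "price"], ["slow", "service"]]
vocab = fit_vocabulary(docs)
print(vocab)                                               # ['great', 'food', 'price', 'slow', 'service']
print(count_vector(["great", "great", "service"], vocab))  # [2, 0, 0, 0, 1]

# With hypothetical per-feature weights, each weight is labelled by its word
weights = [1.2, 0.3, 0.4, -1.1, -0.2]
print(sorted(zip(weights, vocab)))  # most negative words first
```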

## Model training and evaluation

Finally I can train a model on this very rough pipeline. I simply want a working prototype so that I understand everything I have built so far. Afterwards, I will set up a proper full-length pipeline with several candidate models and hyperparameter tuning. For now, I will train a LogisticRegression classifier (a somewhat misleading name: despite the 'regression', it is a classifier that models the log-odds of the positive class as a linear function of the features).
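Concretely, logistic regression computes a linear score z = b + w·x (the log-odds), turns it into a probability with the sigmoid 1 / (1 + e^(-z)), and predicts the positive class when that probability reaches a threshold (0.5 by default in Spark). A plain-Python sketch with made-up weights for three hypothetical word features:

```python
import math

def sigmoid(z):
    # Numerically stable logistic function: maps log-odds to a probability.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict_proba(weights, bias, features):
    # Linear score (log-odds of the positive class), squashed into (0, 1).
    z = bias + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)

def predict(weights, bias, features, threshold=0.5):
    return 1 if predict_proba(weights, bias, features) >= threshold else 0

# Hypothetical weights for the features ['great', 'not', 'slow']
weights, bias = [2.0, -1.5, -1.0], 0.0
print(predict_proba(weights, bias, [1, 0, 0]))  # about 0.88
print(predict(weights, bias, [1, 0, 0]))        # 1
print(predict(weights, bias, [0, 1, 1]))        # 0
```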

In [None]:
model_data = result.withColumn('label', col('Liked')).select(['features', 'label'])
model_data.show(5)

train, test = model_data.randomSplit([0.9, 0.1], seed=31415)
train.show()

In [None]:
lr = LogisticRegression()
lrModel = lr.fit(train)

In [None]:
lr_pred = lrModel.evaluate(test)

In [None]:
lr_pred.accuracy

# Putting it all together

Now I can start training entire pipelined models, applying grid searches over parameters, and testing different models. Here is a high-level summary of the tasks to be completed:

1) Pipeline for initial data processing, including the following steps:
    * Document, Sentencing -- Standard
    * Tokenization -- Standard
    * Spell checking -- ContextSpellChecker provided the best results in the exploration phase above
    * Lemmatization -- Standard
    * Stop-word removal -- Need to test several approaches:
        * No stop-words
        * Custom list as used above
        * Pretrained model
    * Finisher -- Standard
    * HashingTF -- chosen over CountVectorizer for speed
    * IDF -- Standard
    
2) Model pipelines constructed where previous pipeline ends:
    * One for each model to be tested
    * Use cross validation and parameter grids to identify strongest candidates
    
The model pipelines will be kept separate from the data processing pipeline because the processing is identical regardless of the model used. This cuts down on compute time at the cost of memory, which is currently not an issue.

In [None]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import RegexTokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, TrainValidationSplit
from pyspark.sql.functions import col

import sparknlp
from sparknlp.annotator import * # Change once all used modules are known
from sparknlp.base import * # Change once all used modules are known

In [None]:
# ---=== Base pipeline with pretrained stopwords ===---

document = DocumentAssembler()\
                .setInputCol('Review')\
                .setOutputCol('document')

sentence = SentenceDetector()\
                .setInputCols('document')\
                .setOutputCol('sentence')

tokenizer = RegexTokenizer()\
                .setInputCols('sentence')\
                .setOutputCol('token')\
                .setPattern(r"[^a-zA-Z0-9_'\-]")  # split on anything except letters, digits, _, ' and -

checker = ContextSpellCheckerModel\
                .pretrained()\
                .setInputCols("token")\
                .setOutputCol("checked")

lemmatizer = LemmatizerModel()\
                .pretrained()\
                .setInputCols('checked')\
                .setOutputCol('lemma')

stopwords = StopWordsCleaner()\
                .pretrained('stopwords_iso', 'en')\
                .setInputCols('lemma')\
                .setOutputCol('cleaned')

finisher = Finisher()\
                .setInputCols('cleaned')\
                .setOutputCols('finished')

hashingTF = HashingTF()\
                .setInputCol('finished')\
                .setOutputCol('rawFeatures')

idf = IDF()\
                .setInputCol('rawFeatures')\
                .setOutputCol('features')

pipe_sw_pre = Pipeline()\
                .setStages([document, sentence, tokenizer, checker, 
                            lemmatizer, stopwords, finisher, hashingTF, idf])

# ---=== Custom stop words in the pipeline ===---
# set*() calls modify a stage in place, so each pipeline gets its own Finisher
# and HashingTF instead of reconfiguring the objects already used in pipe_sw_pre.

stop_words = ['the', 'be', 'now', 'get', 'on', 'and', 'so', 'want', 'I', 'it', 'that', 'honesty', 'you', 'they', 'them', 'this', 'by', 'during', 'holiday', 'off', 'back', 'what',
             'say', 'to', 'still', 'end', 'up', 'way', 'little', 'in', 'let', 'alone', 'at', 'all', 'because', 'oh', 'stuff', 'red', 'that\'s', 'a', 'of', 'some']

finisher_cstm = Finisher()\
                .setInputCols('lemma')\
                .setOutputCols('finished')

stopwords_cstm = StopWordsRemover(stopWords = stop_words)\
                .setInputCol('finished')\
                .setOutputCol('cleaned')

hashingTF_cstm = HashingTF()\
                .setInputCol('cleaned')\
                .setOutputCol('rawFeatures')

pipe_sw_cstm = Pipeline()\
                .setStages([document, sentence, tokenizer, checker, 
                            lemmatizer, finisher_cstm, stopwords_cstm, hashingTF_cstm, idf])

# ---=== No stop words in the pipeline ===---

finisher_none = Finisher()\
                .setInputCols('lemma')\
                .setOutputCols('finished')

hashingTF_none = HashingTF()\
                .setInputCol('finished')\
                .setOutputCol('rawFeatures')

pipe_sw_none = Pipeline()\
                .setStages([document, sentence, tokenizer, checker, 
                            lemmatizer, finisher_none, hashingTF_none, idf])

I want a classification model whose predictions can be explained. This is a supervised classification problem, and the models I know of that meet these criteria are Decision Trees and Logistic Regression. I will start with rough initial values for the parameters and adjust them as results come in.

In [None]:
lr = LogisticRegression()
lr_params = ParamGridBuilder()\
            .addGrid(lr.regParam, [0.1, 0.01])\
            .addGrid(lr.maxIter, [1, 5, 10, 25])\
            .build()

dt = DecisionTreeClassifier()
dt_params = ParamGridBuilder()\
            .addGrid(dt.maxBins, [10, 50, 100])\
            .addGrid(dt.maxDepth, [5, 10, 20, 30])\
            .build()  # Spark's decision trees only accept maxDepth values in [0, 30]

In [None]:
df = sqlContext.sql("Select * FROM nlp_restaurant_reviews")
df = df.withColumn('label', col('Liked')).select(['label', 'Review'])
train, test = df.randomSplit([0.8, 0.2], seed = 31415)

model_info = {
    'lr' : lr,
    'dt' : dt
}

param_info = {
    'lr' : lr_params,
    'dt' : dt_params
}

pipe_info = {
    'sw_pre' : pipe_sw_pre,
    'sw_cstm' : pipe_sw_cstm,
    'sw_none' : pipe_sw_none
}



# Results are collected in a JSON-like nested dict (results_info).

# Attempted tuning, disabled: TrainValidationSplit and CrossValidator both raised an error.
# cv = TrainValidationSplit(estimator = pipe_added,
#                           estimatorParamMaps = lr_params,
#                           evaluator = BinaryClassificationEvaluator(),
#                           seed = 31415,
#                           parallelism = 1)
#
# cvModel = cv.fit(train)
# pred = cvModel.transform(test)

I wanted to use CrossValidator to tune the parameter grids and compare the separate models, but I ran into an error for which I could find no documentation online. I've posted a question on StackOverflow, but to no avail. For now, I will forgo hyperparameter tuning with CrossValidator (TrainValidationSplit raised the same error) and simply compare the pipelines with default parameters.
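For reference, this is roughly what `CrossValidator` does with a parameter grid: for each parameter map, it fits on k - 1 folds, evaluates on the held-out fold, averages the metric over the k folds (`numFolds` defaults to 3), and keeps the best map. The plain-Python sketch below uses contiguous folds for readability (Spark assigns rows to folds randomly using `seed`) and a hypothetical one-parameter threshold classifier in place of a real model; all function names are illustrative.

```python
import itertools
import statistics

def k_folds(n, k):
    # Split indices 0..n-1 into k contiguous folds (no shuffling, for clarity).
    size, extra = divmod(n, k)
    folds, start = [], 0
    for i in range(k):
        end = start + size + (1 if i < extra else 0)
        folds.append(list(range(start, end)))
        start = end
    return folds

def grid(param_lists):
    # Cartesian product of parameter values, similar to ParamGridBuilder().addGrid(...).build().
    keys = list(param_lists)
    return [dict(zip(keys, values)) for values in itertools.product(*param_lists.values())]

def cross_validate(data, fit, score, param_grid, k=3):
    # For each parameter map, average the held-out metric over k folds; return the best map.
    folds = k_folds(len(data), k)
    averages = []
    for params in param_grid:
        metrics = []
        for held_out in folds:
            held = set(held_out)
            train = [row for i, row in enumerate(data) if i not in held]
            valid = [data[i] for i in held_out]
            metrics.append(score(fit(train, **params), valid))
        averages.append(statistics.mean(metrics))
    best = max(range(len(param_grid)), key=averages.__getitem__)
    return param_grid[best], averages

def fit_threshold(train, threshold):
    # Hypothetical "model": ignores the training rows and predicts 1 when x >= threshold.
    return lambda x: 1 if x >= threshold else 0

def accuracy(model, rows):
    return sum(model(x) == y for x, y in rows) / len(rows)

data = [(i / 10, int(i >= 5)) for i in range(10)]
best, averages = cross_validate(data, fit_threshold, accuracy,
                                grid({"threshold": [0.2, 0.5, 0.8]}), k=5)
print(best, averages)  # the 0.5 threshold has the highest average accuracy
```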

In [None]:
results_info = {}
evaluator = BinaryClassificationEvaluator()  # default metric: areaUnderROC
df = sqlContext.sql("Select * FROM nlp_restaurant_reviews")
df = df.withColumn('label', col('Liked')).select(['label', 'Review'])

# Split once so every pipeline/model combination sees the same train/test rows
train, test = df.randomSplit([0.8, 0.2], seed = 31415)
train.cache()
test.cache()

for pipe_label, pipe in pipe_info.items():
    model_results = {}

    for model_label, model in model_info.items():
        # Nested pipeline: processing stages followed by the classifier
        pipe_added = Pipeline().setStages([pipe, model])
        pipe_trained = pipe_added.fit(train)

        pipe_data = pipe_trained.transform(test)
        areaUnderROC = evaluator.evaluate(pipe_data)

        print(f'{pipe_label} - {model_label}: {areaUnderROC}')
        model_results[model_label] = areaUnderROC

    results_info[pipe_label] = model_results

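There is clearly something else going on here: it is extremely unlikely that all three pipelines perform exactly the same. The cause was that the pipelines shared stage objects. `setInputCols`/`setInputCol` change a stage in place and return that same object, so reconfiguring `finisher` and `hashingTF` for the custom and no-stop-word pipelines also changed them inside `pipe_sw_pre` and `pipe_sw_cstm`. Every pipeline therefore ended up with the last configuration ('lemma' -> 'finished' -> 'rawFeatures'), which bypasses stop-word removal entirely; the fix is to give each pipeline its own `Finisher` and `HashingTF` instances.

Regardless, it would seem that LogisticRegression works significantly better than the Decision Tree. Since this is a binary classification, if the Decision Tree's accuracy were < 0.5, I could simply take the complement of its predictions to get an accuracy > 0.5. The same reasoning applies to the area under the ROC curve, the metric used above: inverting the scores turns an AUC of x into 1 - x. That said, a model that scores below chance on held-out data is more likely overfitting noise than learning a reliably inverted signal.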
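For the area under the ROC curve, inverting a classifier's scores turns an AUC of x into exactly 1 - x, so the complement argument carries over to this metric too. A small plain-Python check, computing AUC as the probability that a randomly chosen positive scores higher than a randomly chosen negative (ties count as one half); the labels and scores are made up:

```python
def auc(scores, labels):
    # Probability that a random positive outranks a random negative (ties count 1/2).
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [1, 1, 0, 0, 1, 0]
scores = [0.2, 0.4, 0.9, 0.6, 0.3, 0.1]  # a deliberately bad scorer
original = auc(scores, labels)
inverted = auc([-s for s in scores], labels)
print(original, inverted)  # the two values sum to 1
```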