# Part 2) Datasets/DataFrames: Spark ML and Pipelines

Convert the review texts to a classic vector space representation with TF-IDF-weighted features, using the Spark DataFrame/Dataset API to build a transformation pipeline. The primary goal of this part is to prepare the pipeline for Part 3 (see the Part3.ipynb file).

Using built-in functions, tokenize the text into unigrams (splitting on whitespace, tabs, digits, and common delimiter characters), apply case folding and stopword removal, compute TF-IDF, and run chi-square selection (keeping the top 4000 terms). Write the terms selected this way to the file output_ds.txt.

*Commands used in terminal in order to execute this notebook:*
- **jupyter nbconvert Part2.ipynb --to script** *(for converting jupyter notebook file to python file in order to execute it via spark-submit)*
- **spark-submit --executor-memory 8G --num-executors 4 --total-executor-cores 16 --conf spark.ui.port=5051 Part2.py**

For this part, we first set the configuration and initialize the Spark session and context that we will use.

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Part 2) Datasets/DataFrames: Spark ML and Pipelines")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

### Read the data

We will read the data as a JSON file using `spark.read.json` and select *category* and *reviewText* for further analysis. We will also read the *stopwords* as a text file using `sc.textFile` and collect them into a list of words.

An array named *stages* will be initialized to collect all the stages for the pipeline.

In [None]:
# Read the data as json file and select category and reviewText
data = spark.read.json("/data/reviews_devset.json").select('category', 'reviewText')
# Read the stopwords as a list
stopwords = sc.textFile("/data/stopwords").collect()
# Initialize stages as an array to collect all the stages for pipeline
stages = []

### Building stages for pipeline creation

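We will use *RegexTokenizer* to split reviewText on the pattern of delimiters. By default the tokenizer also lowercases the text (case folding), and `setMinTokenLength(2)` drops single-character tokens. Then, we will add the `regexTokenizer` to the stages array. The output column of the *RegexTokenizer* is *words*.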

In [None]:
from pyspark.ml.feature import RegexTokenizer

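# raw string so that the regex escapes reach Spark unchanged and Python reports no invalid escape sequences
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern=r'\d+|\t|\.|\?|!|,|;|\:|\(|\)|\[|\]|\{|\}|-|"|`|~|#|&|\*|%|\$|\\|/|\s+').setMinTokenLength(2)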

stages += [regexTokenizer]
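
To make the splitting behaviour concrete, here is a minimal pure-Python sketch of what the tokenizer stage is expected to do with this pattern, assuming the default settings `gaps=True` and `toLowercase=True`. It uses Python's `re` module rather than Spark, and the helper `tokenize` and the sample sentence are made up for illustration.

```python
import re

# same delimiter pattern as in the RegexTokenizer cell, written as a raw string
PATTERN = r'\d+|\t|\.|\?|!|,|;|\:|\(|\)|\[|\]|\{|\}|-|"|`|~|#|&|\*|%|\$|\\|/|\s+'

def tokenize(text, min_token_length=2):
    """Lowercase the text, split on the delimiter pattern, drop short tokens."""
    tokens = re.split(PATTERN, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]

sample = "Great product!! Bought 2 of them, works well."  # made-up review text
tokens = tokenize(sample)
print(tokens)
```

Empty strings produced by consecutive delimiters are removed by the minimum-length filter, which is also why no separate cleanup step is needed.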

We will apply *StopWordsRemover* to the `regexTokenizer` output column to filter out the words that are in the stopwords array. Then, we will add the `remover` to the stages array. The output column of the *StopWordsRemover* is *filtered_words*.

In [None]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol=regexTokenizer.getOutputCol(), outputCol="filtered_words", stopWords=stopwords, caseSensitive=False)

stages += [remover]
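
As a rough illustration of the filtering step, the following pure-Python sketch mimics a case-insensitive stopword filter. The function `remove_stopwords` and the tiny stopword list are hypothetical and only meant to show the idea, not Spark's implementation.

```python
def remove_stopwords(tokens, stopwords, case_sensitive=False):
    """Keep only the tokens that are not in the stopword list."""
    if case_sensitive:
        stop = set(stopwords)
        return [t for t in tokens if t not in stop]
    # case-insensitive comparison: lowercase both sides
    stop = {w.lower() for w in stopwords}
    return [t for t in tokens if t.lower() not in stop]

tokens = ["great", "product", "bought", "of", "them", "works", "well"]
stopwords = ["Of", "THEM", "the"]  # hypothetical stopword list
filtered = remove_stopwords(tokens, stopwords)
print(filtered)
```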

We will apply *CountVectorizer* to the remover's output column to compute the term frequency `tf` *(the number of times a term t appears in a document d; the document frequency DF, by contrast, is the number of documents that contain term t)*. Then, we will use the output column of the *CountVectorizer* as the input column of *IDF* to compute `tfidf = tf * idf` *(where idf is the inverse document frequency)*, which we will need for the chi-square selection. The output column of the *CountVectorizer* is *tf* and that of the *IDF* is *tfidf*.

We will add `cv` and `idf` to the stages array.

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF

cv = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="tf")
idf = IDF(inputCol=cv.getOutputCol(), outputCol="tfidf")

stages += [cv, idf]
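
The weighting can be checked by hand on a toy corpus. The sketch below is pure Python and assumes the smoothed formula given in the Spark documentation, `idf = log((m + 1) / (df + 1))` with `m` the number of documents. The corpus and the helper `tf_idf` are made up for illustration.

```python
import math
from collections import Counter

# toy corpus: each document is a list of already filtered tokens
docs = [
    ["good", "phone", "good"],
    ["bad", "phone"],
    ["good", "case"],
]

def tf_idf(docs):
    """Return one {term: tf * idf} dictionary per document."""
    m = len(docs)
    # document frequency: number of documents containing each term
    df = Counter(term for doc in docs for term in set(doc))
    idf = {t: math.log((m + 1) / (n + 1)) for t, n in df.items()}
    return [{t: c * idf[t] for t, c in Counter(doc).items()} for doc in docs]

weights = tf_idf(docs)
for w in weights:
    print(w)
```

Terms that occur in many documents (here "good" and "phone") receive a smaller idf than rare terms such as "bad", so a high raw count alone does not dominate the feature vector.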

To use the `category` column as the label column in the *ChiSqSelector*, it must first be converted to numeric indices. We will use *StringIndexer* for that and then add the `indexer` to the stages array. The output column of the *StringIndexer* is *label*.

In [None]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="category", outputCol="label")

stages += [indexer]
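
The indexing itself is simple to picture. The following pure-Python sketch assumes the default ordering, where the most frequent value receives index 0.0; the alphabetical tie-breaking, the helper `fit_string_indexer`, and the sample categories are assumptions made for this example.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index, most frequent first."""
    counts = Counter(values)
    # sort by descending frequency; ties broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

categories = ["Book", "Toy", "Book", "Music", "Book", "Toy"]  # made-up labels
label_of = fit_string_indexer(categories)
print(label_of)
```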

The *ChiSqSelector* ranks the features in the *tfidf* column by a chi-square test of independence against the *label* column. We will keep the top 4000 features and output the result in the *selectedFeatures* column. Then, we will add the `selector` to the stages array.

In [None]:
from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(numTopFeatures=4000, featuresCol="tfidf", outputCol="selectedFeatures", labelCol="label")

stages += [selector]
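
For intuition about the score behind the ranking, here is a small pure-Python sketch of the chi-square statistic for a single term and a single category. It is a simplification: it uses a 2x2 presence/absence table, whereas the Spark selector works on the feature values it is given. The function `chi_square_2x2` and the counts are hypothetical.

```python
def chi_square_2x2(a, b, c, d):
    """Chi-square statistic of a 2x2 contingency table.

    a: documents in the category that contain the term
    b: documents outside the category that contain the term
    c: documents in the category that do not contain the term
    d: documents outside the category that do not contain the term
    """
    n = a + b + c + d
    denominator = (a + b) * (a + c) * (b + d) * (c + d)
    return n * (a * d - b * c) ** 2 / denominator

score = chi_square_2x2(30, 10, 20, 40)  # made-up counts
print(round(score, 3))
```

A term whose presence is independent of the category yields a score close to 0, so sorting by this value puts the most category-specific terms first.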

### Setting up pipeline and fitting the model with it

After creating all the stages, we can now create the Pipeline and fit it to the data, which yields the pipeline model.

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)

In [None]:
model = pipeline.fit(data)

We save the fitted model using `model.write().overwrite().save(path)`.

In [None]:
model.write().overwrite().save("/Solution/pipeline")

### Extracting top 4000 words from the model and saving them into a file

We will take the vocabulary from the pipeline's *CountVectorizerModel* stage and store it in a list called `cvModel` (one vocabulary per vectorizer stage; here there is exactly one).

In [None]:
from pyspark.ml.feature import CountVectorizerModel

# get CountVectorizerModel from model stages
vectorizers = [s for s in model.stages if isinstance(s, CountVectorizerModel)]
# get vocabulary from vectorizers
cvModel = [v.vocabulary for v in vectorizers]

We will transform the data with the fitted model in order to obtain the *selectedFeatures* column produced by the chi-square selection.

In [None]:
chiSqModel = model.transform(data)

To recover the terms, we first extract the indices of the non-zero entries of the *selectedFeatures* column and store them in a separate column. After that, we iterate over the indices and look up the vocabulary word that corresponds to each one.

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType, StringType

# getting the positions of the non-zero entries of the selectedFeatures column
feature_extract_keys = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))

# selectedFeatures is a compressed vector: position i stands for the i-th (ascending) index in
# ChiSqSelectorModel.selectedFeatures, which in turn points into the CountVectorizer vocabulary
selected = sorted(model.stages[-1].selectedFeatures)
selectedTerms = [cvModel[0][i] for i in selected]

# getting the list of words that correspond to the positions stored in the chiSquareKeys column
def indices_to_terms(vocabulary):
    def lookup(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(lookup, ArrayType(StringType()))

chiSqModel = chiSqModel.withColumn("chiSquareKeys", feature_extract_keys(col("selectedFeatures")))
chiSqModel = chiSqModel.withColumn("terms", indices_to_terms(selectedTerms)("chiSquareKeys"))
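
One detail worth keeping in mind when mapping indices back to words: as we understand the selector, its output vector is compressed, so a position in *selectedFeatures* refers to the list of kept features and not directly to the vocabulary. The pure-Python sketch below illustrates that two-step lookup; the vocabulary, the kept indices, and the row positions are all made up.

```python
# made-up CountVectorizer vocabulary (index -> term)
vocabulary = ["good", "phone", "bad", "case", "battery"]
# hypothetical original feature indices kept by the selector
selected_features = [4, 1, 2]

# the compressed output vector follows the ascending order of the kept indices
kept = sorted(selected_features)
selected_terms = [vocabulary[i] for i in kept]

# non-zero positions of one row's compressed vector (hypothetical)
row_positions = [0, 2]
row_terms = [selected_terms[p] for p in row_positions]

print(selected_terms)
print(row_terms)
```

Looking up `row_positions` directly in `vocabulary` would return "good" and "bad" here, which are not the terms the row actually contains after selection.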

We will collect the *terms* column we created into a list. Then we will flatten it, remove duplicates, sort the words in ascending order, and join them with spaces into a string variable called `wordsToFile`.

In [None]:
from itertools import chain
from pyspark.sql.functions import collect_list

# collecting the terms column as a list of per-review term lists from chiSqModel
termsList = chiSqModel.select(collect_list('terms')).first()[0]
# flattening the lists, removing duplicates, sorting ascending and joining the words with spaces
wordsToFile = " ".join(sorted(set(chain.from_iterable(termsList))))

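We will convert the `wordsToFile` variable into a Spark DataFrame in order to save it as text under the name *output_ds.txt*.

We will use `.coalesce(1).write.format("text").mode('overwrite').save(path)` so that existing output is overwritten. Note that Spark writes a directory with that name, and `coalesce(1)` ensures it contains a single part file holding the words.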

In [None]:
# convert wordsToFile string to a spark dataframe
one_line_words = spark.createDataFrame([wordsToFile], StringType())
# saving one_line_words dataframe to a file
one_line_words.coalesce(1).write.format("text").mode('overwrite').save("/Solution/output_ds.txt")