# Spark Assignment Task 3

## Big Data Systems

---

> Marianna Konstantopoulou <br />
> MSc Business Analytics Part Time <br />

As a final task, your supervisor has asked you to investigate whether it is possible to train a linear regression model (using the `LinearRegression()` function) that predicts the “average_rating” of a book from its “language_code”, “num_pages”, “ratings_count”, and “publication_year”. Again, you should use Python and DataFrames, this time with MLlib. Pay attention to transforming the string-based input features (“language_code” and “publication_year”) into a proper representation format, and explain your choices. Your code should (a) prepare the feature vectors, (b) prepare the training and testing datasets (70%-30%), (c) train the model, and (d) evaluate the accuracy of the model (based on the R-squared metric) and display the corresponding metric on the screen.

## 1. Load the dataset and convert data types

In [151]:
# First, load the dataset and keep only the columns needed for the model.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder.appName("bookstore").getOrCreate()

dataset = spark.read.json("books_5000.json")
dataset = dataset.select('average_rating', 'language_code', 'num_pages', 'ratings_count', 'publication_year')

# Replace empty strings: the categorical columns get an explicit 'Unknown' label, num_pages becomes null.
dataset = dataset.withColumn("language_code", when(col("language_code") == "", 'Unknown').otherwise(col("language_code")))
dataset = dataset.withColumn("num_pages", when(col("num_pages") == "", None).otherwise(col("num_pages")))
dataset = dataset.withColumn("publication_year", when(col("publication_year") == "", 'Unknown').otherwise(col("publication_year")))
dataset.show()

+--------------+-------------+---------+-------------+----------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|
+--------------+-------------+---------+-------------+----------------+
|          4.12|      Unknown|     null|            1|         Unknown|
|          3.94|          fre|     null|           16|            2016|
|          4.28|          eng|      146|           51|            2012|
|          4.05|          eng|     null|            6|         Unknown|
|          4.06|        en-US|      272|           51|            1997|
|          3.44|      Unknown|      206|           46|            2007|
|          4.15|          eng|      224|           39|            2016|
|          3.16|      Unknown|      160|           38|            2016|
|          3.51|      Unknown|      160|           44|            2016|
|          4.00|      Unknown|      144|           32|            2016|
|          4.41|          kor|      212|          133|          

* As a first step, we convert the columns of interest to the appropriate data types.

In [152]:
dataset.printSchema()

root
 |-- average_rating: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- publication_year: string (nullable = true)



In [153]:
from pyspark.sql.types import IntegerType, DoubleType
dataset = dataset.withColumn("num_pages", dataset["num_pages"].cast(IntegerType()))
dataset = dataset.withColumn("average_rating", dataset["average_rating"].cast(DoubleType()))
dataset = dataset.withColumn("ratings_count", dataset["ratings_count"].cast(IntegerType()))
dataset.dtypes

[('average_rating', 'double'),
 ('language_code', 'string'),
 ('num_pages', 'int'),
 ('ratings_count', 'int'),
 ('publication_year', 'string')]

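* Then we drop the rows with a missing `num_pages`. Note that `na.drop()` without arguments removes a row if any of its columns is null, not only `num_pages`.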

In [154]:
dataset = dataset.na.drop()
dataset.show()

+--------------+-------------+---------+-------------+----------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|
+--------------+-------------+---------+-------------+----------------+
|          4.28|          eng|      146|           51|            2012|
|          4.06|        en-US|      272|           51|            1997|
|          3.44|      Unknown|      206|           46|            2007|
|          4.15|          eng|      224|           39|            2016|
|          3.16|      Unknown|      160|           38|            2016|
|          3.51|      Unknown|      160|           44|            2016|
|           4.0|      Unknown|      144|           32|            2016|
|          4.41|          kor|      212|          133|            2014|
|          3.16|          eng|      144|          114|            2011|
|          4.41|          eng|      200|          149|            2012|
|          4.39|      Unknown|      230|          152|          

## 2. Prepare data for training & testing

In [179]:
trainDF, testDF = dataset.randomSplit([0.7, 0.3], seed=42)  # seed=42: a Hitchhiker's Guide to the Galaxy reference

print(trainDF.cache().count())  # Cache because the training data is accessed multiple times

print(testDF.count())

2537
1080


We use a 70/30 split for training and testing, and we set `seed` so that every run produces exactly the same split (for reproducibility). 
Finally, we call `cache()` to keep `trainDF` in memory for efficiency, since the training dataset is read multiple times during the training phase.
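
To illustrate why the seed matters, here is a minimal plain-Python sketch of a seeded probabilistic split. It is not Spark's implementation; the helper `seeded_split` and the toy row list are hypothetical. Like `randomSplit`, it decides row by row with a random draw, which is why the sizes printed above (2537 and 1080) are only approximately 70% and 30% of the data.

```python
import random

def seeded_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test using a seeded random draw."""
    rng = random.Random(seed)  # same seed -> same sequence of draws
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # toy stand-in for DataFrame rows
train_a, test_a = seeded_split(rows)
train_b, test_b = seeded_split(rows)

# Repeating the split with the same seed reproduces it exactly.
print(train_a == train_b and test_a == test_b)
# Every row lands in exactly one of the two parts.
print(len(train_a) + len(test_a) == len(rows))
```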

In [180]:
trainDF.show(5)

+--------------+-------------+---------+-------------+----------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|
+--------------+-------------+---------+-------------+----------------+
|           2.0|      Unknown|      144|            1|            2017|
|           2.0|          ind|      128|            1|            2010|
|          2.29|      Unknown|      136|           82|            2016|
|          2.33|      Unknown|      368|            3|            2002|
|          2.35|      Unknown|      104|           43|            2008|
+--------------+-------------+---------+-------------+----------------+
only showing top 5 rows



## 3. Feature preprocessing

Linear regression requires numeric features. Our dataset includes categorical features such as `publication_year` and `language_code`. To use them, we need to preprocess them into a numeric form.

We will use **one-hot encoding**, which converts a categorical variable into a set of numeric variables that take only the values 0 and 1. We first apply the `StringIndexer`, followed by the `OneHotEncoder` **estimator**.

The following code block defines the `StringIndexer` and `OneHotEncoder`.

In [181]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# We determine which of the columns are categorical.
categoricalCols = ["language_code", "publication_year"]

# The following two lines define estimators. Calling .fit() on them returns models (transformers) that we later apply to the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]).setHandleInvalid("keep")
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 
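
As a rough illustration of what one-hot encoding produces, the sketch below turns a category index into a binary vector in plain Python. The helper `one_hot` is hypothetical and is not how Spark implements the encoder; it only mirrors the default `dropLast=True` behaviour of `OneHotEncoder`, under which the last category is represented by an all-zero vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a 0/1 list with a single 1 at position `index`.

    With drop_last=True the vector has num_categories - 1 slots and the
    last category is encoded as all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0] * size
    if index < size:
        vector[index] = 1
    return vector

# Three categories with indices 0, 1 and 2.
print(one_hot(0, num_categories=3))  # [1, 0]
print(one_hot(1, num_categories=3))  # [0, 1]
print(one_hot(2, num_categories=3))  # [0, 0]
```

Dropping one slot avoids a redundant column, since the last category is implied whenever all other entries are zero.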

Next, we will create a *pipeline* that combines all of our feature engineering and modeling stages. But first, let us look at how estimators and transformers work, using the `StringIndexer` estimator from the preceding code block. Calling `.fit()` returns a `StringIndexerModel`, which we can then use to transform the dataset.
The `.transform()` method of `StringIndexerModel` returns a new DataFrame with the new columns appended.

In [182]:
stringIndexerModel = stringIndexer.fit(trainDF)
stringIndexerModel.transform(trainDF).show(5)

+--------------+-------------+---------+-------------+----------------+------------------+---------------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|language_codeIndex|publication_yearIndex|
+--------------+-------------+---------+-------------+----------------+------------------+---------------------+
|           2.0|      Unknown|      144|            1|            2017|               1.0|                  9.0|
|           2.0|          ind|      128|            1|            2010|               3.0|                  6.0|
|          2.29|      Unknown|      136|           82|            2016|               1.0|                  1.0|
|          2.33|      Unknown|      368|            3|            2002|               1.0|                 16.0|
|          2.35|      Unknown|      104|           43|            2008|               1.0|                  8.0|
+--------------+-------------+---------+-------------+----------------+------------------+------
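
The index values above follow from how `StringIndexer` orders labels: by default (`stringOrderType="frequencyDesc"`) the most frequent label receives index 0. The plain-Python sketch below imitates that idea on a made-up list of language codes. The helpers `fit_indexer` and `transform_label` are hypothetical, and sending unseen labels to one extra index is meant to mirror `handleInvalid="keep"`.

```python
from collections import Counter

def fit_indexer(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; break ties alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_label(mapping, label):
    """Look up a label; all unseen labels share one extra index."""
    return mapping.get(label, float(len(mapping)))

# Made-up training labels, not taken from the real dataset.
train_labels = ["eng", "eng", "eng", "Unknown", "Unknown", "fre"]
mapping = fit_indexer(train_labels)

print(mapping)                          # {'eng': 0.0, 'Unknown': 1.0, 'fre': 2.0}
print(transform_label(mapping, "kor"))  # 3.0 (label not seen during fit)
```

Under that default ordering, the index 1.0 assigned to `Unknown` in the table above suggests it is the second most frequent `language_code` in `trainDF`.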

In [183]:
from pyspark.ml.feature import VectorAssembler

# The assembler input combines the one-hot encoded vector columns with the numeric columns.
numericCols = ["num_pages", "ratings_count"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features", handleInvalid='keep')

## 4. Define the model

In [184]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="average_rating", regParam=1.0)

## 5. Build the pipeline

In [185]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, lr])

# Fit the pipeline on the training data to obtain the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset to predict the average rating of a book.
predDF = pipelineModel.transform(testDF)

In [186]:
predDF.select("features", "average_rating", "prediction").show(5)

+--------------------+--------------+------------------+
|            features|average_rating|        prediction|
+--------------------+--------------+------------------+
|(92,[1,39,90,91],...|          2.57|3.8868249903039183|
|(92,[1,53,90,91],...|          2.67|3.8764447146221372|
|(92,[7,42,90,91],...|          2.71|  3.88069845863319|
|(92,[0,41,90,91],...|          2.72| 3.904210164925219|
|(92,[1,70,90,91],...|          2.81|3.8340308856420102|
+--------------------+--------------+------------------+
only showing top 5 rows
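
The `features` column is printed in Spark's sparse vector notation, `(size, [indices], [values])`, in which only the non-zero positions are stored. The plain-Python sketch below expands such a triple into a dense list. The helper `to_dense` and the example numbers are illustrative only, since the actual values are truncated in the output above.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Hypothetical 6-slot vector: one-hot slots first, then two numeric features.
dense = to_dense(6, [1, 4, 5], [1.0, 146.0, 51.0])
print(dense)  # [0.0, 1.0, 0.0, 0.0, 146.0, 51.0]
```

Read this way, a row such as `(92,[1,39,90,91],...)` has 92 slots with four non-zero entries. Given the assembler input order, the first two presumably come from the one-hot encoded `language_code` and `publication_year`, while positions 90 and 91 hold `num_pages` and `ratings_count`.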



* We evaluate the accuracy of the model on the test set, based on the R-squared metric.

In [187]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="average_rating", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predDF))

R Squared (R2) on test data = 0.0512271
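
For reference, R squared compares the model's squared error with that of always predicting the mean label: `R2 = 1 - SS_res / SS_tot`. A minimal plain-Python sketch of that formula follows. The helper `r_squared` is hypothetical and the numbers are toy values, not results from this dataset.

```python
def r_squared(labels, predictions):
    """Compute 1 - SS_res / SS_tot for paired labels and predictions."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

labels = [3.0, 4.0, 5.0]
print(r_squared(labels, [3.0, 4.0, 5.0]))  # 1.0: perfect predictions
print(r_squared(labels, [4.0, 4.0, 4.0]))  # 0.0: always predicting the mean
```

Under this definition, a value of about 0.05 is close to the always-predict-the-mean baseline, which is consistent with the predictions shown earlier clustering between roughly 3.8 and 3.9.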
