# Big Data Systems Architecture - Spark Assignment 

In [228]:
#import of libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler

In [229]:
#We create a spark session in order to create an application to exlore the data
spark = SparkSession.builder.appName("Linear_Regression").getOrCreate()

In [252]:
#We load the dataset in a dataframe. We read the file with the .json() function
books = spark.read.json("books_5000.json")
type(books)

pyspark.sql.dataframe.DataFrame

In [269]:
#we fill the empty string values of the language_code and publication_year values with 'None'.
books = books.withColumn('language_code', when(col('language_code') == '', 'None').otherwise(col('language_code')))
books = books.withColumn('publication_year', when(col('publication_year') == '', 'None').otherwise(col('publication_year')))

#We convert the data types of the num_pages, ratings_count and average_rating variables from string to numeric (integer and float)
books = books.withColumn("num_pages",col("num_pages").cast("integer"))
books = books.withColumn("ratings_count",col("ratings_count").cast("integer"))
books = books.withColumn("average_rating",col("average_rating").cast("float"))

In [270]:
#we fill the null (or na) values of the numeric variables with 0
books = books.na.fill(0,("num_pages", "ratings_count", "average_rating"))

In [271]:
#We split the dataset in train and test
trainDF, testDF = books.randomSplit([0.7, 0.3], seed=15)

print(trainDF.cache().count()) # Cache because accessing training data multiple times

print(testDF.count())

3505
1494


In [272]:
# We determine which of the columns are categorical.
categorical_cols = ['language_code', 'publication_year']

In [291]:
# The following two lines are estimators. They return functions that we will later apply to transform the dataset.
stringIndexer = StringIndexer(inputCols=categorical_cols, outputCols=[x + "Index" for x in categorical_cols]).setHandleInvalid('keep')
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categorical_cols]) 

In [292]:
stringIndexerModel = stringIndexer.fit(books)
stringIndexerModel.transform(books).show(5)

+----------+--------------------+--------------+--------+------------+--------------------+-------------------+---------+--------------------+--------+----------+-------------+-----------+-------------+--------------------+---------+--------------------+---------------+-----------------+----------------+--------------------+-------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------+------------------+---------------------+
|      asin|             authors|average_rating| book_id|country_code|         description|edition_information|   format|           image_url|is_ebook|      isbn|       isbn13|kindle_asin|language_code|                link|num_pages|     popular_shelves|publication_day|publication_month|publication_year|           publisher|ratings_count|              series|       similar_books|text_reviews_count|               title|title_without_series|                 url| work_id|language_cod

In [293]:
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["num_pages","ratings_count"]
assemblerInputs = [c + "OHE" for c in categorical_cols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [294]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

#we fit a linear regression model
LinReg = LinearRegression(featuresCol="features", labelCol ='average_rating')

In [295]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, LinReg])

# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset to classify the respective samples.
predDF = pipelineModel.transform(testDF)

In [303]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="r2")
evaluator.setLabelCol("average_rating")
print(f"R-Squared value of the Linear Regression Model is: {evaluator.evaluate(predDF)}")

R-Squared value of the Linear Regression Model is: 0.03716860928892651
