# Assignment 2 - Apache Spark

In [0]:
#importing packages
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext

In [0]:
#instantiate the spark session
#getOrCreate() reuses a session that is already running instead of building a second one
spark = (
    SparkSession.builder
    .appName("Assignment")
    .getOrCreate()
)

In [0]:
#setting the Spark SQL option "spark.sql.shuffle.partitions" to 4 for this session
spark.conf.set("spark.sql.shuffle.partitions", 4)

## Part A: Question 1

In [0]:
#reading in text file as an RDD of lines, asking for at least 4 partitions
sourceRDD_A1 = spark.sparkContext.textFile(
    "/FileStore/tables/integer.txt",
    minPartitions=4,
)


In [0]:
#Action
#counting total number of records (one per line of the text file) in RDD
sourceRDD_A1.count()

In [0]:
#Transformation
#parsing every line of the source RDD with float()
#NOTE(review): the input file is named integer.txt but values are parsed as floats;
#the even/odd filters downstream therefore use float modulo - confirm this is intended
float_RDD = sourceRDD_A1.map(float)

In [0]:
#Transformation
#splitting the numbers by parity: a remainder of 0 after dividing by 2 means even
even_RDD = float_RDD.filter(lambda value: value % 2 == 0)
#everything whose remainder is not 0 is treated as odd
odd_RDD = float_RDD.filter(lambda value: not value % 2 == 0)

In [0]:
#Action
#Number of even numbers (records kept by the x%2 == 0 filter)
even_RDD.count()

In [0]:
#Action
#Number of odd numbers (records kept by the x%2 != 0 filter)
odd_RDD.count()

## Part A: Question 2

In [0]:
#reading in text file as an RDD of lines, asking for at least 4 partitions
sourceRDD_A2 = spark.sparkContext.textFile(
    "/FileStore/tables/salary.txt",
    minPartitions=4,
)
#Action
#bringing every line back to the driver to inspect the raw input
sourceRDD_A2.collect()

In [0]:
#Transformation
#breaking each line into a list of fields on single spaces
arrayRDD_A2 = sourceRDD_A2.map(lambda line: line.split(" "))
#Action
#viewing the split rows
arrayRDD_A2.collect()

In [0]:
#Transformation
#building (key, value) pairs: first field is the key, second field (position 1) becomes a float
kvRDD_A2 = arrayRDD_A2.map(lambda fields: (fields[0], float(fields[1])))
#Action
#viewing the pairs
kvRDD_A2.collect()

In [0]:
#Transformation
#adding up the salary values that share the same key (department)
sumRDD_A2 = kvRDD_A2.reduceByKey(lambda running_total, salary: running_total + salary)
#Action
#Salary sum for each department
sumRDD_A2.collect()

## Part A: Question 3

In [0]:
import pyspark.sql.functions as func
import re

In [0]:
#reading in text file as an RDD of lines, asking for at least 4 partitions
sourceRDD_A3 = spark.sparkContext.textFile(
    "/FileStore/tables/shakespeare.txt",
    minPartitions=4,
)

In [0]:
#Transformation
#turning each line into words by splitting on single spaces and flattening into one RDD
#NOTE(review): split(" ") yields empty strings for repeated spaces and leaves other
#whitespace (e.g. tabs) inside tokens; empty strings are dropped in a later cell
line_RDD_A3 = sourceRDD_A3.flatMap(lambda line: line.split(" "))
#Action
#previewing the first 10 tokens only; collect() would ship every token of the file to the
#driver and render all of them in the notebook output
line_RDD_A3.take(10)

In [0]:
#Transformation
#cleaning the tokens so that all the punctuation is removed:
#every character that is not a letter, a digit or whitespace is deleted
#the pattern is a raw string so that \s and \d reach the regex engine without Python
#flagging them as invalid string escape sequences
clean_RDD_A3 = line_RDD_A3.map(lambda x: re.sub(r'[^A-Za-z\s\d]', '', x))
#Action
#previewing the first 10 cleaned tokens instead of collecting the whole RDD to the driver
clean_RDD_A3.take(10)

In [0]:
#Transformation
#removing empty strings (tokens of length 0) left over from splitting and cleaning
input_RDD_A3 = clean_RDD_A3.filter(lambda x: len(x) > 0)
#Action
#previewing the first 10 remaining tokens instead of collecting the whole RDD to the driver
input_RDD_A3.take(10)

In [0]:
#MapReduce -> Mapping step
#Transformation: every word becomes a (word, 1) pair
mapper_RDD_A3 = input_RDD_A3.map(lambda word: (word, 1))
#Action
#previewing the first 10 pairs instead of collecting one pair per word occurrence to the driver
mapper_RDD_A3.take(10)

In [0]:
#MapReduce -> Reducer step
#Transformation: summing the 1s of identical words gives (word, total occurrences)
reducer_RDD_A3 = mapper_RDD_A3.reduceByKey(lambda x, y: x + y)
#Action
#previewing the first 10 (word, count) pairs; the full result is inspected as a DataFrame later
reducer_RDD_A3.take(10)

In [0]:
#turning the (word, count) pairs into a dataframe and caching it for the queries that follow
wordcounts = spark.createDataFrame(reducer_RDD_A3).cache()
#the tuple fields arrive as _1 and _2, so they are renamed to word and count
wordcounts = wordcounts.select(
    col("_1").alias("word"),
    col("_2").alias("count"),
)
wordcounts.show()

In [0]:
#filtering the dataframe for the EXACT words in word_list
word_list = ['Shakespeare', 'When', 'Lord', 'Library', 'GUTENBERG', 'WILLIAM', 'COLLEGE', 'WORLD']
#a listed word must be delimited by start/end of the value or by whitespace on both sides
#the fragments are raw strings so that \s is passed to the regex engine as written instead
#of being an invalid Python string escape; the resulting pattern text is unchanged
exact_word_pattern = r'(^|\s)(' + '|'.join(word_list) + r')(\s|$)'
filtered_words = wordcounts.filter(func.col('word').rlike(exact_word_pattern))
#result
filtered_words.show()

## Part A: Question 4

In [0]:
#15 words with the smallest counts (sorted by count, lowest first)
wordcounts.orderBy(col('count').asc()).show(15)

In [0]:
#15 words with the largest counts (sorted by count, highest first)
wordcounts.orderBy(col('count').desc()).show(15)

## Part B: Question 1

In [0]:
# File location and type
path = "/FileStore/tables/movies.csv"

# Comma-separated file with a header row; column types are inferred from the data
df = spark.read.csv(path, header=True, inferSchema=True, sep=',')

In [0]:
#previewing the first rows of the loaded dataframe
df.show()

In [0]:
#checking for any null values
#one aggregate per column: when() produces a value only for null cells and count() tallies them
null_counts_per_column = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df.columns
]
df.select(null_counts_per_column).show()

In [0]:
#descriptive statistics of dataframe (summary table over the dataframe's columns)
df.describe().show()

In [0]:
#finding the number of users in data (count of distinct userId values)
df.select("userId").distinct().count()

In [0]:
#finding the number of movies reviewed (count of distinct movieId values)
df.select("movieId").distinct().count()

In [0]:
#calculating the number of times users reviewed movies
#one row per userId with a "count" column holding that user's number of rows in df
userrating_counts = df.groupBy("userId").count()
#rendering the per-user counts in the notebook
display(userrating_counts)

userId,count
12,55
13,48
14,57
18,52
25,46
6,57
9,53
15,48
16,45
17,46


In [0]:
#descriptive statistics of the number of movies users reviewed
#restricted to the "count" column of the per-user table
userrating_counts.describe(['count']).show()

In [0]:
#the 12 users who submitted the most ratings (sorted by count, highest first)
userrating_counts.orderBy(col('count').desc()).show(12)

In [0]:
#calculating top 12 movies with highest ratings
#ratings are aggregated per movie first: sorting the raw rows only lists 12 individual
#(user, movie) ratings and can show the same movie more than once
#movies are ranked by average rating; ties are broken by the number of ratings received
(df.groupBy("movieId")
   .agg(func.avg("rating").alias("avg_rating"),
        func.count("rating").alias("num_ratings"))
   .orderBy(func.desc("avg_rating"), func.desc("num_ratings"))
   .show(12))

## Part B: Question 2 - Train/Test Split

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator,TrainValidationSplit

In [0]:
#Create initial ALS model
#userId / movieId / rating are the user, item and rating columns of the ratings dataframe
#coldStartStrategy="drop" removes prediction rows that come back as NaN, so the evaluation
#metrics stay defined
#seed is fixed so that refitting the estimator gives reproducible factors and metrics
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed=42)

In [0]:
#MODEL 1
#first combination split - 80/20
#seed is fixed so the split, and every metric computed from it, is the same on each re-run
(train1, test1) = df.randomSplit([0.80, 0.20], seed=42)
# Train model 1 with Training Data 1
als_model_1 = als.fit(train1)
#Model 1 predictions on test1
pred_1 = als_model_1.transform(test1)

In [0]:
#MODEL 2
#second combination split - 60/40
#seed is fixed so the split, and every metric computed from it, is the same on each re-run
(train2, test2) = df.randomSplit([0.6, 0.4], seed=42)
# Train model 2 with Training Data 2
als_model_2 = als.fit(train2)
#Model 2 predictions on test2
pred_2 = als_model_2.transform(test2)

## Part B: Question 3 - Models Performance

In [0]:
#EVALUATORS
#all three compare the "rating" label with the "prediction" column; only the metric differs
#Evaluation with RMSE
evaluator_RMSE = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
#Evaluation with MSE
evaluator_MSE = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="mse",
)
#Evaluation with MAE
evaluator_MAE = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="mae",
)

In [0]:
#MODEL 1 EVALUATION
#scoring the model 1 test predictions with each evaluator, in RMSE, MSE, MAE order
rmse_1, mse_1, mae_1 = [
    metric_evaluator.evaluate(pred_1)
    for metric_evaluator in (evaluator_RMSE, evaluator_MSE, evaluator_MAE)
]

In [0]:
#MODEL 2 EVALUATION
#scoring the model 2 test predictions with each evaluator, in RMSE, MSE, MAE order
rmse_2, mse_2, mae_2 = [
    metric_evaluator.evaluate(pred_2)
    for metric_evaluator in (evaluator_RMSE, evaluator_MSE, evaluator_MAE)
]

In [0]:
#reporting each metric for both models; the embedded "\n" puts model 2 on its own line
print ("RMSE of model 1: ", rmse_1, "\nRMSE of model 2: ", rmse_2)
print ("MSE of model 1: ", mse_1, "\nMSE of model 2: ", mse_2)
print ("MAE of model 1: ", mae_1, "\nMAE of model 2: ", mae_2)

## Part B: Question 4 - Tuning

#### Combination of train-test split chosen: Model 1, 80/20 split

In [0]:
# Create ParamGrid for the hyperparameter search
# 4 rank values x 4 regParam values = 16 candidate ALS configurations
parameters = (ParamGridBuilder()
             .addGrid(als.rank, [10, 50, 100, 200])
             .addGrid(als.regParam, [.1, .15, 0.2, 0.25])
             .build())

#Build train validation split (a single train/validation partition, not k-fold cross validation)
#candidates are compared with the RMSE evaluator
#seed is fixed so the internal train/validation partition is the same on each re-run
tranvs = TrainValidationSplit(estimator=als, estimatorParamMaps=parameters, evaluator=evaluator_RMSE, seed=42)

In [0]:
#BEST MODEL 1
#Run train validation split over the parameter grid using the 80/20 training set (train1)
tuned_model_1 = tranvs.fit(train1)
#Get best Model selected by the search
best_model = tuned_model_1.bestModel

In [0]:
#BEST MODEL 2
#Run train validation split over the parameter grid using the 60/40 training set (train2)
tuned_model_2 = tranvs.fit(train2)
#Get best Model selected by the search
best_model2 = tuned_model_2.bestModel

In [0]:
# Print Rank and RegParam of the best model from each tuning run
# NOTE(review): _java_obj is an underscore-prefixed (non-public) attribute; parent() presumably
# returns the estimator that produced the model - confirm this works on the installed Spark version
print("  Rank:", best_model._java_obj.parent().getRank())
print("  RegParam:", best_model._java_obj.parent().getRegParam())
# same two hyperparameters for the model tuned on the 60/40 split
print("  Rank 2:", best_model2._java_obj.parent().getRank())
print("  RegParam 2:", best_model2._java_obj.parent().getRegParam())

In [0]:
#Using each best model to predict on the test set of its own split
tuned_pred_1, tuned_pred_2 = (
    best_model.transform(test1),
    best_model2.transform(test2),
)

In [0]:
#RMSE of Best Model 1 and Best Model 2, each on its own tuned predictions
tuned_rmse_1, tuned_rmse_2 = (
    evaluator_RMSE.evaluate(tuned_pred_1),
    evaluator_RMSE.evaluate(tuned_pred_2),
)

In [0]:
#reporting the test RMSE of the two tuned models
print ("RMSE of new model 1: ", tuned_rmse_1)
print ("RMSE of new model 2: ", tuned_rmse_2)

## Part B: Question 5 - Recommended Movies

In [0]:
# Generate the 100 top recommendations for every user (from best to worst)
movie_recommendations = best_model.recommendForAllUsers(numItems=100)
# preview of the per-user recommendation rows
movie_recommendations.show()

In [0]:
#keeping only the recommendation row of User Id 1
movie_recommendations_ID1 = movie_recommendations.where(movie_recommendations["userId"] == 1)
movie_recommendations_ID1.show()

In [0]:
#User Id 1
#Collecting the ids of all the movies that user Id 1 reviewed into a Python list
userID1movies = [
    row[0]
    for row in df.filter(col("userId") == 1).select('movieId').collect()
]

#Using explode to turn the recommendations array into one row per recommended movie,
#then pulling movieId and rating out of the exploded struct
recommendations_ID1 = (
    movie_recommendations_ID1
    .withColumn("recommendations", explode("recommendations"))
    .select('userId', col("recommendations.movieId"), col("recommendations.rating"))
)
#filtering out the movies that the user has already reviewed and showing top 12
already_reviewed_by_ID1 = recommendations_ID1.movieId.isin(userID1movies)
recommendations_ID1.filter(~already_reviewed_by_ID1).show(12)

In [0]:
#keeping only the recommendation row of User Id 12
movie_recommendations_ID12 = movie_recommendations.where(movie_recommendations["userId"] == 12)
movie_recommendations_ID12.show()

In [0]:
#User Id 12
#Collecting the ids of all the movies that user Id 12 reviewed into a Python list
userID12movies = [
    row[0]
    for row in df.filter(col("userId") == 12).select('movieId').collect()
]

#Using explode to turn the recommendations array into one row per recommended movie,
#then pulling movieId and rating out of the exploded struct
recommendations_ID12 = (
    movie_recommendations_ID12
    .withColumn("recommendations", explode("recommendations"))
    .select('userId', col("recommendations.movieId"), col("recommendations.rating"))
)
#filtering out the movies that the user has already reviewed and showing top 12
already_reviewed_by_ID12 = recommendations_ID12.movieId.isin(userID12movies)
recommendations_ID12.filter(~already_reviewed_by_ID12).show(12)