# Assignment 2 APACHE-SPARK
## PART A
#### 1. Count the odd and even numbers using the file ‘integer.txt’

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
spark = SparkSession.builder.appName("1628 A2").getOrCreate()

In [0]:
integerRDD = spark.sparkContext.textFile("/FileStore/tables/integer.txt")
integerRDD = integerRDD.flatMap(lambda line: line.split())
even = integerRDD.filter(lambda x: int(x) % 2 == 0)
odd  = integerRDD.filter(lambda x: int(x) % 2 == 1)

print('There are {} even numbers.'.format(len(even.collect())))
print('There are {} even numbers.'.format(len(odd.collect())))

There are 514 even numbers.
There are 496 even numbers.


#### 2. Calculate the salary sum per department using the file ‘salary.txt’
  - Show the department name and salary sum.

In [0]:
salaryRDD = spark.sparkContext.textFile("/FileStore/tables/salary.txt")
arrayRDD = salaryRDD.map(lambda x: x.split(" "))
kvRDD = arrayRDD.map(lambda x: (x[0], int(x[1])))
sumRDD = kvRDD.reduceByKey(lambda x,y: x+y)
sumRDD.collect()

Out[4]: [('Sales', 3488491),
 ('Research', 3328284),
 ('Developer', 3221394),
 ('QA', 3360624),
 ('Marketing', 3158450)]

#### 3. Implement MapReduce using Pyspark on file ‘shakespeare.txt’. 
   - Show how many times these particular words appear in the document: Shakespeare, why, Lord, Library, GUTENBERG, WILLIAM, COLLEGE and WORLD. 
   - (Count exact words only (marks will be deducted for incorrect lowercase/uppercase))

In [0]:
shakespeareRDD = spark.sparkContext.textFile("/FileStore/tables/shakespeare_1.txt")
word_ls = ['Shakespeare', 'why', 'Lord', 'Library', 'GUTENBERG', 'WILLIAM', 'COLLEGE','WORLD']
temp = shakespeareRDD.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
sumRDD = temp.reduceByKey(lambda x,y: x+y)
wordRDD = sumRDD.filter(lambda x: x[0] in word_ls)
wordRDD.collect()

Out[5]: [('Shakespeare', 22),
 ('GUTENBERG', 99),
 ('WILLIAM', 115),
 ('WORLD', 98),
 ('COLLEGE', 98),
 ('why', 91),
 ('Lord', 341),
 ('Library', 2)]

#### 4. Calculate the top 15 and bottom 15 words using the file ‘shakespeare.txt’.
  - Show 15 words with the most count and 15 words with the least count. 
  - You can limit by 15 in ascending and descending order of count.

In [0]:
# Sort the RDD by word frequency in ascending order
mostFreq_15  = sumRDD.sortBy(lambda x: x[1], ascending=False).take(15)
leastFreq_15  = sumRDD.sortBy(lambda x: x[1], ascending=True).take(15)
print("The 15 most frequent words with each occurrence number:\n ", mostFreq_15)
print('--------------------------------------------------------------')
print("The 15 least frequent words with each occurrence number:\n ", leastFreq_15)

The 15 most frequent words with each occurrence number:
  [('', 231583), ('the', 11397), ('and', 8777), ('I', 8556), ('of', 7873), ('to', 7421), ('a', 5672), ('my', 4913), ('in', 4600), ('you', 4060), ('And', 3547), ('that', 3522), ('is', 3481), ('his', 3226), ('with', 3175)]
--------------------------------------------------------------
The 15 least frequent words with each occurrence number:
  [('anyone', 1), ('restrictions', 1), ('whatsoever.', 1), ('re-use', 1), ('online', 1), ('www.gutenberg.org', 1), ('COPYRIGHTED', 1), ('eBook,', 1), ('Details', 1), ('guidelines', 1), ('file.', 1), ('Author:', 1), ('Posting', 1), ('1,', 1), ('2011', 1)]


###### Note: there were many other words which appeared once only as well.

# PART B
- work with a distributed recommender system.
- To do this, create a recommender system using Apache Spark. Things that were taken into consideration were the efficiency of the systems as well as Spark’s complexity.

#### 1. Describe your data. 
  - Calculate the top 20 movies with the highest ratings and the top 15 users who provided the highest ratings.

In [0]:
# File location and type
path = "/FileStore/tables/movies.csv"

df = spark.read \
  .format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ',') \
  .option("path", path) \
  .load()
#   .option('nanValue', ' ')\
#   .option('nullValue', ' ')\

df.show(3)

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|     3|     0|
|      3|     1|     0|
|      5|     2|     0|
+-------+------+------+
only showing top 3 rows



###### Get basic info, get to know the data first

In [0]:
from pyspark.sql import functions as F

# Calculate the number of unique ratings, movies, and users
num_ratings = df.select("rating").distinct().count()
num_movies = df.select("movieId").distinct().count()
num_users = df.select("userId").distinct().count()

# rating options & thighest rating
rating_options = df.select("rating").distinct().rdd.flatMap(lambda x: x)
highest_rating = rating_options.max()

# Calculate the number of ratings with the highest mark
num_highest_ratings = df.filter(df.rating == highest_rating).count()

print("Num of ratings (choose from:{}): {}\nNum of movies: {}\nNum of users: {}".format(
    sorted(rating_options.collect()), num_ratings, num_movies, num_users))
print("Note: There are {} ratings with the highest mark {}.".format(num_highest_ratings, highest_rating))

Num of ratings (choose from:[1, 2, 3, 4, 5]): 5
Num of movies: 100
Num of users: 30
Note: There are 75 ratings with the highest mark 5.


###### top 20 movies with the highest ratings, metric chosen: average scores of each movie

In [0]:
# Calculate the average ratings for each movie
ave_rating = df.groupBy("movieId").avg("rating")
temp = ave_rating.orderBy(desc("avg(rating)")).limit(20)
top_20_movies = temp.withColumn("avg(rating)", round(col("avg(rating)"), 2))   #round values
top_20_movies.show()

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
|     32|       2.92|
|     90|       2.81|
|     30|        2.5|
|     94|       2.47|
|     23|       2.47|
|     49|       2.44|
|     29|        2.4|
|     18|        2.4|
|     52|       2.36|
|     53|       2.25|
|     62|       2.25|
|     92|       2.21|
|     46|        2.2|
|     68|       2.16|
|     87|       2.13|
|      2|       2.11|
|     69|       2.08|
|     27|       2.07|
|     88|       2.06|
|     22|       2.05|
+-------+-----------+



###### 15 users who provided the highest ratings, metric chosen: average scores of each user

In [0]:
ave_rating = df.groupBy("userId").avg("rating")
temp = ave_rating.orderBy(desc("avg(rating)")).limit(15)
top_15_users = temp.withColumn("avg(rating)", round(col("avg(rating)"), 2))
top_15_users.show()

+------+-----------+
|userId|avg(rating)|
+------+-----------+
|    11|       2.29|
|    26|        2.2|
|    22|       2.16|
|    23|       2.13|
|     2|       2.07|
|    17|       1.96|
|     8|        1.9|
|    24|       1.88|
|    12|       1.85|
|     3|       1.83|
|    29|       1.83|
|    28|       1.82|
|     9|       1.79|
|    14|       1.79|
|    16|       1.78|
+------+-----------+



#### 2. Split the dataset into train and test. 
   - Try 2 different combinations: 70/30,and 80/20.

#### 3. Explain MSE, RMSE and MAE. 
   - Compare and evaluate both of your models with evaluation metrics (RMSE or MAE)
   - Describe which one works better and why?

###### 80 & 20

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

################### Create train and test dataset, here we use 0.8 & 0.2 #################

(train_82, test_82) = df.randomSplit([0.8, 0.2],seed=1628)

# Build the recommendation model using ALS on the training data
als = ALS(userCol= 'userId', itemCol= 'movieId', ratingCol= 'rating', coldStartStrategy = 'drop')
model_82 = als.fit(train_82)
pred_82 = model_82.transform(test_82)

# RMSE on the test data
evaRMSE_82 = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse_82 = evaRMSE_82.evaluate(pred_82)
print("RMSE of 80% train and 20% test: ", rmse_82)

# MAE on the test data
evaMAE_82 = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
mae_82 = evaMAE_82.evaluate(pred_82)
print("MAE of 80% train and 20% test: ", mae_82)

RMSE of 80% train and 20% test:  1.057657381718659
MAE of 80% train and 20% test:  0.7654315561480369


###### 70 & 30

In [0]:
################### Create train and test dataset, here we use 0.7 & 0.3 #################
(train_73, test_73) = df.randomSplit([0.7, 0.3], seed=1628)
model_73 = als.fit(train_73)
pred_73 = model_73.transform(test_73)

# RMSE on the test data
evaRMSE_73 = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse_73 = evaRMSE_73.evaluate(pred_73)
print("RMSE of 70% train and 30% test: ", rmse_73)

# MAE on the test data
evaMAE_73 = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
mae_73 = evaMAE_73.evaluate(pred_73)
print("MAE of 70% train and 30% test: ", mae_73)

RMSE of 70% train and 30% test:  1.0873905618996584
MAE of 70% train and 30% test:  0.7619639403651358


- Description of the metrics
   - Mean Squared Error (MSE): Determined by averaging the squared differences between the predicted values and the actual values. By squaring the errors, MSE assigns higher significance to larger deviations. MSE magnifies larger errors, making them contribute more to the overall loss, which makes the MSE particularly useful for regression problems where significant deviations from the true values need to be penalized more.
   - Root Mean Squared Error (RMSE): Square root of MSE. 
   - Mean Absolute Error (MAE): Calculated by averaging the absolute differences between the predicted values and the actual values. Unlike MSE with squaring the differences, MAE is less influenced by outliers, which is useful when the outliers or large errors should not be overly penalized. 

- Result
   - RMSE of 70% and 30%:  1.08739 |
     MAE of 70% and 30%:  0.76196 

   - RMSE of 80% and 20%:  1.05766 |
      MAE of 80% and 20%:  0.76543

- I may choose RMSE as the metric as the difference is bigger, possibly indicating a clear comparison with less impact of randomness. And for regression problems, significant deviations from the true values need to be penalized more.
 - The difference between the MAE values is minimal, suggesting that both models perform similarly in terms of absolute errors. Based on the evaluation metrics, the model with the 80/20 split appears to work better overall. It achieves a lower RMSE, indicating better accuracy, while maintaining a competitive MAE value. The larger training set (80%) may have provided more data for the model to learn from, resulting in improved performance on the test data.

#### 4. Now tune the parameters of your algorithm to get the best set of parameters. 
- Explain different parameters of the algorithm which you have used for tuning your algorithm. 

  - rank: Rank determines how many hidden features the ALS model here will learn. These features are like hidden qualities which can help the model understand and make predictions about users and items(movies here). Defaults to 10.

  - maxIter: It specifies the maximum number of times the ALS algorithm here will repeat its calculations to find the best model. More iterations can improve accuracy, but it takes more time. Defaults to 10.

  - regParam: Control the regularization strength in ALS here. Regularization prevents the model from overfitting by adding a penalty for fitting the data too closely. The regParam value adjusts the intensity of this penalty. Defaults to 1.0 
  
- Evaluate all your models again. Show your code with the best values and output.

###### 80 & 20

In [0]:
# Based on code from https://spark.apache.org/docs/latest/ml-tuning.html#train-%20validation-split and course material.

#Tune model using ParamGridBuilder
parameters= ParamGridBuilder().addGrid(als.rank,[10,15,20]).addGrid(als.maxIter,[10,15,20]).addGrid(als.regParam,[0.01,0.1,1]).build()

#Use model evaluator as RMSE
eval = RegressionEvaluator(metricName= 'rmse',labelCol= 'rating', predictionCol= 'prediction')

#Build train validation split, 80% of the data will be used for training, 20% for validation.
trainvs = TrainValidationSplit(estimator=als, estimatorParamMaps=parameters, evaluator=eval,trainRatio=0.8)

#Build cross validator, if needed
#cv = CrossValidator(estimator=als, estimatorParamMaps=parameters,evaluator=eval, numFolds=3)

#Run TrainValidationSplit, and choose the best set of parameters

################################### For 80 & 20 spliiting ###################################################
model_82 = trainvs.fit(train_82)  
best_model_82 = model_82.bestModel  
pred_82 = best_model_82.transform(test_82)   
rmse_82 = eval.evaluate(pred_82)   

print("Best Paramters combinations for 80&20 splitting: ")
print("\nrank: " , best_model_82._java_obj.parent().getRank())
print("maxIter: " , best_model_82._java_obj.parent().getMaxIter())
print("regParam: " , best_model_82._java_obj.parent().getRegParam())
print("RMSE of test set: ", rmse_82) 

Best Paramters combinations for 80&20 splitting: 

rank:  15
maxIter:  20
regParam:  0.1
RMSE of test set:  1.0350194267005735


###### 70 & 30

In [0]:
#################################### For 70 & 30 Spliiting ################################################

model_73 = trainvs.fit(train_73)
best_model_73 = model_73.bestModel  
pred_73 = best_model_73.transform(test_73)
rmse_73 = eval.evaluate(pred_73)

print("Best Paramters combinations for 70&30 splitting: ")
print("\nrank: " , best_model_73._java_obj.parent().getRank())
print("maxIter: " , best_model_73._java_obj.parent().getMaxIter())
print("regParam: " , best_model_73._java_obj.parent().getRegParam())
print("RMSE of test set: ", rmse_73) 

Best Paramters combinations for 70&30 splitting: 

rank:  15
maxIter:  15
regParam:  0.1
RMSE of test set:  1.0541131040702143


Although the best values may occured at the end of the range, I just performed how to do the tunning. Yes we can definately increase the possible value for hyperparameters for a better result! This would take much more time to be run, not just for model tunning, but also for further use. For exmaple, the maxIter specifies the maximum number of times the ALS algorithm will repeat its calculations to find the best model, which would lead to huge time expense.
- The bese value of the hyparameters with model (80 training and 20 testing):
  - rank:  15
  - maxIter:  20
  - regParam:  0.1
- RMSE of test set of this model:  1.0350

#### 5. Calculate the top 15 movie recommendations for user id 10 and user id 14. Show your code and output.
- Choose best model with 80 training and 20 testing data.

In [0]:
userRecs = best_model_82.recommendForAllUsers(15)

print("For user id 10")
user10 = userRecs.filter(F.col('userId') == 10)
user10.show(truncate = False)

print("For user id 14")
user14 = userRecs.filter(F.col('userId') == 14)
user14.show(truncate = False)

For user id 10
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                                                           |
+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10    |[{2, 3.3830044}, {40, 3.1212664}, {92, 2.8936815}, {49, 2.7336671}, {25, 2.6379092}, {12, 2.525733}, {62, 2.4848144}, {42, 2.4519153}, {81, 2.4222684}, {4, 2.2977078}, {0, 2.1965504}, {82, 2.138171

##### TOP 15 movies: 
- for user ID 10: Movie  2, 40, 92, 49, 25, 12, 62, 42, 81, 4, 0, 82, 95, 91, 9
- for user ID 14: Movie 29, 52, 76, 63, 62, 96, 72, 58, 53, 2, 85, 93, 47, 60, 67