### Question 1

In [2]:
### 1st Question
import pandas as pd

# Lift pandas' row/column display limits (a global option, so it also affects
# any frame rendered by later cells).
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Read the ratings CSV into a DataFrame; the code below relies on the
# 'movieId', 'rating' and 'userId' columns being present.
df = pd.read_csv('movies.csv')

# Display the first 5 rows
print(df.head().to_markdown(index=False, numalign="left", stralign="left"))

# Print the column names and their data types.
# DataFrame.info() writes its report to stdout itself and returns None, so it
# must not be wrapped in print() -- doing so emits a stray "None" line.
df.info()

# Top 12 movies ranked by their mean rating (highest first).
# NOTE(review): the mean ignores how many ratings a movie received, so a movie
# with very few ratings can rank highly -- confirm this is the intended metric.
top_movies = df.groupby('movieId')['rating'].mean().sort_values(ascending=False).head(12)
print("\nTop 12 movies with highest ratings:\n")
print(top_movies.to_markdown(numalign="left", stralign="left"))

# Top 12 users ranked by the mean rating they gave (highest first).
top_users = df.groupby('userId')['rating'].mean().sort_values(ascending=False).head(12)
print("\nTop 12 users who provided highest ratings:\n")
print(top_users.to_markdown(numalign="left", stralign="left"))


| movieId   | rating   | userId   |
|:----------|:---------|:---------|
| 2         | 3        | 0        |
| 3         | 1        | 0        |
| 5         | 2        | 0        |
| 9         | 4        | 0        |
| 11        | 1        | 0        |
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1501 entries, 0 to 1500
Data columns (total 3 columns):
 #   Column   Non-Null Count  Dtype
---  ------   --------------  -----
 0   movieId  1501 non-null   int64
 1   rating   1501 non-null   int64
 2   userId   1501 non-null   int64
dtypes: int64(3)
memory usage: 35.3 KB
None

Top 12 movies with highest ratings:

| movieId   | rating   |
|:----------|:---------|
| 32        | 2.91667  |
| 90        | 2.8125   |
| 30        | 2.5      |
| 94        | 2.47368  |
| 23        | 2.46667  |
| 49        | 2.4375   |
| 29        | 2.4      |
| 18        | 2.4      |
| 52        | 2.35714  |
| 62        | 2.25     |
| 53        | 2.25     |
| 92        | 2.21429  |

Top 12 users who provided hig

### Question 2 & 3

In [4]:
### 2nd Question
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Create a SparkSession (if not already created)
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Create an ALS model
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)

# Split the data into training and test sets (70/30) randomly
(training, test) = spark_df.randomSplit([0.7, 0.3])

# Train the model
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model (70/30)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
mae = evaluator.evaluate(predictions)

# Split the data into training and test sets (80/20) randomly
(training, test) = spark_df.randomSplit([0.8, 0.2])

# Train the model
model = als.fit(training)

# Make predictions on the test data
predictions = model.transform(test)

# Evaluate the model (80/20)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse2 = evaluator.evaluate(predictions)
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")
mae2 = evaluator.evaluate(predictions)

# Print RMSE and MAE for both splits
print("RMSE (70/30 split):", rmse)
print("MAE (70/30 split):", mae)
print("RMSE (80/20 split):", rmse2)
print("MAE (80/20 split):", mae2)

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=2db71935a9ac0bd373c53994565bf9ef5811f744f4e80d24524ebb53ca80a6ce
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3
RMSE (70/30 split): 2.2013052941111355
MAE (70/30 split): 1.651832950880826
RMSE (80/20 split): 1.8354844277682187
MAE (80/20 split): 1.38034714297838


### Question 4

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize SparkSession (getOrCreate returns the already-running session if
# one exists in this kernel).
spark = SparkSession.builder.appName("RecommendationSystemTuning").getOrCreate()

# Load the dataset directly with Spark; inferSchema lets Spark derive the
# column types from the file contents.
df = spark.read.csv("movies.csv", header=True, inferSchema=True)

# Split dataset into train and test sets (70/30)
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Build the recommendation system with ALS.
# An explicit seed fixes the factor initialisation, so the tuning result does
# not change between runs for that reason.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

# Define the parameter grid for tuning (3 x 2 x 3 = 18 combinations)
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 15, 20])
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.regParam, [0.01, 0.1, 0.2])
    .build()
)

# Define the evaluator which uses MAE
evaluator = RegressionEvaluator(
    metricName="mae", labelCol="rating", predictionCol="prediction"
)

# Cross validation with 5 folds.
# The seed fixes how rows are assigned to folds; without it the selected
# hyper-parameters (and the MAE printed below) can differ on every run.
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

# Train the model with 5-fold validation
cv_model = crossval.fit(train_data)

# Get the model with best param combination
best_model = cv_model.bestModel

# Make predictions on the held-out test data
predictions = best_model.transform(test_data)

# Evaluate the best model
mae = evaluator.evaluate(predictions)
print(f"MAE of best model: {mae}")

MAE of best model: 0.7445522238261736


### Question 5

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col

# Initialize SparkSession (getOrCreate returns the already-running session if
# one exists in this kernel).
spark = SparkSession.builder.appName("MovieRecommendations").getOrCreate()

# Load the dataset
df = spark.read.csv("movies.csv", header=True, inferSchema=True)

# Split dataset into train and test (70/30)
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

# Build the recommendation model using ALS.
# An explicit seed fixes the factor initialisation, so the recommendations do
# not change between runs for that reason.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

# Define the parameter grid for tuning
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 15, 20])
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.regParam, [0.01, 0.1, 0.2])
    .build()
)

# Define the evaluator
evaluator = RegressionEvaluator(
    metricName="mae", labelCol="rating", predictionCol="prediction"
)

# Create the cross-validator; the seed fixes the fold assignment so the same
# hyper-parameters are selected on a re-run.
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

# Fit the model with the best parameters
cv_model = crossval.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

# Generate top 12 movie recommendations for user 10 and user 12 only.
# recommendForUserSubset scores just the requested users instead of producing
# recommendations for every user and discarding all but two of them. The
# subset is taken from df so the userId column keeps the dataset's own type.
target_users = df.select("userId").where(col("userId").isin(10, 12)).distinct()
user_recs = best_model.recommendForUserSubset(target_users, 12)

# Show the recommendations for user 10
user_recs.filter(col("userId") == 10).show(truncate=False)

# Show the recommendations for user 12
user_recs.filter(col("userId") == 12).show(truncate=False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                            |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10    |[{25, 2.2450678}, {92, 2.1650987}, {49, 2.1060581}, {89, 1.8897823}, {62, 1.8657926}, {42, 1.8157381}, {29, 1.5947124}, {31, 1.588448}, {47, 1.5870534}, {12, 1.5359523}, {32, 1.5212767}, {91, 1.5136149}]|
+------+--------------------------------------------------------------------------------------------------------------------------------------------