In [1]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.5'
spark_version = 'spark-3.5.5'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,692 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:9 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,236 kB]
Get:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:11 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ Packages [69.9 kB]
Get:12 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import json

In [4]:
# Initialize Spark session
spark = SparkSession.builder.appName("MoviesData").getOrCreate()

# Load the JSON file into a PySpark DataFrame
df = spark.read.json("movie_results.json")  # Replace with your JSON file path

# Show the structure of the JSON to understand the data
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- error: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production

In [7]:
# Extract the genre names (explode the array of genres to a new row per genre)
df_genres = df.select(
    col("id").alias("movieId"),
    explode(col("genres")).alias("genre")
).select(
    "movieId",
    col("genre.name").alias("genre_name")
)
# Index the genre names (convert them to numerical values)
indexer = StringIndexer(inputCol="genre_name", outputCol="genre_index")

# Encode the genre indices using OneHotEncoder
encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_vec")

# Combine indexing and encoding in a pipeline
pipeline = Pipeline(stages=[indexer, encoder])
genre_model = pipeline.fit(df_genres)
df_genres_encoded = genre_model.transform(df_genres)

# Now join the original movie DataFrame with the encoded genres
df_selected = df.select(
    col("id").alias("movieId"),
    col("title").alias("name"),
    col("popularity")
)

df_with_genre = df_selected.join(df_genres_encoded, on="movieId", how="left")


In [8]:
# Now join with ratings DataFrame (assuming 'ratings.csv' exists)
rating_df = spark.read.csv('ratings.csv', header=True, inferSchema=True)
links_df = spark.read.csv('links.csv', header=True, inferSchema=True)

# Join df_with_genre with links_df to add tmdbId and then join with rating_df
df_with_tmdb = df_with_genre.join(links_df, 'movieId', 'inner')
final_df = df_with_tmdb.join(rating_df, 'movieId', 'inner')

# Select the minimal required columns
final_df_selected = final_df.select(
    col("userId"),
    col("tmdbId"),
    col("name"),
    col("popularity"),
    col("rating"),
    col("genre_vec")  # Include the genre vector
)

# Show the final DataFrame to verify
final_df_selected.show(truncate=False)

+------+------+------------------------------------+----------+------+---------------+
|userId|tmdbId|name                                |popularity|rating|genre_vec      |
+------+------+------------------------------------+----------+------+---------------+
|104   |61337 |Ray                                 |3.581     |2.0   |(18,[0],[1.0]) |
|104   |61337 |Ray                                 |3.581     |2.0   |(18,[16],[1.0])|
|156   |61337 |Ray                                 |3.581     |3.0   |(18,[0],[1.0]) |
|156   |61337 |Ray                                 |3.581     |3.0   |(18,[16],[1.0])|
|156   |24086 |Harley Davidson and the Marlboro Man|3.623     |3.0   |(18,[1],[1.0]) |
|156   |24086 |Harley Davidson and the Marlboro Man|3.623     |3.0   |(18,[3],[1.0]) |
|196   |24086 |Harley Davidson and the Marlboro Man|3.623     |4.0   |(18,[1],[1.0]) |
|196   |24086 |Harley Davidson and the Marlboro Man|3.623     |4.0   |(18,[3],[1.0]) |
|267   |24086 |Harley Davidson and the Marl

In [9]:
# Ensure there are no missing ratings
final_df_selected = final_df_selected.dropna(subset=['rating'])

# Split data into training and test sets
(training_data, test_data) = final_df_selected.randomSplit([0.8, 0.2], seed=1234)

In [26]:
# Function to train and evaluate ALS model
def train_evaluate_als(rank_value):
    als = ALS(
        userCol="userId",
        itemCol="tmdbId",
        ratingCol="rating",
        maxIter=20,  # Keeping iterations constant
        rank=rank_value,  # Adjusting rank
        regParam=0.05,  # Regularization
        alpha=0.5,  # For implicit feedback (if applicable)
        coldStartStrategy="drop"
    )

    # Train the model
    model = als.fit(training_data)

    # Get predictions
    predictions = model.transform(test_data)

    # Evaluate R-squared
    evaluator_r2 = RegressionEvaluator(metricName="r2", labelCol=


                                       "rating", predictionCol="prediction")
    r2 = evaluator_r2.evaluate(predictions)

    print(f"R-squared for rank={rank_value}: {r2}")
    return r2

# Test different rank values
ranks = [15, 50, 100]
r2_results = {rank: train_evaluate_als(rank) for rank in ranks}

# Display best performing rank
best_rank = max(r2_results, key=r2_results.get)
print(f"\nBest rank: {best_rank} with R-squared: {r2_results[best_rank]}")

R-squared for rank=15: 0.5976689178260808
R-squared for rank=50: 0.7091643509512533
R-squared for rank=100: 0.7472547824673609

Best rank: 100 with R-squared: 0.7472547824673609


In [42]:
predictions.select(F.min("prediction").alias("min_prediction"),
                   F.max("prediction").alias("max_prediction")).show()

+--------------+--------------+
|min_prediction|max_prediction|
+--------------+--------------+
|    -0.6389645|     6.1329803|
+--------------+--------------+



In [57]:
# Generate top N recommendations for all users
top_n_recommendations = model.recommendForAllUsers(10)  # Top 10 recommendations per user

# Explode the recommendations to get one row per movie recommendation
exploded_recommendations = top_n_recommendations.select(
    "userId",
    explode("recommendations").alias("recommendation")
)

# Extract tmdbId and other details from the exploded recommendations
final_recommendations = exploded_recommendations.select(
    "userId",
    col("recommendation.tmdbId").alias("tmdbId"),
    col("recommendation.rating").alias("predicted_rating")
)

# Now join the recommendations with the movie metadata
final_with_metadata = final_recommendations.join(df_with_tmdb, "tmdbId", "inner")

# Show the recommendations with movie details (title, popularity, genre, etc.)
final_with_metadata.show(truncate=False)

+------+------+----------------+-------+-------------------+----------+----------+-----------+---------------+------+
|tmdbId|userId|predicted_rating|movieId|name               |popularity|genre_name|genre_index|genre_vec      |imdbId|
+------+------+----------------+-------+-------------------+----------+----------+-----------+---------------+------+
|1959  |57    |5.116563        |1656   |The Legend of Zorro|3.59      |Action    |1.0        |(18,[1],[1.0]) |120257|
|1959  |57    |5.116563        |1656   |The Legend of Zorro|3.59      |Adventure |4.0        |(18,[4],[1.0]) |120257|
|1959  |57    |5.116563        |1656   |The Legend of Zorro|3.59      |Western   |17.0       |(18,[17],[1.0])|120257|
|1645  |120   |4.672034        |805    |Rosemary's Baby    |4.486     |Drama     |0.0        |(18,[0],[1.0]) |117913|
|1645  |120   |4.672034        |805    |Rosemary's Baby    |4.486     |Horror    |6.0        |(18,[6],[1.0]) |117913|
|1645  |120   |4.672034        |805    |Rosemary's Baby 