In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=c5ea8edb507b71e0119ff6259404559b121f5990b9c12dd2fb0f448694f7a4fa
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# One SparkSession for the whole notebook; getOrCreate() reuses an existing one.
spark = (
    SparkSession.builder
    .appName("Movie Recommendations")
    .getOrCreate()
)

In [None]:
# Mount Google Drive; the ratings and metadata CSVs are read from
# MyDrive/project2 beneath this mount point.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
# Explicit schema: skips a schema-inference pass over the CSV and reads the
# user/movie ids as integers, which ALS expects.
data_schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", IntegerType(), True),
])

ratings_spark = spark.read.csv(
    '/content/drive/MyDrive/project2/ratings_small.csv',
    header=True,
    schema=data_schema,
)

# Only the projected (userId, movieId, rating) frame is used downstream, so cache
# that one alone; caching the raw 4-column frame as well kept a second copy of
# the same rows in memory.
ratings_df = ratings_spark.select('userId', 'movieId', 'rating').cache()

In [None]:
# Hold out 20% of the ratings for evaluation; the fixed seed keeps the split
# (and therefore the reported MAE) reproducible.
split_weights = [0.8, 0.2]
training, test = ratings_df.randomSplit(split_weights, seed=42)

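Collaborative Filtering

Train an ALS matrix-factorization model on the explicit ratings and report its mean absolute error (MAE) on the held-out 20% test split.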

In [None]:
# Collaborative filtering: ALS matrix factorisation on explicit ratings.
# coldStartStrategy='drop' discards test rows whose user or movie was not seen in
# training (their prediction would be NaN and make the MAE NaN).
# seed is set explicitly: PySpark's default seed is derived from Python's hash()
# of the class name, which varies between interpreter sessions unless
# PYTHONHASHSEED is fixed, so the model (and MAE) would otherwise drift.
svd = ALS(
    rank=30,
    maxIter=4,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
    implicitPrefs=False,
    seed=42,
)
svd_model = svd.fit(training)

svd_predictions = svd_model.transform(test)
svd_evaluator = RegressionEvaluator(
    metricName='mae',
    labelCol='rating',
    predictionCol='prediction',
)

svd_mae = svd_evaluator.evaluate(svd_predictions)
print(f'MAE (Test) = {svd_mae}')

MAE (Test) = 0.7144480259975858


In [None]:
# Peek at the single best ALS recommendation for the first few users.
svd_model.recommendForAllUsers(numItems=1).show(n=5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|  [{1172, 3.490329}]|
|     2| [{1192, 5.0077834}]|
|     3| [{83411, 4.675861}]|
|     4|  [{1192, 5.794789}]|
|     5|[{89904, 4.8696694}]|
+------+--------------------+
only showing top 5 rows



In [None]:
# Show the titles of the top-1 recommendations displayed above (ids copied from
# that output).
# Build an id -> title lookup here: `movies_spark` is only created later in the
# Hybrid Filtering section, so depending on it fails on a fresh kernel.
from pyspark.sql.functions import col

movie_title_lookup = (
    spark.read.csv(
        '/content/drive/MyDrive/project2/metadata_analysis.csv',
        header=True,
        # assumed: free-text fields contain quotes/newlines (the default parse
        # misaligns rows further down) -- keep records intact.
        multiLine=True,
        escape='"',
    )
    .select(col("movieId").cast("integer").alias("movieId"), "title")
)

movie_ids_to_find = [1172, 1192, 83411, 89904]
filtered_movies = movie_title_lookup.filter(col("movieId").isin(movie_ids_to_find))
movie_titles = filtered_movies.select("movieId", "title")
movie_titles.show()

+-------+----------------+
|movieId|           title|
+-------+----------------+
|   1172| Cinema Paradiso|
|   1192|Paris is Burning|
|  83411|            Cops|
|  89904|      The Artist|
+-------+----------------+



In [None]:
# Import necessary modules
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, IndexToString

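Hybrid Filtering

Combine a content-based signal (TF-IDF weights over each movie's genre list) with the ALS collaborative-filtering predictions, blending the two scores 50/50 to rank a user's unrated movies.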

In [None]:
# Load the movie metadata. Free-text columns (e.g. `overview`) most likely contain
# embedded commas, doubled quotes and newlines. With Spark's defaults (one record
# per line, backslash escape), such records are split and their columns shifted.
# The symptoms are a title of "127.0" with a null movieId, and ~1k rows whose
# title is "Released" -- the value of the neighbouring `status` column.
# multiLine + a '"' escape parse pandas-style quoted CSV correctly.
movies_df = spark.read.csv(
    '/content/drive/MyDrive/project2/metadata_analysis.csv',
    header=True,
    multiLine=True,
    escape='"',
).cache()

In [None]:
# List the metadata columns to decide which ones to keep.
movies_df.schema.names

['_c0',
 'budget',
 'genres',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'title',
 'vote_average',
 'vote_count',
 'movieId']

In [None]:
# Keep only what the recommender needs: the join key (movieId), the raw genre
# metadata that feeds the TF-IDF features, and the title for display.
selected_cols = ['movieId', 'genres', 'title']
movies_df = movies_df.select(selected_cols)

In [None]:
# The CSV was read without a schema, so movieId is a string; cast it to an
# integer so it joins with the ratings data. Unparseable values become null.
movies_df = movies_df.withColumn("movieId", movies_df["movieId"].cast(IntegerType()))

In [None]:
# Preview the selected columns after the movieId cast.
movies_df.show(n=5)

+-------+--------------------+--------------------+
|movieId|              genres|               title|
+-------+--------------------+--------------------+
|      1|[{'id': 16, 'name...|           Toy Story|
|      2|[{'id': 12, 'name...|             Jumanji|
|      3|[{'id': 10749, 'n...|    Grumpier Old Men|
|   null|[{'id': 35, 'name...|               127.0|
|      5|[{'id': 35, 'name...|Father of the Bri...|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Print movies_df's column names and types as a tree.
# NOTE(review): the saved output below looks stale -- it lists only movieId and
# genres although `title` is also selected above; re-run to refresh it.
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- genres: string (nullable = true)



In [None]:
from pyspark.sql.functions import udf, concat_ws, col
from pyspark.sql.types import StringType

def extract_genre_names(genres_str):
    """Turn a stringified list of genre dicts into a '|'-joined string of names.

    Example:
        "[{'id': 16, 'name': 'Animation'}, {'id': 35, 'name': 'Comedy'}]"
        -> 'Animation|Comedy'

    Args:
        genres_str: raw text of the `genres` cell; None for SQL NULL.

    Returns:
        The genre names joined with '|', or '' when the value is null or is not a
        literal list of dicts that each carry a 'name' key.
    """
    # Local import so the function is self-contained when pickled to Spark workers.
    import ast

    if genres_str is None:
        return ''
    try:
        # literal_eval only accepts Python literals: an expression smuggled into
        # the CSV is rejected instead of being executed, as eval() would.
        genre_list = ast.literal_eval(genres_str)
        return '|'.join(genre['name'] for genre in genre_list)
    except (ValueError, SyntaxError, TypeError, KeyError, RecursionError):
        # Malformed / non-list / missing-'name' values map to an empty string.
        return ''

# Wrap the parser as a Spark UDF that returns a plain string.
extract_genre_names_udf = udf(extract_genre_names, returnType=StringType())

# Replace the raw genres text with "Name|Name|..." strings.
movies_df = movies_df.withColumn('genres', extract_genre_names_udf('genres'))

# Preview the transformed frame.
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|              genres|               title|
+-------+--------------------+--------------------+
|      1|Animation|Comedy|...|           Toy Story|
|      2|Adventure|Fantasy...|             Jumanji|
|      3|      Romance|Comedy|    Grumpier Old Men|
|   null|Comedy|Drama|Romance|               127.0|
|      5|              Comedy|Father of the Bri...|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import split, col
# Turn "A|B|C" into an array<string> so CountVectorizer can consume it.
movies_spark = movies_df.withColumn("genres", split("genres", "\\|"))

In [None]:
# Remove any row with a null in movieId, genres or title (e.g. failed casts).
movies_spark = movies_spark.dropna(how="any")
movies_spark.show(5)

+-------+--------------------+--------------------+
|movieId|              genres|               title|
+-------+--------------------+--------------------+
|      1|[Animation, Comed...|           Toy Story|
|      2|[Adventure, Fanta...|             Jumanji|
|      3|   [Romance, Comedy]|    Grumpier Old Men|
|      5|            [Comedy]|Father of the Bri...|
|      6|[Action, Crime, D...|                Heat|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# "Released" is a value of the `status` column, so rows with that title look
# like parsing artefacts; count how many there are.
released_movies = movies_spark.where(movies_spark.title == "Released")
released_movies.count()

1089

In [None]:
# Exclude the "Released"-titled rows from the movies used for modelling.
movies_spark = movies_spark.where(movies_spark.title != "Released")

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql.functions import col, split

# Content-based features: genre list -> term counts ("tf") -> TF-IDF ("features").
cv = CountVectorizer(inputCol="genres", outputCol="tf")
idf = IDF(inputCol=cv.getOutputCol(), outputCol="features")
pipeline = Pipeline().setStages([cv, idf])
pipeline_model = pipeline.fit(movies_spark)
movies_features = pipeline_model.transform(movies_spark)

In [None]:
# Collaborative filtering model for the hybrid recommender.
# Reuse the ALS estimator and model fitted in the Collaborative Filtering
# section. They use the same hyperparameters and the same training split, so
# refitting only repeats the work. It can also yield a model that differs from
# the one whose test MAE was reported.
als = svd
als_model = svd_model

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

from pyspark.sql.types import DoubleType

@udf(DoubleType())
def dot_product(v1, v2):
    """Spark UDF: dot product of two ML vectors, returned as a double.

    If ``v1.dot(v2)`` or the float conversion raises TypeError (for instance a
    null or non-vector operand), the UDF yields SQL NULL instead of failing.
    NOTE(review): that failure is silent -- check inputs are both vectors.
    """
    try:
        score = float(v1.dot(v2))
    except TypeError:
        score = None
    return score

def recommend_hybrid(userId, n_recommendations=10, cf_weight=0.5):
    """Recommend movies a user has not rated by blending ALS and genre similarity.

    hybrid_score = cf_weight * ALS prediction
                   + (1 - cf_weight) * content_based_score

    content_based_score is the dot product of a movie's TF-IDF genre vector with
    the user's profile. The profile is the rating-weighted mean of the TF-IDF
    vectors of the movies the user has rated.

    Candidates must have genre features, be unrated by the user, and be scorable
    by ALS (rows with cold-start NaN predictions are dropped by the model).
    If the user has no rated movie with genre features, the result is empty.

    Args:
        userId: id of the user to recommend for (as in ratings_df.userId).
        n_recommendations: maximum number of movies to return.
        cf_weight: weight of the ALS prediction in [0, 1]; the content score
            gets 1 - cf_weight. The default 0.5 gives an equal blend.

    Returns:
        DataFrame with a single ``movieId`` column, at most ``n_recommendations``
        rows, sorted by descending hybrid score.
    """
    from pyspark.ml.stat import Summarizer
    from pyspark.sql.functions import col, lit

    # Collaborative filtering: ALS scores for every movie the user has not rated.
    user_ratings = ratings_df.filter(col("userId") == userId)
    user_unrated_movies = (
        movies_features
        .join(user_ratings.select("movieId"), on="movieId", how="left_anti")
        .select("movieId", "features")
        .withColumn("userId", lit(userId))
    )
    cf_predictions = als_model.transform(user_unrated_movies).select("movieId", "prediction")

    # Content-based: user profile = rating-weighted mean of the rated movies'
    # TF-IDF vectors. This cannot go through dot_product with the scalar rating:
    # a vector cannot be dotted with a number, so the UDF would return NULL and
    # every content (and hybrid) score would be NULL.
    rated_features = user_ratings.join(
        movies_features.select("movieId", "features"), on="movieId"
    )
    user_profile = rated_features.groupBy("userId").agg(
        Summarizer.mean(col("features"), col("rating").cast("double")).alias("profile")
    )

    content_based_recommendations = (
        movies_features.select("movieId", "features")
        .withColumn("userId", lit(userId))
        .join(user_profile, on="userId", how="inner")
        .withColumn("content_based_score", dot_product(col("features"), col("profile")))
        .select("movieId", "content_based_score")
    )

    # Combine both signals.
    # NOTE(review): ALS predictions live on the rating scale while the TF-IDF dot
    # product has an unrelated scale; consider normalising before blending.
    hybrid_recommendations = (
        cf_predictions.join(content_based_recommendations, on="movieId")
        .withColumn(
            "hybrid_score",
            col("prediction") * cf_weight + col("content_based_score") * (1.0 - cf_weight),
        )
        # Never rank on missing scores (NULL/NaN would make the top-N arbitrary).
        .dropna(subset=["hybrid_score"])
        .orderBy(col("hybrid_score").desc())
        .limit(n_recommendations)
    )

    return hybrid_recommendations.select("movieId")



In [None]:
# recommend_hybrid may look up `expr`/`lit` as module globals at call time,
# so import them before calling it.
from pyspark.sql.functions import expr, lit

# Test the hybrid model with a user
user_id = 1
n_recommendations = 10
recommended_movies = recommend_hybrid(user_id, n_recommendations)

# A join does not preserve row order (the titles previously came back sorted by
# movieId), so record each recommendation's rank on the driver, attach titles,
# then sort by that rank. The explicit schema also handles an empty result.
ranked_ids = [row.movieId for row in recommended_movies.collect()]
recommendation_ranks = spark.createDataFrame(
    list(enumerate(ranked_ids, start=1)), "rec_rank INT, movieId INT"
)

recommended_movies_with_titles = (
    recommendation_ranks.join(movies_spark, on="movieId")
    .orderBy("rec_rank")
    .select("movieId", "title")
)

# Print the recommended movie titles, best match first
recommended_movies_with_titles.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|    148|An Awfully Big Ad...|
|    463|       Guilty as Sin|
|    471| The Hudsucker Proxy|
|    496|What Happened Was...|
|    833|    High School High|
|   1088|       Dirty Dancing|
|   1238|          Local Hero|
|   1342|            Candyman|
|   1580|        Men in Black|
|   1591|               Spawn|
+-------+--------------------+

