In [4]:
import pyspark
import streamlit as st
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, explode
import pandas as pd

In [2]:
# Build (or reuse) the Spark session for the recommender, applying the
# memory and timeout settings one by one in a fixed order.
session_builder = SparkSession.builder.appName("MovieRecommender")
for conf_key, conf_value in (
    ("spark.executor.memory", "4g"),
    ("spark.driver.memory", "4g"),
    ("spark.network.timeout", "600s"),
    ("spark.executor.heartbeatInterval", "60s"),
):
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# Both CSV files have a header row; column types are inferred by Spark.
# (Replace the paths below with your own data location.)
csv_options = {"header": True, "inferSchema": True}
ratings_df = spark.read.csv("Data/ratings.csv", **csv_options)
movies_df = spark.read.csv("Data/movies.csv", **csv_options)

# Preview the first five rows of each dataset
ratings_df.show(5)
movies_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [2]:
# Fixed seed shared by the train/test split and the ALS initialisation so that
# re-running the notebook on the same data reproduces the same model and RMSE.
SEED = 42

# Create ALS model.
# coldStartStrategy="drop" removes rows whose prediction is NaN (users/movies
# that only appear in the test split), so the RMSE below stays a valid number.
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=10,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=SEED,
)

# Split the data into training (80%) and test (20%) sets
(training, test) = ratings_df.randomSplit([0.8, 0.2], seed=SEED)

# Train the model on the training split only
model = als.fit(training)

# Evaluate the model by computing the RMSE on the held-out test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")


Root-mean-square error = 0.8829641827190008


In [20]:
# Get top 10 movie recommendations for a specific user
user_id = 123

# recommendForUserSubset only needs the user id column, so pass a single
# distinct row instead of every rating row of that user.
target_user_df = ratings_df.filter(col("userId") == user_id).select("userId").distinct()
user_recommendations = model.recommendForUserSubset(target_user_df, 10)

# Explode the recommendations to get one row per recommended movie
exploded_recommendations = user_recommendations.select(explode("recommendations").alias("recommendation"))

# Extract the recommended movie IDs together with their predicted rating;
# the score is needed to restore the ranking after the join below.
recommended_movie_ids_df = exploded_recommendations.select(
    col("recommendation.movieId").alias("movieId"),
    col("recommendation.rating").alias("predicted_rating"),
)

# Join with movies_df to get titles/genres. A join does not guarantee row
# order, so sort by predicted rating explicitly (best first) and then keep
# only the movie columns.
recommended_movies = (
    recommended_movie_ids_df.join(movies_df, on="movieId", how="inner")
    .orderBy(col("predicted_rating").desc())
    .select("movieId", "title", "genres")
)

# Show the recommended movies, best match first
recommended_movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|  33649|  Saving Face (2004)|Comedy|Drama|Romance|
|  25771|Andalusian Dog, A...|             Fantasy|
| 179135|Blue Planet II (2...|         Documentary|
| 138966|Nasu: Summer in A...|           Animation|
| 134796|  Bitter Lake (2015)|         Documentary|
| 117531|    Watermark (2014)|         Documentary|
|  86237|  Connections (1978)|         Documentary|
|  84273|Zeitgeist: Moving...|         Documentary|
|  74226|Dream of Light (a...|   Documentary|Drama|
|  72171|Black Dynamite (2...|       Action|Comedy|
+-------+--------------------+--------------------+



In [12]:
# Initialize Spark session (getOrCreate reuses a session that is already running)
spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

# Load data
ratings_df = spark.read.csv("Data/ratings.csv", header=True, inferSchema=True)
movies_df = spark.read.csv("Data/movies.csv", header=True, inferSchema=True)

# Configure the ALS model; the seed makes the fitted factors repeatable.
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=10,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)


@st.cache_resource
def _fit_als_model():
    """Fit the ALS model on all ratings and cache the result.

    Streamlit re-executes this script on every widget interaction; without
    the cache the model would be retrained on each button click.
    """
    return als.fit(ratings_df)


# Train ALS model (only on the first run; later reruns reuse the cached model)
model = _fit_als_model()

# Streamlit GUI
st.image("background_img.jpg", caption="Movie Recommendation Engine Logo", use_column_width=True)
st.title("Movie Recommendation Engine")
user_id = st.number_input("Enter User ID", min_value=1, step=1)

if st.button("Get Recommendations"):
    # All rating rows of the selected user; used both to identify the user
    # for the recommender and to build the watched-history table.
    watched_history = ratings_df.filter(col("userId") == user_id)

    # recommendForUserSubset only needs the user id column.
    user_recommendations = model.recommendForUserSubset(
        watched_history.select("userId").distinct(), 10
    )

    # Explode the recommendations to get one row per recommended movie
    exploded_recommendations = user_recommendations.select(explode("recommendations").alias("recommendation"))

    # Keep the predicted rating next to the movie id so the ranking can be
    # restored after the join (a join does not guarantee row order).
    recommended_movie_ids_df = exploded_recommendations.select(
        col("recommendation.movieId").alias("movieId"),
        col("recommendation.rating").alias("predicted_rating"),
    )

    # Join with movies_df to get recommended movies, best match first
    recommended_movies = (
        recommended_movie_ids_df.join(movies_df, on="movieId", how="inner")
        .orderBy(col("predicted_rating").desc())
        .select("movieId", "title", "genres")
    )

    # Collect the top 10 recommended movies
    recommended_movies_list = recommended_movies.limit(10).collect()

    if not recommended_movies_list:
        # No rows means the user id has no ratings in the dataset, so the
        # model has nothing to recommend from; say so instead of rendering
        # empty tables.
        st.warning(f"No recommendations found for user {user_id}. Please check the User ID.")
    else:
        # Convert the collected rows to a pandas DataFrame
        recommended_movies_pd_df = pd.DataFrame(recommended_movies_list, columns=["movieId", "title", "genres"])

        # Display the DataFrame as a table in Streamlit
        st.write("Top 10 Movie Recommendations:")
        st.table(recommended_movies_pd_df)

        # Watched history: movies the user has already rated
        watched_history_movie_ids_df = watched_history.select(col("movieId"))
        watched_movies = watched_history_movie_ids_df.join(movies_df, on="movieId", how="inner")

        watched_movies_list = watched_movies.collect()

        # Convert the collected rows to a pandas DataFrame
        watched_movies_pd_df = pd.DataFrame(watched_movies_list, columns=["movieId", "title", "genres"])

        # Display the DataFrame as a table in Streamlit
        st.write("Watched History:")
        st.table(watched_movies_pd_df)

# Run Streamlit app
# This code is meant to be saved as a script and launched from a terminal, not
# executed inside the notebook kernel:
#     streamlit run MovieRecommendationEngine.py


2024-07-20 17:47:17.927 
  command:

    streamlit run C:\Users\hussa\anaconda3\lib\site-packages\ipykernel_launcher.py [ARGUMENTS]
2024-07-20 17:47:17.957 Session state does not function when running a script without `streamlit run`
