In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import explode
from timeit import default_timer as timer
import pandas as pd

In [2]:
spark = SparkSession.builder.appName("RecommenderSystem").getOrCreate()

# Load ratings and movies
ratings = spark.read.csv("data/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("data/movies.csv", header=True, inferSchema=True)

# Train-test split
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [3]:
from pyspark.sql import functions as F

'''
Pyspark Dataframe API
'''
start_df = timer()

movie_stats = ratings.groupBy("movieId").agg(
    F.count("rating").alias("rating_count"),
    F.avg("rating").alias("avg_rating")
)

# Filter for popular movies (e.g. 50+ ratings)
popular_movies = movie_stats.filter("rating_count >= 50")

# Join with movie titles
popular_movies = popular_movies.join(movies, on="movieId", how="inner")

# Sort by average rating
top_movies_df = popular_movies.orderBy("avg_rating", ascending=False)

top_movies_df.show(10, truncate=False)

end_df = timer()
print(f"DataFrame API execution time: {end_df - start_df:.2f} seconds")

+-------+------------+-----------------+---------------------------------------------------------------------------+---------------------------+
|movieId|rating_count|avg_rating       |title                                                                      |genres                     |
+-------+------------+-----------------+---------------------------------------------------------------------------+---------------------------+
|318    |317         |4.429022082018927|Shawshank Redemption, The (1994)                                           |Crime|Drama                |
|858    |192         |4.2890625        |Godfather, The (1972)                                                      |Crime|Drama                |
|2959   |218         |4.272935779816514|Fight Club (1999)                                                          |Action|Crime|Drama|Thriller|
|1276   |57          |4.271929824561403|Cool Hand Luke (1967)                                                      |Drama         

In [4]:
'''
Spark SQL Comparison
'''

# Register DataFrames as views
ratings.createOrReplaceTempView("ratings")
movies.createOrReplaceTempView("movies")

start_sql = timer()

# Use SQL to get top movies with 50+ ratings
query = """
SELECT 
    r.movieId,
    m.title,
    COUNT(r.rating) AS rating_count,
    AVG(r.rating) AS avg_rating
FROM ratings r
JOIN movies m ON r.movieId = m.movieId
GROUP BY r.movieId, m.title
HAVING COUNT(r.rating) >= 50
ORDER BY avg_rating DESC
"""

top_movies_sql = spark.sql(query)
top_movies_sql.show(10, truncate=False)

end_sql = timer()
print(f"Spark SQL execution time: {end_sql - start_sql:.2f} seconds")

+-------+---------------------------------------------------------------------------+------------+-----------------+
|movieId|title                                                                      |rating_count|avg_rating       |
+-------+---------------------------------------------------------------------------+------------+-----------------+
|318    |Shawshank Redemption, The (1994)                                           |317         |4.429022082018927|
|858    |Godfather, The (1972)                                                      |192         |4.2890625        |
|2959   |Fight Club (1999)                                                          |218         |4.272935779816514|
|1276   |Cool Hand Luke (1967)                                                      |57          |4.271929824561403|
|750    |Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)|97          |4.268041237113402|
|904    |Rear Window (1954)                                     

In [5]:
'''
Creating multiple test runs for execution times for both Dataframe API and
Spark SQL
'''

def test_dataframe_api():
    start = timer()
    
    movie_stats = ratings.groupBy("movieId").agg(
        F.count("rating").alias("rating_count"),
        F.avg("rating").alias("avg_rating")
    )
    popular_movies = movie_stats.filter("rating_count >= 50")
    popular_movies = popular_movies.join(movies, on="movieId", how="inner")
    top_movies_df = popular_movies.orderBy("avg_rating", ascending=False)
    top_movies_df.count()

    end = timer()
    return end - start

def test_spark_sql():
    start = timer()
    
    query = """
    SELECT 
        r.movieId,
        m.title,
        COUNT(r.rating) AS rating_count,
        AVG(r.rating) AS avg_rating
    FROM ratings r
    JOIN movies m ON r.movieId = m.movieId
    GROUP BY r.movieId, m.title
    HAVING COUNT(r.rating) >= 50
    ORDER BY avg_rating DESC
    """
    top_movies_sql = spark.sql(query)
    top_movies_sql.count()  # force full computation

    end = timer()
    return end - start


In [6]:
num_runs = 5
results = {
    "Run": [],
    "DataFrame API (s)": [],
    "Spark SQL (s)": []
}

for i in range(1, num_runs + 1):
    df_time = test_dataframe_api()
    sql_time = test_spark_sql()
    
    results["Run"].append(i)
    results["DataFrame API (s)"].append(round(df_time, 3))
    results["Spark SQL (s)"].append(round(sql_time, 3))

timing_df = pd.DataFrame(results)
timing_df

Unnamed: 0,Run,DataFrame API (s),Spark SQL (s)
0,1,1.293,1.285
1,2,0.817,0.842
2,3,0.695,0.933
3,4,0.613,1.039
4,5,0.773,0.839
