In [1]:
from utils.utils import *
import numpy as np

In [2]:
df = spark.read.csv('../data/rating.csv', header=True)
movie = spark.read.csv('../data/movie.csv', header=True)

In [3]:
spark.sparkContext.uiWebUrl

'http://10.11.7.128:4040'

In [4]:
spark.conf.get('spark.app.name')

'lecture-6'

In [5]:
spark.conf.set('spark.sql.adaptive.enabled', 'false')

In [6]:
movie_br = F.broadcast(movie)

In [7]:
movie

DataFrame[movieId: string, title: string, genres: string]

In [8]:
df

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

In [9]:
df_full = df.join(movie.select('movieId', 'title'), on='movieId', how='outer')
# inner, outer, left, anti-left, cross|full

In [10]:
df_full.show(5,truncate=False)

+-------+------+------+-------------------+----------------------------+
|movieId|userId|rating|timestamp          |title                       |
+-------+------+------+-------------------+----------------------------+
|100010 |4347  |3     |2014-05-28 03:47:42|Battle of Los Angeles (2011)|
|100010 |5352  |3.5   |2013-06-09 03:35:24|Battle of Los Angeles (2011)|
|100010 |12131 |1.5   |2014-04-24 19:51:00|Battle of Los Angeles (2011)|
|100010 |16693 |3.5   |2013-03-05 05:12:30|Battle of Los Angeles (2011)|
|100010 |20180 |1     |2015-02-18 21:24:43|Battle of Los Angeles (2011)|
+-------+------+------+-------------------+----------------------------+
only showing top 5 rows


In [11]:
spark.conf.set('spark.sql.adaptive.enabled', 'false')
df_full.explain()

== Physical Plan ==
*(3) Project [coalesce(movieId#18, movieId#38) AS movieId#42, userId#17, rating#19, timestamp#20, title#39]
+- *(3) SortMergeJoin [movieId#18], [movieId#38], FullOuter
   :- *(1) Sort [movieId#18 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(movieId#18, 200), ENSURE_REQUIREMENTS, [plan_id=98]
   :     +- FileScan csv [userId#17,movieId#18,rating#19,timestamp#20] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/User/Desktop/DSB_2024-2026/2.1/5._Big_Data/Project/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<userId:string,movieId:string,rating:string,timestamp:string>
   +- *(2) Sort [movieId#38 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(movieId#38, 200), ENSURE_REQUIREMENTS, [plan_id=99]
         +- FileScan csv [movieId#38,title#39] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/User/Desktop/DSB_2024-2026/2.1/5.

In [12]:
df.rdd.getNumPartitions()

22

In [13]:
movie.rdd.getNumPartitions()

1

In [14]:
df_full_br = df.join(
    movie_br,
    on='movieId',
    how='inner'
)
df_full_br.explain()

== Physical Plan ==
*(2) Project [movieId#18, userId#17, rating#19, timestamp#20, title#39, genres#40]
+- *(2) BroadcastHashJoin [movieId#18], [movieId#38], Inner, BuildRight, false
   :- *(2) Filter isnotnull(movieId#18)
   :  +- FileScan csv [userId#17,movieId#18,rating#19,timestamp#20] Batched: false, DataFilters: [isnotnull(movieId#18)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/User/Desktop/DSB_2024-2026/2.1/5._Big_Data/Project/data..., PartitionFilters: [], PushedFilters: [IsNotNull(movieId)], ReadSchema: struct<userId:string,movieId:string,rating:string,timestamp:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [plan_id=160]
      +- *(1) Filter isnotnull(movieId#38)
         +- FileScan csv [movieId#38,title#39,genres#40] Batched: false, DataFilters: [isnotnull(movieId#38)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/User/Desktop/DSB_2024-2026/2.1/5._Big_Data/Project/data..., Partition

In [None]:
# Task
# - Join movies and ratings tables using broadcast hash join, shuffle hash join and sort merge join with movies with 1,4,16 partitions and ratings with 1, 4, 16, 64 partitions. For the joined table compute the average rating per movie. You should get the same result every time. We do this task to benchmark 3*3*4 24 different options for join. Visualize with a 3 way table (join strategies as rows, 3*4 column, or vice versa) to see what works faster
# - Self join the table by userid and movieid, and find number us users where they rate a movie 30 days after rating a movie. Compare SHJ and SMJ for this analysis.
#
# For both cases also report the the params for your spark pseudo-cluster - n of executors, memory, driver memory.

In [25]:
from pyspark.sql.types import IntegerType
import time
movie_partitions = [1, 4, 16]
ratings_partitions = [1, 4, 16, 64]
join_strategies = {
    "BHJ": "BROADCAST",
    "SHJ": "SHUFFLE_HASH",
    "SMJ": "SORT_MERGE"
}
results = []

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

In [27]:
def run_benchmark(movie_df, ratings_df, join_strategy_hint, mp, rp):
    ratings_repartitioned = ratings_df.repartition(rp, "movieid")
    movies_repartitioned = movie_df.repartition(mp, "movieid")

    if join_strategy_hint == "BROADCAST":
        joined_df = ratings_repartitioned.join(
            F.broadcast(movies_repartitioned),
            on="movieid",
            how="inner"
        )
    else:
        joined_df = ratings_repartitioned.join(
            movies_repartitioned.hint(join_strategy_hint),
            on="movieid",
            how="inner"
        )
    avg_ratings = joined_df.groupBy("title").agg(F.avg("rating").alias("avg_rating"))
    start_time = time.time()
    avg_ratings.count()
    end_time = time.time()

    elapsed_time = end_time - start_time
    return elapsed_time

In [28]:
import pandas as pd

print(" Benchmark Start ")
for strategy_name, hint in join_strategies.items():
    for mp in movie_partitions:
        for rp in ratings_partitions:
            time_taken = run_benchmark(movie, ratings, hint, mp, rp)

            results.append({
                "Strategy": strategy_name,
                "Movies_Partitions": mp,
                "Ratings_Partitions": rp,
                "Time_Sec": time_taken
            })
            print(f"Finished {strategy_name} (Movies P:{mp}, Ratings P:{rp}): {time_taken:.2f} sec")

results_df = pd.DataFrame(results)
pivot_table = results_df.pivot_table(
    index='Strategy',
    columns=['Movies_Partitions', 'Ratings_Partitions'],
    values='Time_Sec'
)

print("\n Benchmark Results ")
print(pivot_table.to_markdown())

 Benchmark Start 
Finished BHJ (Movies P:1, Ratings P:1): 9.86 sec
Finished BHJ (Movies P:1, Ratings P:4): 5.68 sec
Finished BHJ (Movies P:1, Ratings P:16): 6.30 sec
Finished BHJ (Movies P:1, Ratings P:64): 11.12 sec
Finished BHJ (Movies P:4, Ratings P:1): 18.27 sec
Finished BHJ (Movies P:4, Ratings P:4): 12.63 sec
Finished BHJ (Movies P:4, Ratings P:16): 9.28 sec
Finished BHJ (Movies P:4, Ratings P:64): 24.25 sec
Finished BHJ (Movies P:16, Ratings P:1): 9.90 sec
Finished BHJ (Movies P:16, Ratings P:4): 6.69 sec
Finished BHJ (Movies P:16, Ratings P:16): 10.52 sec
Finished BHJ (Movies P:16, Ratings P:64): 16.14 sec
Finished SHJ (Movies P:1, Ratings P:1): 24.32 sec
Finished SHJ (Movies P:1, Ratings P:4): 23.78 sec
Finished SHJ (Movies P:1, Ratings P:16): 25.27 sec
Finished SHJ (Movies P:1, Ratings P:64): 30.58 sec
Finished SHJ (Movies P:4, Ratings P:1): 26.48 sec
Finished SHJ (Movies P:4, Ratings P:4): 20.47 sec
Finished SHJ (Movies P:4, Ratings P:16): 21.09 sec
Finished SHJ (Movies P:4,