In [5]:

import os
import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname("__file__"), "../../")))

from config import settings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim
import os
from src.config.spark_config import get_spark_session


from pyspark.sql import SparkSession
# Optional: custom tuning for this session
custom_tuning = {
    "spark.executor.memory": "6g",
    "spark.driver.memory": "4g",
    "spark.executor.cores": "4",
    "spark.sql.shuffle.partitions": "8",
    "spark.default.parallelism": "8",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": "-1"
}

# Step 1: Start Spark session
from src.config.spark_config import get_spark_session
spark = get_spark_session(app_name="Read_Parquet_Tuning", custom_config=custom_tuning)
# Step 2: Define Parquet folder path
parquet_base = os.path.join(settings.BASE_DIR, "data", "parquet_data")

# Step 3: Map your Parquet files
parquet_files = {
    "title_basics": os.path.join(parquet_base, "title_basics.parquet"),
    "title_crew": os.path.join(parquet_base, "title_crew.parquet"),
    "title_episode": os.path.join(parquet_base, "title_episode.parquet"),
    "title_akas": os.path.join(parquet_base, "title_akas.parquet")
}


25/06/22 23:53:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  import pkg_resources


In [8]:
from pyspark.sql.functions import col

# Define which column to select for each file
columns_to_display = {
    "title_basics": "primaryTitle",
    "title_crew": "directors",
    "title_episode": "seasonNumber",
    "title_akas": "title"
}

# Load and display the specified column for each file
for name, path in parquet_files.items():
    print(f"📄 Reading file: {name}")
    column = columns_to_display[name]

    df = spark.read.parquet(path)
    
    if column in df.columns:
        df.select(col(column)).show(5)
    else:
        print(f"❌ Column '{column}' not found in {name}")

📄 Reading file: title_basics
+--------------------+
|        primaryTitle|
+--------------------+
|McCraw vs. Herrin...|
|          Ed Genesis|
|    Aikens vs. Brown|
|From Marshes to M...|
|     Orive vs. Stark|
+--------------------+
only showing top 5 rows
📄 Reading file: title_crew
+---------+
|directors|
+---------+
|nm0808310|
|       \N|
|       \N|
|nm0011612|
|nm0011612|
+---------+
only showing top 5 rows
📄 Reading file: title_episode
+------------+
|seasonNumber|
+------------+
|          \N|
|           1|
|           1|
|          \N|
|           2|
+------------+
only showing top 5 rows
📄 Reading file: title_akas
+--------------------+
|               title|
+--------------------+
|          Carmencita|
|          Carmencita|
|          Carmencita|
|Carmencita - span...|
|          Καρμενσίτα|
+--------------------+
only showing top 5 rows


In [9]:
from pyspark.sql.functions import col
#filter
path = parquet_files["title_basics"]
df = spark.read.parquet(path)

# ✅ Basic Query - Pushdown Filter Example
basic_filtered = df.filter(col("titleType") == "movie")
print("🔍 BASIC FILTER - Only movies")
basic_filtered.explain(mode="formatted")
basic_filtered.show(1, truncate=False)

🔍 BASIC FILTER - Only movies
== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [11]: [tconst#570, titleType#571, primaryTitle#572, originalTitle#573, isAdult#574, startYear#575, endYear#576, runtimeMinutes#577, genre_1#578, genre_2#579, ingested_at#580]
Batched: true
Location: InMemoryFileIndex [file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,movie)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genre_1:string,genre_2:string,ingested_at:string>

(2) ColumnarToRow [codegen id : 1]
Input [11]: [tconst#570, titleType#571, primaryTitle#572, originalTitle#573, isAdult#574, startYear#575, endYear#576, runtimeMinutes#577, genre_1#578, genre_2#579, ingested_at#580]

(3) Filter [codegen id : 1]
Input [11]: [tconst#570, titleType#571, primaryTitle

In [11]:
#level 2
# ✅ Advanced Query - Combined Pushdown Filter
advanced_filtered = df.filter(
    (col("titleType") == "movie") &
    (col("startYear") == "2024") &
    (col("genre_1") == "Drama")
)

print("🔍 ADVANCED FILTER - Movies from 2024 in Drama")
advanced_filtered.explain(mode="formatted")
advanced_filtered.show(1, truncate=False)

🔍 ADVANCED FILTER - Movies from 2024 in Drama
== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [11]: [tconst#570, titleType#571, primaryTitle#572, originalTitle#573, isAdult#574, startYear#575, endYear#576, runtimeMinutes#577, genre_1#578, genre_2#579, ingested_at#580]
Batched: true
Location: InMemoryFileIndex [file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet]
PushedFilters: [IsNotNull(titleType), IsNotNull(startYear), IsNotNull(genre_1), EqualTo(titleType,movie), EqualTo(startYear,2024), EqualTo(genre_1,Drama)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genre_1:string,genre_2:string,ingested_at:string>

(2) ColumnarToRow [codegen id : 1]
Input [11]: [tconst#570, titleType#571, primaryTitle#572, originalTitle#573, isAdult#574, startYear#575, endYear#576, runtimeMinutes#577, genre_1#578, gen

In [12]:
df_no_pushdown = spark.read.parquet(path)
df_filtered = df_no_pushdown.filter(col("titleType") == "movie")

df_filtered.explain(mode="formatted")
df_filtered.show(1, truncate=False)

== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [11]: [tconst#650, titleType#651, primaryTitle#652, originalTitle#653, isAdult#654, startYear#655, endYear#656, runtimeMinutes#657, genre_1#658, genre_2#659, ingested_at#660]
Batched: true
Location: InMemoryFileIndex [file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,movie)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genre_1:string,genre_2:string,ingested_at:string>

(2) ColumnarToRow [codegen id : 1]
Input [11]: [tconst#650, titleType#651, primaryTitle#652, originalTitle#653, isAdult#654, startYear#655, endYear#656, runtimeMinutes#657, genre_1#658, genre_2#659, ingested_at#660]

(3) Filter [codegen id : 1]
Input [11]: [tconst#650, titleType#651, primaryTitle#652, originalTitle#653, isAd

In [None]:
# # User Code (DataFrame API)

# Logical Plan (What to do)
# >>>Optimized Logical Plan (Simplified version)
#         ↓
# >>>Physical Plan (How to do it)
#         ↓
# >>> Executed by Spark Engine

In [13]:
from pyspark.sql.functions import col

# Read Parquet files
title_basics = spark.read.parquet(parquet_files["title_basics"])
title_crew = spark.read.parquet(parquet_files["title_crew"])

# Apply early filter to reduce data before join (pushdown)
filtered_basics = title_basics.filter(col("titleType") == "movie")

# Perform join
joined_df = filtered_basics.join(title_crew, on="tconst", how="inner")

# Select and display a few useful columns
result_df = joined_df.select("tconst", "primaryTitle", "directors", "writers")

# Show 5 records
result_df.show(5, truncate=False)

# Print physical plan
result_df.explain(mode="formatted")



+---------+--------------------------------+---------+---------+
|tconst   |primaryTitle                    |directors|writers  |
+---------+--------------------------------+---------+---------+
|tt0000591|The Prodigal Son                |nm0141150|nm0141150|
|tt0000867|Fiesta de toros                 |nm0023107|\N       |
|tt0000868|Fiestas de Santa Lucía - Belenes|nm0005717|nm0005717|
|tt0000886|Hamlet                          |nm0099901|nm0000636|
|tt0001007|La primera y segunda casetas    |nm0185426|\N       |
+---------+--------------------------------+---------+---------+
only showing top 5 rows
== Physical Plan ==
AdaptiveSparkPlan (12)
+- Project (11)
   +- SortMergeJoin Inner (10)
      :- Sort (5)
      :  +- Exchange (4)
      :     +- Project (3)
      :        +- Filter (2)
      :           +- Scan parquet  (1)
      +- Sort (9)
         +- Exchange (8)
            +- Filter (7)
               +- Scan parquet  (6)


(1) Scan parquet 
Output [3]: [tconst#696, titleType#697

                                                                                

In [None]:
# Filter early to reduce join data size.
# 	•	Avoid joins on huge unsorted datasets without bucketing/salting.
# 	•	Use broadcast() on smaller DF to trigger Broadcast Join if needed.


In [15]:
from pyspark.sql.functions import split, explode, trim

# Step 1: Load data
title_basics = spark.read.parquet(parquet_files["title_basics"])
title_crew = spark.read.parquet(parquet_files["title_crew"])
name_basics = spark.read.parquet(os.path.join(parquet_base, "name_basics.parquet"))

# Step 2: Filter only movies
movies = title_basics.filter(col("titleType") == "movie")

# Step 3: Join title_basics + title_crew
crew_joined = movies.join(title_crew, on="tconst", how="inner")

# Step 4: Explode director IDs (split on comma)
directors_df = crew_joined \
    .withColumn("director_id", explode(split(col("directors"), ","))) \
    .drop("writers")  # Optional

# Step 5: Join with name_basics to get actual names
final_df = directors_df.join(name_basics, directors_df.director_id == name_basics.nconst, how="left") \
    .select("tconst", "primaryTitle", "director_id", "primaryName") \
    .withColumnRenamed("primaryName", "director_name")

# Step 6: Show
final_df.show(5, truncate=False)

                                                                                

+---------+-----------------------------+-----------+-----------------------------------+
|tconst   |primaryTitle                 |director_id|director_name                      |
+---------+-----------------------------+-----------+-----------------------------------+
|tt0000675|Don Quijote                  |nm0194088  |Narciso Cuyàs                      |
|tt0000147|The Corbett-Fitzsimmons Fight|nm0714557  |Enoch J. Rector                    |
|tt0001422|Trail to the West            |nm0001908  |Gilbert M. 'Broncho Billy' Anderson|
|tt0000838|A Cultura do Cacau           |nm0017074  |Ernesto de Albuquerque             |
|tt0000850|Los dos hermanos             |nm0063413  |Ricardo de Baños                   |
+---------+-----------------------------+-----------+-----------------------------------+
only showing top 5 rows


In [16]:
df = title_basics.filter(col("titleType") == "movie")
df.explain(mode="formatted")

== Physical Plan ==
* Filter (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [11]: [tconst#741, titleType#742, primaryTitle#743, originalTitle#744, isAdult#745, startYear#746, endYear#747, runtimeMinutes#748, genre_1#749, genre_2#750, ingested_at#751]
Batched: true
Location: InMemoryFileIndex [file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,movie)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string,originalTitle:string,isAdult:string,startYear:string,endYear:string,runtimeMinutes:string,genre_1:string,genre_2:string,ingested_at:string>

(2) ColumnarToRow [codegen id : 1]
Input [11]: [tconst#741, titleType#742, primaryTitle#743, originalTitle#744, isAdult#745, startYear#746, endYear#747, runtimeMinutes#748, genre_1#749, genre_2#750, ingested_at#751]

(3) Filter [codegen id : 1]
Input [11]: [tconst#741, titleType#742, primaryTitle#743, originalTitle#744, isAd

In [17]:
df = title_basics.filter(col("titleType") == "movie")

# Logical plan (developer-friendly)
print(df._jdf.queryExecution().logical())

# Optimized plan (after Catalyst optimizations)
print(df._jdf.queryExecution().optimizedPlan())

# Physical plan (actual execution)
print(df._jdf.queryExecution().executedPlan())

'Filter '`=`('titleType, movie)
+- Relation [tconst#741,titleType#742,primaryTitle#743,originalTitle#744,isAdult#745,startYear#746,endYear#747,runtimeMinutes#748,genre_1#749,genre_2#750,ingested_at#751] parquet

Filter (isnotnull(titleType#742) AND (titleType#742 = movie))
+- Relation [tconst#741,titleType#742,primaryTitle#743,originalTitle#744,isAdult#745,startYear#746,endYear#747,runtimeMinutes#748,genre_1#749,genre_2#750,ingested_at#751] parquet

*(1) Filter (isnotnull(titleType#742) AND (titleType#742 = movie))
+- *(1) ColumnarToRow
   +- FileScan parquet [tconst#741,titleType#742,primaryTitle#743,originalTitle#744,isAdult#745,startYear#746,endYear#747,runtimeMinutes#748,genre_1#749,genre_2#750,ingested_at#751] Batched: true, DataFilters: [isnotnull(titleType#742), (titleType#742 = movie)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(titleType), Equal

In [18]:
from pyspark.sql.functions import col
# Get All Movie Titles with Their Directors & Writers
title_basics = spark.read.parquet(parquet_files["title_basics"])
title_crew = spark.read.parquet(parquet_files["title_crew"])

In [19]:
movies_df = title_basics.filter(col("titleType") == "movie").select("tconst", "primaryTitle")

In [20]:
movie_crew_df = movies_df.join(
    title_crew,
    on="tconst",
    how="inner"
).select("tconst", "primaryTitle", "directors", "writers")

In [21]:
# EXPLAIN plan
movie_crew_df.explain(mode="formatted")

# Query Execution Details
print("\n🧠 Logical Plan:")
print(movie_crew_df._jdf.queryExecution().logical())

print("\n🛠 Optimized Plan:")
print(movie_crew_df._jdf.queryExecution().optimizedPlan())

print("\n⚙️ Physical Plan:")
print(movie_crew_df._jdf.queryExecution().executedPlan())

== Physical Plan ==
AdaptiveSparkPlan (12)
+- Project (11)
   +- SortMergeJoin Inner (10)
      :- Sort (5)
      :  +- Exchange (4)
      :     +- Project (3)
      :        +- Filter (2)
      :           +- Scan parquet  (1)
      +- Sort (9)
         +- Exchange (8)
            +- Filter (7)
               +- Scan parquet  (6)


(1) Scan parquet 
Output [3]: [tconst#790, titleType#791, primaryTitle#792]
Batched: true
Location: InMemoryFileIndex [file:/Users/aryan/Desktop/project/data/parquet_data/title_basics.parquet]
PushedFilters: [IsNotNull(titleType), EqualTo(titleType,movie), IsNotNull(tconst)]
ReadSchema: struct<tconst:string,titleType:string,primaryTitle:string>

(2) Filter
Input [3]: [tconst#790, titleType#791, primaryTitle#792]
Condition : ((isnotnull(titleType#791) AND (titleType#791 = movie)) AND isnotnull(tconst#790))

(3) Project
Output [2]: [tconst#790, primaryTitle#792]
Input [3]: [tconst#790, titleType#791, primaryTitle#792]

(4) Exchange
Input [2]: [tconst#790, pri

In [22]:
movie_crew_df.show(5, truncate=False)



+---------+--------------------------------+---------+---------+
|tconst   |primaryTitle                    |directors|writers  |
+---------+--------------------------------+---------+---------+
|tt0000591|The Prodigal Son                |nm0141150|nm0141150|
|tt0000867|Fiesta de toros                 |nm0023107|\N       |
|tt0000868|Fiestas de Santa Lucía - Belenes|nm0005717|nm0005717|
|tt0000886|Hamlet                          |nm0099901|nm0000636|
|tt0001007|La primera y segunda casetas    |nm0185426|\N       |
+---------+--------------------------------+---------+---------+
only showing top 5 rows


                                                                                

In [32]:
title_basics = spark.read.parquet(parquet_files["title_basics"])
title_crew = spark.read.parquet(parquet_files["title_crew"])
title_akas = spark.read.parquet(parquet_files["title_akas"])
name_basics = spark.read.parquet(os.path.join(parquet_base, "name_basics.parquet"))



In [None]:
title_ratings = spark.read.parquet(os.path.join(parquet_base, "title_ratings.parquet"))


In [33]:
# Filter High-Rated Movies (Predicate Pushdown)
high_rated = title_ratings.filter(col("averageRating") >= 8.5)

In [34]:
high_rated_movies = high_rated.join(
    title_basics.filter(col("titleType") == "movie"),
    on="tconst",
    how="inner"
)

In [35]:
movie_with_crew = high_rated_movies.join(title_crew, on="tconst", how="left")

In [36]:
from pyspark.sql.functions import split, explode

# Explode director IDs (comma-separated)
movie_with_directors = movie_with_crew.withColumn("director_id", explode(split(col("directors"), ",")))

# Join with name_basics
movie_with_director_names = movie_with_directors.join(
    name_basics.select(col("nconst").alias("director_id"), col("primaryName")),
    on="director_id",
    how="left"
)

In [37]:
final_df = movie_with_director_names.join(
    title_akas.select("titleId", "title", "region"),
    movie_with_director_names.tconst == title_akas.titleId,
    how="left"
).drop("titleId")

In [38]:
final_df.cache()

DataFrame[director_id: string, tconst: string, averageRating: double, numVotes: int, ingested_at: string, titleType: string, primaryTitle: string, originalTitle: string, isAdult: string, startYear: string, endYear: string, runtimeMinutes: string, genre_1: string, genre_2: string, ingested_at: string, directors: string, writers: string, ingested_at: string, primaryName: string, title: string, region: string]

In [None]:
# Use cache() when this DF is reused multiple times
# 	•	❌ Don’t cache unnecessarily (it eats memory)


In [39]:
# Final cleanup
output_df = final_df.select("primaryTitle", "primaryName", "averageRating", "region", "title")

# Optional: reduce partitions for small result set
output_df = output_df.coalesce(1)

# Save or display
output_df.show(10, truncate=False)



+-------------------------+-------------+-------------+------+-------------------------+
|primaryTitle             |primaryName  |averageRating|region|title                    |
+-------------------------+-------------+-------------+------+-------------------------+
|Powder                   |Arthur Maude |9.0          |\N    |Powder                   |
|Powder                   |Arthur Maude |9.0          |US    |Powder                   |
|The Monastery's Hunter   |Franz Osten  |8.5          |\N    |Der Klosterjäger         |
|The Monastery's Hunter   |Franz Osten  |8.5          |HU    |A hegyek pásztora        |
|The Monastery's Hunter   |Franz Osten  |8.5          |DE    |Der Klosterjäger         |
|The Monastery's Hunter   |Franz Osten  |8.5          |XWW   |The Monastery's Hunter   |
|The Rider of the King Log|Harry O. Hoyt|8.5          |\N    |The Rider of the King Log|
|The Rider of the King Log|Harry O. Hoyt|8.5          |GB    |The Rider of the King Log|
|The Rider of the Kin

                                                                                

In [40]:
output_df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (81)
+- Coalesce (80)
   +- InMemoryTableScan (1)
         +- InMemoryRelation (2)
               +- AdaptiveSparkPlan (79)
                  +- == Final Plan ==
                     ResultQueryStage (49)
                     +- * Project (48)
                        +- * SortMergeJoin LeftOuter (47)
                           :- * Sort (40)
                           :  +- ShuffleQueryStage (39), Statistics(sizeInBytes=5.2 MiB, rowCount=1.32E+4)
                           :     +- Exchange (38)
                           :        +- * Project (37)
                           :           +- * SortMergeJoin LeftOuter (36)
                           :              :- * Sort (28)
                           :              :  +- ShuffleQueryStage (27), Statistics(sizeInBytes=4.9 MiB, rowCount=1.32E+4)
                           :              :     +- Exchange (26)
                           :              :        +- * Generate (25)
                    

In [43]:
# PySpark Complex Optimization Case Study using IMDB Dataset

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, count, avg, broadcast, pandas_udf, PandasUDFType
from pyspark.sql.types import StringType
import os

# ------------------------- Spark Session -------------------------
custom_config = {
    "spark.executor.memory": "6g",
    "spark.driver.memory": "4g",
    "spark.default.parallelism": "8",
    "spark.sql.shuffle.partitions": "8",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": "10485760",  # 10MB
    "spark.sql.execution.arrow.pyspark.enabled": "true"
}

spark = SparkSession.builder \
    .appName("IMDB_Advanced_Optimizations") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.executor.memory", custom_config["spark.executor.memory"]) \
    .config("spark.driver.memory", custom_config["spark.driver.memory"]) \
    .config("spark.default.parallelism", custom_config["spark.default.parallelism"]) \
    .config("spark.sql.shuffle.partitions", custom_config["spark.sql.shuffle.partitions"]) \
    .config("spark.sql.adaptive.enabled", custom_config["spark.sql.adaptive.enabled"]) \
    .config("spark.sql.autoBroadcastJoinThreshold", custom_config["spark.sql.autoBroadcastJoinThreshold"]) \
    .getOrCreate()

# ------------------------- Load Datasets -------------------------
base_path = "/Users/aryan/Desktop/project/data/parquet_data"

basics = spark.read.parquet(os.path.join(base_path, "title_basics.parquet"))
crew = spark.read.parquet(os.path.join(base_path, "title_crew.parquet"))
ratings = spark.read.parquet(os.path.join(base_path, "title_ratings.parquet"))
akas = spark.read.parquet(os.path.join(base_path, "title_akas.parquet"))
names = spark.read.parquet(os.path.join(base_path, "name_basics.parquet"))

# ------------------------- Optimization: Predicate Pushdown -------------------------
movies_df = basics.filter(col("titleType") == "movie")

# ------------------------- Optimization: Broadcast Join -------------------------
director_df = crew.withColumn("director_id", explode(split("directors", ",")))
name_df = names.select(col("nconst").alias("director_id"), "primaryName")

# Assume name_basics is small enough for broadcast
broadcast_name_df = broadcast(name_df)

joined_df = director_df.join(broadcast_name_df, on="director_id", how="left")

# ------------------------- Optimization: Multi Joins + Ratings -------------------------
movies_with_ratings = movies_df.join(ratings, on="tconst")

final_df = movies_with_ratings \
    .join(joined_df, on="tconst", how="left") \
    .join(akas.select("titleId", "title", "region"), movies_with_ratings.tconst == akas.titleId, how="left") \
    .drop("titleId")


In [46]:

# ------------------------- Optimization: Window Function + Partitioning -------------------------
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy("region").orderBy(col("averageRating").desc())
region_top_movies = final_df.withColumn("rank", row_number().over(windowSpec)).filter(col("rank") <= 3)

# ------------------------- Optimization: Pandas UDF -------------------------
@pandas_udf(StringType(), PandasUDFType.SCALAR)
def clean_region_code(region_series):
    return region_series.fillna("XX").str.upper()

region_top_movies = region_top_movies.withColumn("region", clean_region_code("region"))

# ------------------------- Shuffle Optimization: groupByKey vs reduceByKey -------------------------
# Simulated with rdd on genre_1
rdd = movies_df.select("genre_1", "tconst").rdd

# BAD: groupByKey (causes wide shuffle)
grouped_rdd = rdd.groupByKey().mapValues(lambda x: len(list(x)))

# GOOD: reduceByKey (combines locally before shuffle)
reduced_rdd = rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y)

print("Genre counts using reduceByKey:")
print(reduced_rdd.take(10))

# ------------------------- Export or Display -------------------------
region_top_movies.select("primaryTitle", "primaryName", "averageRating", "region", "title", "rank").show(truncate=False)





Genre counts using reduceByKey:


                                                                                

[('Drama', 179774), ('Musical', 3198), ('Fantasy', 5006), ('Family', 5050), ('88', 66), ('Sport', 2160), ('86', 50), (' satsui to', 1), ('Game-Show', 20), ('113', 20)]


25/06/23 00:23:16 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:16 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:16 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:16 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:23:17 WA

KeyboardInterrupt: 

25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:30 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:31 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:31 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:31 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/06/23 00:24:31 WA

In [2]:
import os
import sys

# Setup sys path to use config
sys.path.append(os.path.abspath(os.path.join(os.path.dirname("__file__"), "../../")))

from src.config import settings
from src.config.spark_config import get_spark_session
from pyspark.sql.functions import col, explode, split, count, desc
from pyspark.sql.types import StringType

# Optional: install pyarrow and pandas >= 2.0 if not already
# pip install pyarrow pandas --upgrade

# ----------------------
# Step 1: Tuning Config
# ----------------------
custom_tuning = {
    "spark.executor.memory": "8g",
    "spark.driver.memory": "6g",
    "spark.executor.cores": "4",
    "spark.sql.shuffle.partitions": "8",
    "spark.default.parallelism": "8",
    "spark.memory.fraction": "0.8",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.autoBroadcastJoinThreshold": "-1",
    "spark.sql.execution.arrow.pyspark.enabled": "true"
}

spark = get_spark_session(app_name="IMDB_Optimized_ETL", custom_config=custom_tuning)

# ----------------------
# Step 2: Load Data
# ----------------------
parquet_base = os.path.join(settings.BASE_DIR, "data", "parquet_data")
files = {
    "basics": "title_basics.parquet",
    "akas": "title_akas.parquet",
    "crew": "title_crew.parquet",
    "name": "name_basics.parquet",
    "ratings": "title_ratings.parquet"
}
df_basics = spark.read.parquet(os.path.join(parquet_base, files["basics"]))
df_crew = spark.read.parquet(os.path.join(parquet_base, files["crew"]))
df_names = spark.read.parquet(os.path.join(parquet_base, files["name"]))
df_ratings = spark.read.parquet(os.path.join(parquet_base, files["ratings"]))

# Pushdown filter (predicate pushdown happens here)
df_movies = df_basics.filter((col("titleType") == "movie") & (col("isAdult") == "0"))

# ----------------------
# Step 3: Repartition + Persist
# ----------------------
df_movies = df_movies.repartition(8).persist()
df_ratings = df_ratings.repartition(4).persist()
df_crew = df_crew.repartition(4).persist()

# ----------------------
# Step 4: Join Ratings with Movies
# ----------------------
df_joined = df_movies.join(df_ratings, on="tconst", how="inner")

# ----------------------
# Step 5: Enrich with Crew (director/writer)
# ----------------------
df_joined = df_joined.join(df_crew, on="tconst", how="left")

# ----------------------
# Step 6: Enrich with Director Name
# ----------------------
df_names = df_names.select("nconst", "primaryName").withColumnRenamed("primaryName", "directorName")

df_joined = df_joined.withColumn("director_id", split(col("directors"), ",").getItem(0))
df_joined = df_joined.join(df_names, df_joined["director_id"] == df_names["nconst"], how="left")

# ----------------------
# Step 7: Final Output - Top Rated Movies with Directors
# ----------------------
result = df_joined.select(
    "primaryTitle", "startYear", "runtimeMinutes",
    "averageRating", "numVotes", "directorName"
).orderBy(desc("averageRating"))

result.show(10, truncate=False)

# ----------------------
# Step 8: Explain Plan
# ----------------------
print("⚙️ EXPLAIN PLAN:")
result.explain(mode="formatted")

# ----------------------
# Step 9: groupByKey vs reduceByKey demo
# ----------------------
from pyspark.sql import Row
from pyspark import SparkContext

sc = spark.sparkContext

sample_data = [
    ("Drama", 1), ("Action", 1), ("Drama", 1),
    ("Comedy", 1), ("Action", 1), ("Drama", 1)
]
rdd = sc.parallelize(sample_data)

print("✅ reduceByKey (recommended):")
rdd.reduceByKey(lambda a, b: a + b).collect()

print("⚠️ groupByKey (memory heavy):")
grouped = rdd.groupByKey().mapValues(lambda x: sum(x)).collect()




25/06/23 00:26:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
  import pkg_resources
                                                                                

+-------------------------+---------+--------------+-------------+--------+----------------------------+
|primaryTitle             |startYear|runtimeMinutes|averageRating|numVotes|directorName                |
+-------------------------+---------+--------------+-------------+--------+----------------------------+
|The New Guest            |2025     |\N            |10.0         |7       |Delinda Kay                 |
|QAnon Avenue             |2024     |57            |10.0         |7       |Danny McCarthy              |
|Kirik                    |2025     |128           |10.0         |104     |Nagathihalli Gangadhar Gowda|
|Guru Badul               |1988     |110           |10.0         |5       |A.R. Badul                  |
|Pandemiya                |2024     |\N            |10.0         |5       |Hilol Nasimov               |
|One Decision             |2022     |48            |10.0         |6       |George Kirvin               |
|Regal vs. Rebel          |2023     |\N            |10.

In [None]:
# # ----------------------
# # Step 10: Optional Pandas UDF (if available)
# # ----------------------
# try:
#     import pandas as pd
#     import pyarrow

#     from pyspark.sql.functions import pandas_udf
#     from pyspark.sql.types import DoubleType

#     @pandas_udf(DoubleType())
#     def convert_minutes_to_hours(runtime: pd.Series) -> pd.Series:
#         return runtime.fillna("0").astype(float) / 60.0

#     df_with_hours = result.withColumn("runtimeHours", convert_minutes_to_hours("runtimeMinutes"))
#     df_with_hours.select("primaryTitle", "runtimeMinutes", "runtimeHours").show(5)
# except ImportError:
#     print("❌ Pandas or PyArrow not installed. Skipping vectorized UDF.")


In [5]:
# This is a configuration-level concept, not code-specific
print("⚙️ Vertical Scaling → More memory/cores per executor (e.g., spark.executor.memory = '6g')")
print("⚙️ Horizontal Scaling → More executors/partitions (e.g., spark.default.parallelism = 8)")

⚙️ Vertical Scaling → More memory/cores per executor (e.g., spark.executor.memory = '6g')
⚙️ Horizontal Scaling → More executors/partitions (e.g., spark.default.parallelism = 8)


In [6]:
# Partition by startYear (simulates better filtering performance)
output_path = "/tmp/imdb_partitioned"
df_movies.write.partitionBy("startYear").mode("overwrite").parquet(output_path)

# Read with filter to show partition pruning
df_partitioned = spark.read.parquet(output_path)
df_2020 = df_partitioned.filter(col("startYear") == "2020")
df_2020.show(3)

                                                                                

+----------+---------+--------------------+--------------------+-------+-------+--------------+-------+-------+-------------------+---------+
|    tconst|titleType|        primaryTitle|       originalTitle|isAdult|endYear|runtimeMinutes|genre_1|genre_2|        ingested_at|startYear|
+----------+---------+--------------------+--------------------+-------+-------+--------------+-------+-------+-------------------+---------+
| tt4017332|    movie|             Markham|             Markham|      0|     \N|            71| Horror|   NULL|2025-06-22 23:11:21|     2020|
|tt33369160|    movie|Tube Light Web Se...|Tube Light Web Se...|      0|     \N|            \N| Comedy|   NULL|2025-06-22 23:11:21|     2020|
|tt34580926|    movie|Girl Looking for ...|Girl Looking for ...|      0|     \N|           118|  Crime|  Drama|2025-06-22 23:11:21|     2020|
+----------+---------+--------------------+--------------------+-------+-------+--------------+-------+-------+-------------------+---------+
only s

In [None]:
# Save table using bucketing (simulated here)
spark.sql("DROP TABLE IF EXISTS bucketed_movies")
df_movies.write.bucketBy(8, "genre_1").sortBy("primaryTitle").saveAsTable("bucketed_movies", format="parquet")

print(" Bucketed table 'bucketed_movies' created for faster joins")



✅ Bucketed table 'bucketed_movies' created for faster joins


                                                                                

In [9]:
# Broadcast join (fast if right table is small)
df_names_small = df_names.limit(10000)

df_broadcast = df_movies.join(broadcast(df_names_small), df_movies.tconst == df_names_small.nconst, "left")
print(" Broadcast Join Count:", df_broadcast.count())

# Shuffle join (default)
df_shuffle = df_movies.join(df_names, df_movies.tconst == df_names.nconst, "left")
print("Shuffle Join Count:", df_shuffle.count())

 Broadcast Join Count: 688717
Shuffle Join Count: 688717


In [10]:
from pyspark.sql.functions import when, rand, concat, lit

# Artificial skew
df_skewed = df_movies.withColumn("skew_key", when(col("startYear") == "2020", "high").otherwise("low"))

# Salting technique
df_skewed = df_skewed.withColumn("salt", (rand() * 10).cast("int"))
df_skewed = df_skewed.withColumn("skew_key_salted", concat(col("skew_key"), lit("_"), col("salt")))

df_skewed.groupBy("skew_key_salted").count().orderBy("count", ascending=False).show(5)

+---------------+-----+
|skew_key_salted|count|
+---------------+-----+
|          low_2|67448|
|          low_3|67399|
|          low_1|67397|
|          low_8|67346|
|          low_6|67316|
+---------------+-----+
only showing top 5 rows


In [11]:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
df_check = df_movies.select("primaryTitle", "startYear").checkpoint()
print("✅ Checkpoint done. Safe lineage created.")
df_check.show(3)

✅ Checkpoint done. Safe lineage created.
+--------------------+---------+
|        primaryTitle|startYear|
+--------------------+---------+
|B-kyû video tsûsh...|     2000|
|           Heartbeat|       \N|
|    The Chess Spiral|       \N|
+--------------------+---------+
only showing top 3 rows


In [12]:
print("💡 Tip: Enable in production for straggler handling")
print("spark.conf.set('spark.speculation', 'true')")

💡 Tip: Enable in production for straggler handling
spark.conf.set('spark.speculation', 'true')
