In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, round, count, avg, min, max, stddev, expr, upper

# Build (or fetch the already-running) Spark session for this notebook,
# registered under the application name 'AnimeRatingsPipeline'.
session_builder = SparkSession.builder.appName('AnimeRatingsPipeline')
spark = session_builder.getOrCreate()

## Load Data

In [0]:
# Read the raw user-score table from the catalog, then print its schema
# and preview the first five rows.
source_table = "workspace.default.users_score_2023"
df = spark.table(source_table)
df.printSchema()
df.show(n=5)

## Apply Transformations
### *1. 2+filter operations*

In [0]:
# Filter 1: keep only rows whose rating, read as a double, is >= 5.
# try_cast is used instead of cast so that an unparseable rating becomes NULL
# (and is dropped by the comparison) rather than failing the query.
df_rated = df.filter(expr("try_cast(rating as double) >= 5"))

# Filter 2: find users with at least 10 ratings.
# NOTE: the count is taken over the rows that survived Filter 1, i.e. it is
# the number of ratings >= 5 per user, not the user's total number of ratings.
user_counts = df_rated.groupBy('user_id').count()
active_users = user_counts.filter(col('count') >= 10)

# Keep only the rows belonging to active users. A left-semi join restricts the
# left side to matching user_ids without appending the helper `count` column,
# so df_filtered keeps the schema of the source rows.
df_filtered = df_rated.join(active_users.select('user_id'), on='user_id', how='left_semi')
df_filtered.show(5)

### *2. 1+complex aggregation*

In [0]:
# Summary statistics of `rating` for every (anime_id, Anime Title) pair:
# mean, approximate median, minimum, maximum, sample standard deviation and
# the number of rows in the group.
# NOTE: avg / min / max here are the pyspark.sql.functions imported at the top
# of the notebook, not the Python built-ins of the same name.
rating_summaries = [
    avg("rating").alias("avg_rating"),
    expr("percentile_approx(rating, 0.5)").alias("median_rating"),
    min("rating").alias("min_rating"),
    max("rating").alias("max_rating"),
    stddev("rating").alias("stddev_rating"),
    count("*").alias("num_ratings"),
]
per_anime_groups = df_filtered.groupBy("anime_id", "Anime Title")
complex_agg_df = per_anime_groups.agg(*rating_summaries)

complex_agg_df.show(10)

### *3. 1+groupBy with aggregations*

In [0]:
# For each (anime_id, Anime Title) pair, compute the mean rating and the
# number of rating rows in the group.
anime_groups = df_filtered.groupBy('anime_id', 'Anime Title')
mean_rating = avg('rating').alias('avg_rating')
rating_total = count('*').alias('num_ratings')
agg_df = anime_groups.agg(mean_rating, rating_total)

agg_df.show(10)

### *4. Column transformation*

In [0]:
# Two column transformations applied in sequence:
#   1. avg_rating is replaced by its value rounded to 2 decimal places
#      (`round` is pyspark.sql.functions.round, not the Python built-in);
#   2. popularity = avg_rating * num_ratings, using the already-rounded average.
agg_df = (
    agg_df
    .withColumn('avg_rating', round(col('avg_rating'), 2))
    .withColumn('popularity', col('avg_rating') * col('num_ratings'))
)

agg_df.show(10)

## 2+SQL queries

In [0]:
# Expose the per-anime aggregates to Spark SQL under the name `anime_stats`.
agg_df.createOrReplaceTempView('anime_stats')

# Query 1: the 10 highest average ratings among anime with more than 500 ratings.
top_avg_query = '''
SELECT `Anime Title`, avg_rating, num_ratings
FROM anime_stats
WHERE num_ratings > 500
ORDER BY avg_rating DESC
LIMIT 10
'''
top_avg = spark.sql(top_avg_query)
top_avg.show()

# Query 2: the 10 highest popularity scores (no minimum-ratings threshold).
top_pop_query = '''
SELECT `Anime Title`, popularity, num_ratings
FROM anime_stats
ORDER BY popularity DESC
LIMIT 10
'''
top_pop = spark.sql(top_pop_query)
top_pop.show()

## Optimization of queries

In [0]:

# Create temp view filtered_anime: the rows of anime_stats with more than
# 500 ratings. Both optimized queries below read from this reduced set.
spark.sql("""
CREATE OR REPLACE TEMP VIEW filtered_anime AS
SELECT *
FROM anime_stats
WHERE num_ratings > 500
""")

# Repartition the filtered rows into 8 partitions keyed by `Anime Title` and
# register the result as a second temp view.
# NOTE: repartition() is itself a shuffle, and nothing here caches the result,
# so each query below re-evaluates this view from anime_stats.
filtered_df = spark.table("filtered_anime").repartition(8, "`Anime Title`")
filtered_df.createOrReplaceTempView("filtered_anime_partitioned")

# Optimized Query 1: Top 10 anime by average rating
top_avg_opt = spark.sql("""
SELECT `Anime Title`, avg_rating, num_ratings
FROM filtered_anime_partitioned
ORDER BY avg_rating DESC
LIMIT 10
""")
# The status prefix was a mis-encoded byte sequence; restored to the check-mark emoji.
print("✅ Optimized Query 1: Top 10 Anime by Average Rating")
top_avg_opt.show()

# Optimized Query 2: Top 10 anime by popularity
# Unlike the earlier top_pop query, this one only sees anime with > 500 ratings
# because it reads from the filtered view.
top_pop_opt = spark.sql("""
SELECT `Anime Title`, popularity, num_ratings
FROM filtered_anime_partitioned
ORDER BY popularity DESC
LIMIT 10
""")
print("✅ Optimized Query 2: Top 10 Anime by Popularity")
top_pop_opt.show()


# Rename invalid columns (no spaces or punctuation)
def clean_columns(df):
    """Return ``df`` with every column renamed to a table-safe identifier.

    Each character of a column name that is not a letter, digit or underscore
    (spaces, hyphens, parentheses, commas, semicolons, dots, ...) is replaced
    by a single underscore. Names that are already safe are left unchanged.

    Args:
        df: An object exposing ``columns`` (an iterable of column-name strings)
            and ``toDF(*names)``, e.g. a Spark DataFrame.

    Returns:
        Whatever ``df.toDF`` returns when called with the sanitized names, in
        the original column order.

    Note:
        Distinct names that differ only in replaced characters (``"a b"`` and
        ``"a-b"``) map to the same result; no de-duplication is attempted.
    """
    import re  # local import so the function does not depend on another cell

    safe_cols = [re.sub(r'\W', '_', c) for c in df.columns]
    return df.toDF(*safe_cols)

# Sanitize the column names of both optimized result sets.
top_avg_clean, top_pop_clean = (
    clean_columns(result) for result in (top_avg_opt, top_pop_opt)
)

# Save as tables: each cleaned result overwrites its target table.
table_targets = (
    (top_avg_clean, "workspace.default.top_avg_anime"),
    (top_pop_clean, "workspace.default.top_pop_anime"),
)
for cleaned_df, target_table in table_targets:
    cleaned_df.write.mode("overwrite").saveAsTable(target_table)




## Perform analysis

In [0]:
# Print the formatted physical plan of each optimized query, then render both
# result sets with the notebook's display() helper.
optimized_results = (
    ("top_avg_opt", top_avg_opt),
    ("top_pop_opt", top_pop_opt),
)

for result_name, result_df in optimized_results:
    print(f"Plan for {result_name}:")
    result_df.explain(mode="formatted")
    if result_name == "top_avg_opt":
        # Preserve the blank line that separated the two plan sections in the source.
        pass

for _, result_df in optimized_results:
    display(result_df)

### 🔍 Performance Analysis Summary

**Spark Optimizations**

Both optimized queries were executed entirely within **Photon**, Databricks’ vectorized execution engine, as shown by the `PhotonResultStage` in the physical plan.  
The engine applied **Adaptive Query Execution (AQE)**, dynamically optimizing shuffle partitions and join strategies during runtime.  
The query also benefited from **column pruning**—only the required columns (`Anime Title`, `avg_rating`, `num_ratings`, `popularity`) were read.

**Filter Pushdown**

The filter `num_ratings > 500` was pushed down early through the subquery `filtered_anime`.  
This reduced the input size before sorting and limiting, minimizing shuffle and scan costs.  
You can see this in the plan: the filter stage appears *before* the sort in the physical DAG.

**Performance Bottlenecks and Fixes**

The main bottleneck lies in the initial wide aggregation that computes `anime_stats`.  
This stage requires shuffles across partitions by `anime_id`.  
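We mitigated it by (1) filtering early, (2) repartitioning with `repartition(8, "Anime Title")` to reduce shuffle width, and (3) registering the result as the temp view `filtered_anime_partitioned` so that both queries share one definition. Note that the notebook never calls `.cache()` or `.persist()` on this view, so each query re-evaluates it; explicit caching would be needed for true in-memory reuse, where the compute environment supports it.  
Overall, the pipeline achieved full Photon acceleration.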


## Actions vs. Transformations

In [0]:

# Transformations (lazy) — each step only extends the plan; nothing runs yet
t1 = df.select("user_id", "anime_id", "rating")    # project three columns
t2 = t1.filter(col("rating") >= 8)                  # keep ratings of 8 or more
user_id_as_text = col("user_id").cast("string")
t3 = t2.withColumn("user_id_upper", upper(user_id_as_text))  # add upper-cased id

# Actions (eager) — each call below triggers execution of the plan
print("Showing first 5 rows:")
t3.show(n=5)        # action: prints a preview

print("Counting rows:")
t3.count()          # action: last expression, so the cell displays the row count


### ⚙️ Actions vs Transformations Explanation

Transformations such as `select`, `filter`, and `withColumn` are *lazy*—they do not execute immediately.  
They build a logical plan that Spark optimizes before running.  

Actions such as `show()` and `count()` are *eager*—they trigger the execution of the full DAG and return results or materialized data.  

This example demonstrates Spark’s lazy evaluation model, which allows it to combine and reorder transformations for optimal performance before performing any computation.
