In [0]:
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when

# Verify Spark is ready (spark is pre-created in Databricks)
print(f"Spark version: {spark.version}")
print(f"Ready to go!")

Spark version: 4.0.0
Ready to go!


In [0]:
# Verify that file exists
display(dbutils.fs.ls("/Volumes/workspace/amazon_project/raw_data"))

path,name,size,modificationTime
dbfs:/Volumes/workspace/amazon_project/raw_data/Electronics.json.gz,Electronics.json.gz,3280683083,1762804017000


In [0]:
from pyspark.sql import functions as F, types as T
from pyspark.sql.functions import col, avg, count, from_unixtime

reviews_path = "dbfs:/Volumes/workspace/amazon_project/raw_data/Electronics.json.gz"

schema = T.StructType([
    T.StructField("reviewerID", T.StringType()),
    T.StructField("asin", T.StringType()),
    T.StructField("overall", T.DoubleType()),
    T.StructField("reviewText", T.StringType()),
    T.StructField("summary", T.StringType()),
    T.StructField("unixReviewTime", T.LongType()),
    T.StructField("verified", T.BooleanType()),
    T.StructField("vote", T.StringType())
])

reviews_raw = spark.read.schema(schema).json(reviews_path)

### Step: Demonstrating Lazy Evaluation and Catalyst Optimization

I will first apply transformations (filters, withColumn, and aggregation) in three separate cells to show that Spark builds a **logical plan** for each step but **does not execute anything** until an action (like `show()`, `count()`, or `write()`) is called.

Later, I will chain them all together to show that Spark’s Catalyst optimizer combines these steps into a single optimized execution plan (predicate pushdown, column pruning, and stage fusion).

In [0]:
reviews_filtered = (
    reviews_raw
      .filter((col("verified") == True))
      .filter((col("overall") >= 1) & (col("overall") <= 5))
      .filter(F.length(F.trim(F.col("reviewText"))) > 0)
)

# No action yet — Spark hasn't executed
reviews_filtered.explain()  # Logical plan only

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonFilter (((((verified#12658 AND isnotnull(verified#12658)) AND isnotnull(overall#12654)) AND (overall#12654 >= 1.0)) AND (overall#12654 <= 5.0)) AND (length(trim(reviewText#12655, None)) > 0))
      +- PhotonJsonScan json [reviewerID#12652,asin#12653,overall#12654,reviewText#12655,summary#12656,unixReviewTime#12657L,verified#12658,vote#12659] Batched: true, DataFilters: [verified#12658, isnotnull(verified#12658), isnotnull(overall#12654), (overall#12654 >= 1.0), (ov..., Format: JSON, Location: InMemoryFileIndex(1 paths)[dbfs:/Volumes/workspace/amazon_project/raw_data/Electronics.json.gz], PartitionFilters: [], PushedFilters: [EqualTo(verified,true), IsNotNull(verified), IsNotNull(overall), GreaterThanOrEqual(overall,1.0)..., ReadSchema: struct<reviewerID:string,asin:string,overall:double,reviewText:string,summary:string,unixReviewTi...


== Photon Explanation ==
The query is fully supported by Photon.


In [0]:
reviews_transformed = (
    reviews_filtered
      .withColumn("vote_clean", F.regexp_replace("vote", ",", ""))
      .withColumn("vote_int", F.when(F.col("vote_clean").rlike("^[0-9]+$"), F.col("vote_clean").cast("int")).otherwise(0))
      .withColumn("review_len", F.length("reviewText"))
      .withColumn("review_ts", F.from_unixtime("unixReviewTime").cast("timestamp"))
      .withColumn("review_year", F.year("review_ts"))
      .withColumn("review_month", F.date_format("review_ts", "yyyy-MM"))
)
reviews_transformed.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#12659, vote_clean#12664, vote_int#12666, review_len#12668, review_ts#12670, year(cast(review_ts#12670 as date)) AS review_year#12672, date_format(review_ts#12670, yyyy-MM, Some(Etc/UTC)) AS review_month#12674]
      +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#12659, vote_clean#12664, CASE WHEN RLIKE(vote_clean#12664, ^[0-9]+$) THEN cast(vote_clean#12664 as int) ELSE 0 END AS vote_int#12666, length(reviewText#12655) AS review_len#12668, cast(from_unixtime(unixReviewTime#12657L, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC)) as timestamp) AS review_ts#12670]
         +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#

In [0]:
reviews_transformed = (
    reviews_filtered
      .withColumn("vote_clean", F.regexp_replace("vote", ",", ""))
      .withColumn("vote_int", F.when(F.col("vote_clean").rlike("^[0-9]+$"), F.col("vote_clean").cast("int")).otherwise(0))
      .withColumn("review_len", F.length("reviewText"))
      .withColumn("review_ts", F.from_unixtime("unixReviewTime").cast("timestamp"))
      .withColumn("review_year", F.year("review_ts"))
      .withColumn("review_month", F.date_format("review_ts", "yyyy-MM"))
)
reviews_transformed.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#12659, vote_clean#12678, vote_int#12680, review_len#12682, review_ts#12684, year(cast(review_ts#12684 as date)) AS review_year#12686, date_format(review_ts#12684, yyyy-MM, Some(Etc/UTC)) AS review_month#12688]
      +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#12659, vote_clean#12678, CASE WHEN RLIKE(vote_clean#12678, ^[0-9]+$) THEN cast(vote_clean#12678 as int) ELSE 0 END AS vote_int#12680, length(reviewText#12655) AS review_len#12682, cast(from_unixtime(unixReviewTime#12657L, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC)) as timestamp) AS review_ts#12684]
         +- PhotonProject [reviewerID#12652, asin#12653, overall#12654, reviewText#12655, summary#12656, unixReviewTime#12657L, verified#12658, vote#

In [0]:
reviews_agg = (
    reviews_transformed
      .groupBy("review_year")
      .agg(
          F.count("*").alias("n_reviews"),
          F.avg("overall").alias("avg_rating"),
          F.avg("review_len").alias("avg_length"),
          F.sum("vote_int").alias("total_votes")
      )
)
reviews_agg.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[review_year#12686], functions=[finalmerge_count(merge count#12700L) AS count(1)#12695L, finalmerge_avg(merge sum#12703, count#12704L) AS avg(overall)#12696, finalmerge_avg(merge sum#12707, count#12708L) AS avg(review_len)#12697, finalmerge_sum(merge sum#12710L) AS sum(vote_int)#12698L])
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#11799]
               +- PhotonShuffleExchangeSink hashpartitioning(review_year#12686, 1024)
                  +- PhotonGroupingAgg(keys=[review_year#12686], functions=[partial_count(1) AS count#12700L, partial_avg(overall#12654) AS (sum#12703, count#12704L), partial_avg(review_len#12682) AS (sum#12707, count#12708L), partial_sum(vote_int#12680) AS sum#12710L])
                     +- PhotonProject [overall#12654, CASE WHEN RLIKE(vote_clean#12678, ^[0-9

In [0]:
# This triggers the actual Spark job
display(reviews_agg)

review_year,n_reviews,avg_rating,avg_length,total_votes
2018,1414803,4.065596411655898,167.2316944479196,159073
2006,22949,4.055775850799599,687.4453353087281,202200
2000,666,4.105105105105105,771.0360360360361,11596
2007,73618,4.191556412833818,564.9701024206037,391605
2013,1738860,4.121423231312469,333.85820020013114,1767327
2005,11396,3.922867672867673,779.845121095121,138701
2009,194218,4.125446663028144,587.9962155927875,736769
2008,117056,4.191694573537452,611.1936423592127,569917
2015,3922624,4.13401207966912,185.7321027964954,2276337
2003,3286,3.9269628727936703,774.6877662811929,33579


In [0]:
reviews_pipeline = (
    spark.read.schema(schema).json(reviews_path)
      .filter(F.col("verified") == True)
      .filter((F.col("overall") >= 1) & (F.col("overall") <= 5))
      .withColumn("vote_clean", F.regexp_replace("vote", ",", ""))
      .withColumn("vote_int", F.when(F.col("vote_clean").rlike("^[0-9]+$"), F.col("vote_clean").cast("int")).otherwise(0))
      .withColumn("review_len", F.length("reviewText"))
      .withColumn("review_year", F.year(F.from_unixtime("unixReviewTime")))
      .groupBy("review_year")
      .agg(F.count("*").alias("n_reviews"), F.avg("overall").alias("avg_rating"))
)
reviews_pipeline.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (11)
+- == Initial Plan ==
   ColumnarToRow (10)
   +- PhotonResultStage (9)
      +- PhotonGroupingAgg (8)
         +- PhotonShuffleExchangeSource (7)
            +- PhotonShuffleMapStage (6)
               +- PhotonShuffleExchangeSink (5)
                  +- PhotonGroupingAgg (4)
                     +- PhotonProject (3)
                        +- PhotonFilter (2)
                           +- PhotonJsonScan json  (1)


(1) PhotonJsonScan json 
Output [3]: [overall#12806, unixReviewTime#12809L, verified#12810]
Batched: true
Location: InMemoryFileIndex [dbfs:/Volumes/workspace/amazon_project/raw_data/Electronics.json.gz]
PushedFilters: [EqualTo(verified,true), IsNotNull(verified), IsNotNull(overall), GreaterThanOrEqual(overall,1.0), LessThanOrEqual(overall,5.0)]
ReadSchema: struct<overall:double,unixReviewTime:bigint,verified:boolean>

(2) PhotonFilter
Input [3]: [overall#12806, unixReviewTime#12809L, verified#12810]
Arguments: ((((verified#12810

## SQL

In [0]:
# Register the DataFrame as a SQL view for Spark SQL
reviews_raw.createOrReplaceTempView("reviews")

#### SQL Query 1 – Average rating by year

In [0]:
%sql
-- SQL Query 1: Yearly average rating
SELECT 
  YEAR(FROM_UNIXTIME(unixReviewTime)) AS review_year,
  COUNT(*) AS n_reviews,
  ROUND(AVG(overall), 2) AS avg_rating
FROM reviews
WHERE verified = true
GROUP BY review_year
ORDER BY review_year;

review_year,n_reviews,avg_rating
1999,58,4.12
2000,666,4.11
2001,1664,4.0
2002,2231,3.98
2003,3286,3.93
2004,4403,3.8
2005,11396,3.92
2006,22949,4.06
2007,73618,4.19
2008,117056,4.19


#### SQL Query 2 – Top products (ASINs) with ≥1000 reviews

In [0]:
%sql
-- SQL Query 2: Top ASINs by number of reviews and average rating
SELECT 
  asin,
  COUNT(*) AS n_reviews,
  ROUND(AVG(overall), 2) AS avg_rating
FROM reviews
WHERE verified = true
GROUP BY asin
HAVING COUNT(*) >= 1000
ORDER BY avg_rating DESC
LIMIT 20;

asin,n_reviews,avg_rating
B01DBV1OKY,2651,4.87
B00RJBWA9C,2255,4.87
B01BV2KXYI,1385,4.85
B001AQYJI2,1175,4.84
B004Y1AYAC,1626,4.84
B00KPRWAX8,1260,4.82
B005LS2HM0,1325,4.82
B0029N3U8K,1162,4.82
B00DI89YAI,2376,4.82
B0043WJRRS,10345,4.81


In [0]:
external_path = "s3://databricks-s3-ingest-95ac0-lambdazipsbucket-jniyxzmi4zy0/amazon_project/output"

(
    spark_agg
      .repartition(32)
      .write
      .mode("overwrite")
      .parquet(external_path)
)

display(dbutils.fs.ls(external_path))

[0;31m---------------------------------------------------------------------------[0m
[0;31mUnknownException[0m                          Traceback (most recent call last)
File [0;32m<command-4551495771618192>, line 8[0m
[1;32m      1[0m external_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124ms3://databricks-s3-ingest-95ac0-lambdazipsbucket-jniyxzmi4zy0/[39m[38;5;124m"[39m
[1;32m      3[0m (
[1;32m      4[0m     spark_agg
[1;32m      5[0m       [38;5;241m.[39mrepartition([38;5;241m32[39m)
[1;32m      6[0m       [38;5;241m.[39mwrite
[1;32m      7[0m       [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)
[0;32m----> 8[0m       [38;5;241m.[39mparquet(external_path)
[1;32m      9[0m )
[1;32m     11[0m display(dbutils[38;5;241m.[39mfs[38;5;241m.[39mls(external_path))

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:779[0m, in [0;36mDataFrameWriter.parquet[0;34m(self, p

#### Caching optimization

In [0]:
# Materialize the DataFrame as a temporary view
reviews_spark_filter.createOrReplaceTempView("reviews_spark_filter_temp")

# Use SQL to count rows (will be faster after materialization)
t0 = time.time()
spark.sql("SELECT COUNT(*) FROM reviews_spark_filter_temp")
t1 = time.time()

# Repeat to demonstrate improved performance
t2 = time.time()
spark.sql("SELECT COUNT(*) FROM reviews_spark_filter_temp")
t3 = time.time()

print(f"First run: {t1 - t0:.2f}s")
print(f"Second run: {t3 - t2:.2f}s (should be faster)")

First run: 0.06s
Second run: 0.06s (should be faster)


### Actions vs Transformations

#### Lazy Transformation

In [0]:
from pyspark.sql import functions as F

# Start with a small example DataFrame
demo_df = spark.range(0, 10)  # creates a DataFrame with a single column 'id' = [0,1,2,...,9]

# --- Transformations (Lazy) ---
transformed_df = (
    demo_df
      .filter(F.col("id") % 2 == 0)       # Keep even numbers
      .withColumn("squared", F.col("id") ** 2)  # Compute a new column
      .withColumn("category", F.when(F.col("squared") > 20, "high").otherwise("low"))
)

# Print the logical/physical plan, but note: no job has run yet!
transformed_df.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonProject [id#13296L, squared#13298, CASE WHEN (squared#13298 > 20.0) THEN high ELSE low END AS category#13300]
      +- PhotonProject [id#13296L, POWER(cast(id#13296L as double), 2.0) AS squared#13298]
         +- PhotonFilter ((id#13296L % 2) = 0)
            +- PhotonRange Range (0, 10, step=1, splits=8)


== Photon Explanation ==
The query is fully supported by Photon.


#### Action 

In [0]:
# --- Actions (Eager) ---
# These trigger Spark to actually compute results.
print("Showing transformed results:")
transformed_df.show()

# Another action example: count
n_rows = transformed_df.count()
print(f"Number of rows in transformed_df: {n_rows}")

Showing transformed results:
+---+-------+--------+
| id|squared|category|
+---+-------+--------+
|  0|    0.0|     low|
|  2|    4.0|     low|
|  4|   16.0|     low|
|  6|   36.0|    high|
|  8|   64.0|    high|
+---+-------+--------+

Number of rows in transformed_df: 5


### Machine Learning

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

ml_df = (
    reviews_raw
      .filter(F.col("verified") == True)
      .withColumn("label", F.when(F.col("overall") >= 4, 1).otherwise(0))  # target: positive vs negative
      # clean commas and safely cast vote to int
      .withColumn("vote_clean", F.regexp_replace("vote", ",", ""))          # remove commas from strings like "1,226"
      .withColumn(
          "vote_int",
          F.when(F.col("vote_clean").rlike("^[0-9]+$"), F.col("vote_clean").cast("int"))  # only cast if numeric
           .otherwise(F.lit(0))
      )
      .withColumn("review_len", F.length("reviewText"))
      .select("label", "vote_int", "review_len")
      .na.drop()
)

display(ml_df.limit(5))
ml_df.printSchema()

label,vote_int,review_len
1,0,712
1,0,2657
1,2,522
0,4,239
1,0,495


root
 |-- label: integer (nullable = false)
 |-- vote_int: integer (nullable = true)
 |-- review_len: integer (nullable = true)



In [0]:
assembler = VectorAssembler(
    inputCols=["vote_int", "review_len"],
    outputCol="features"
)

# Split into train/test sets
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

In [0]:
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train_df)

In [0]:
predictions = model.transform(test_df)
display(predictions.select("features", "label", "prediction", "probability").limit(10))

# Evaluate accuracy (area under ROC)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

accuracy = predictions.filter(F.col("label") == F.col("prediction")).count() / predictions.count()
print(f"ROC-AUC: {roc_auc:.3f}")
print(f"Accuracy: {accuracy:.3f}")

features,label,prediction,probability
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.0"",""1.0""]}",0,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.21452940102831605"",""0.7854705989716839""]}"


ROC-AUC: 0.606
Accuracy: 0.766
