In [0]:
dbutils.fs.ls("/databricks-datasets/")

[FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1596557781000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1596557823000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/amazon/', name='amazon/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/asa/', name='asa/', size=0, modificationTime=1762892681493),
 FileInfo(path='dbfs:/databricks-datasets/atlas_higgs/', name='atlas_higgs/', size=0, modificationTime=

In [0]:
import pandas as pd
import time
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when
from pyspark.sql import functions as F
# Verify Spark is ready (spark is pre-created in Databricks)
print(f"✅ Spark version: {spark.version}")
print(f"✅ Ready to go!")

✅ Spark version: 4.0.0
✅ Ready to go!


In [0]:
dbutils.fs.ls("/databricks-datasets/wine-quality/")

[FileInfo(path='dbfs:/databricks-datasets/wine-quality/README.md', name='README.md', size=1066, modificationTime=1596692263000),
 FileInfo(path='dbfs:/databricks-datasets/wine-quality/winequality-red.csv', name='winequality-red.csv', size=84199, modificationTime=1596692263000),
 FileInfo(path='dbfs:/databricks-datasets/wine-quality/winequality-white.csv', name='winequality-white.csv', size=264426, modificationTime=1596692263000)]

In [0]:
path = "/databricks-datasets/wine-quality/winequality-white.csv"

info = dbutils.fs.ls(path)[0]
size_bytes = info.size
size_gb_single = size_bytes / (1024**3)

dup_factor = 5000  # how many times you union it
size_gb_large = size_gb_single * dup_factor

print("Single file size (GB):", size_gb_single)
print("Estimated df_large size (GB):", size_gb_large)

Single file size (GB): 0.0002462659031152725
Estimated df_large size (GB): 1.2313295155763626


In [0]:
# White wine
df_base_white = (
    spark.read.format("csv")
        .option("header", "true")
        .option("sep", ";")
        .option("inferSchema", "true")
        .load("/databricks-datasets/wine-quality/winequality-white.csv")
        .withColumn("wine_type", F.lit("white"))  # tag source
)

# Red wine
df_base_red = (
    spark.read.format("csv")
        .option("header", "true")
        .option("sep", ";")
        .option("inferSchema", "true")
        .load("/databricks-datasets/wine-quality/winequality-red.csv")
        .withColumn("wine_type", F.lit("red"))
)


df_base = df_base_white.unionByName(df_base_red)


In [0]:
# How many times to replicate the full base dataset
repeat_factor = 4000  # How many times to replicate the full base dataset

multiplier = spark.range(repeat_factor).toDF("rep_id")

df_large = (
    df_base
        .crossJoin(multiplier)   # replicates each row repeat_factor times
        .drop("rep_id")          # no longer needed
)

print(df_large.count())


25988000


In [0]:
df_filtered = (
    df_large
        .filter(
            F.col("quality").isNotNull() &
            F.col("alcohol").isNotNull() &
            F.col("residual sugar").isNotNull()
        )
        # want decent quality
        .filter(F.col("quality") >= 5)
        # avoid extreme outliers in volatile acidity
        .filter(F.col("volatile acidity") < 1.5)
)


In [0]:
df_transformed = (
    df_filtered
        # binary label for high quality
        .withColumn("is_high_quality", (F.col("quality") >= 7).cast("int"))
        # alc to sugar ratio
        .withColumn(
            "alcohol_sugar_ratio",
            F.col("alcohol") / (F.col("residual sugar") + F.lit(1e-6))
        )
)


In [0]:
#groupBy aggregation
quality_summary = (
    df_transformed
        .groupBy("wine_type", "quality")
        .agg(
            F.count("*").alias("num_wines"),
            F.round(F.avg("alcohol"), 3).alias("avg_alcohol"),
            F.round(F.avg("residual sugar"), 3).alias("avg_residual_sugar"),
            F.round(F.avg("alcohol_sugar_ratio"), 3).alias("avg_alcohol_sugar_ratio"),
            F.round(F.avg("is_high_quality") * 100, 2).alias("pct_high_quality")
        )
        .orderBy("wine_type", "quality")
)

quality_summary.show(20, truncate=False)


+---------+-------+---------+-----------+------------------+-----------------------+----------------+
|wine_type|quality|num_wines|avg_alcohol|avg_residual_sugar|avg_alcohol_sugar_ratio|pct_high_quality|
+---------+-------+---------+-----------+------------------+-----------------------+----------------+
|red      |5      |2724000  |9.9        |2.529             |4.449                  |0.0             |
|red      |6      |2552000  |10.63      |2.477             |4.86                   |0.0             |
|red      |7      |796000   |11.466     |2.721             |4.895                  |100.0           |
|red      |8      |72000    |12.094     |2.578             |5.415                  |100.0           |
|white    |5      |5828000  |9.809      |7.335             |3.204                  |0.0             |
|white    |6      |8792000  |10.575     |6.442             |3.687                  |0.0             |
|white    |7      |3520000  |11.368     |5.186             |4.241                 

In [0]:
#join operation

white_stats = (
    df_transformed
        .filter(F.col("wine_type") == "white")
        .groupBy("quality")
        .agg(
            F.round(F.avg("alcohol"), 3).alias("white_avg_alcohol"),
            F.count("*").alias("white_count")
        )
)

red_stats = (
    df_transformed
        .filter(F.col("wine_type") == "red")
        .groupBy("quality")
        .agg(
            F.round(F.avg("alcohol"), 3).alias("red_avg_alcohol"),
            F.count("*").alias("red_count")
        )
)

quality_compare = (
    white_stats
        .join(red_stats, on="quality", how="inner")
        .orderBy("quality")
)

quality_compare.show(truncate=False)


+-------+-----------------+-----------+---------------+---------+
|quality|white_avg_alcohol|white_count|red_avg_alcohol|red_count|
+-------+-----------------+-----------+---------------+---------+
|5      |9.809            |5828000    |9.9            |2724000  |
|6      |10.575           |8792000    |10.63          |2552000  |
|7      |11.368           |3520000    |11.466         |796000   |
|8      |11.636           |700000     |12.094         |72000    |
+-------+-----------------+-----------+---------------+---------+



In [0]:
df_transformed.createOrReplaceTempView("wine_large")


In [0]:
sql_summary_by_type = spark.sql("""
SELECT
    wine_type,
    COUNT(*)                         AS num_rows,
    AVG(quality)                     AS avg_quality,
    AVG(alcohol)                     AS avg_alcohol,
    AVG(is_high_quality) * 100       AS pct_high_quality
FROM wine_large
GROUP BY wine_type
ORDER BY avg_quality DESC
""")

sql_summary_by_type.show()


+---------+--------+------------------+------------------+------------------+
|wine_type|num_rows|       avg_quality|       avg_alcohol|  pct_high_quality|
+---------+--------+------------------+------------------+------------------+
|    white|18860000|5.9550371155885475|10.527493107104402|22.481442205726403|
|      red| 6144000| 5.709635416666667|10.431477864583332|14.127604166666666|
+---------+--------+------------------+------------------+------------------+



In [0]:
sql_high_segment = spark.sql("""
SELECT
    wine_type,
    quality,
    COUNT(*)              AS num_wines,
    AVG(alcohol)          AS avg_alcohol,
    AVG(`residual sugar`) AS avg_residual_sugar
FROM wine_large
WHERE
    quality >= 7
    AND alcohol >= 11
GROUP BY wine_type, quality
ORDER BY quality DESC, wine_type
""")

sql_high_segment.show()

+---------+-------+---------+------------------+------------------+
|wine_type|quality|num_wines|       avg_alcohol|avg_residual_sugar|
+---------+-------+---------+------------------+------------------+
|    white|      9|    16000|12.624999999999998|               2.5|
|      red|      8|    64000|12.368750000000889|           2.65625|
|    white|      8|   540000|12.209629629628767| 4.697777777777816|
|      red|      7|   552000|11.961352657005365|2.7398550724638926|
|    white|      7|  2308000|  12.0947429231664| 3.985441941074224|
+---------+-------+---------+------------------+------------------+



In [0]:
#partitioning
import time

start = time.time()
baseline = (
    df_transformed
        .groupBy("wine_type")
        .agg(F.count("*").alias("num_wines"))
        .collect()
)
baseline_time = time.time() - start
print(f"Default partitions: {baseline_time:.2f}s")

df_few = df_transformed.repartition(4)
start = time.time()
few = (
    df_few
        .groupBy("wine_type")
        .agg(F.count("*").alias("num_wines"))
        .collect()
)
few_time = time.time() - start
print(f"4 partitions:     {few_time:.2f}s")

df_many = df_transformed.repartition(32)
start = time.time()
many = (
    df_many
        .groupBy("wine_type")
        .agg(F.count("*").alias("num_wines"))
        .collect()
)
many_time = time.time() - start
print(f"32 partitions:     {many_time:.2f}s")

Default partitions: 0.42s
4 partitions:     1.38s
32 partitions:     1.47s


In [0]:
#since it is slower with partitioning (likely due to the overhead cost of it), I will not use partitioning
df_transformed.createOrReplaceTempView("wine_large")

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS student_db")

quality_summary.write.mode("overwrite").saveAsTable("student_db.wine_quality_summary")

print("Saved table: student_db.wine_quality_summary")

display(spark.table("student_db.wine_quality_summary").limit(10))



Saved table: student_db.wine_quality_summary


wine_type,quality,num_wines,avg_alcohol,avg_residual_sugar,avg_alcohol_sugar_ratio,pct_high_quality
red,5,2724000,9.9,2.529,4.449,0.0
red,6,2552000,10.63,2.477,4.86,0.0
red,7,796000,11.466,2.721,4.895,100.0
red,8,72000,12.094,2.578,5.415,100.0
white,5,5828000,9.809,7.335,3.204,0.0
white,6,8792000,10.575,6.442,3.687,0.0
white,7,3520000,11.368,5.186,4.241,100.0
white,8,700000,11.636,5.671,3.914,100.0
white,9,20000,12.18,4.12,4.774,100.0


In [0]:
quality_compare.write.mode("overwrite").saveAsTable("student_db.wine_quality_compare")

print("Saved table: student_db.wine_quality_compare")

display(spark.table("student_db.wine_quality_compare").limit(10))


Saved table: student_db.wine_quality_compare


quality,white_avg_alcohol,white_count,red_avg_alcohol,red_count
5,9.809,5828000,9.9,2724000
6,10.575,8792000,10.63,2552000
7,11.368,3520000,11.466,796000
8,11.636,700000,12.094,72000


In [0]:
quality_summary.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (29)
+- == Initial Plan ==
   ColumnarToRow (28)
   +- PhotonResultStage (27)
      +- PhotonSort (26)
         +- PhotonShuffleExchangeSource (25)
            +- PhotonShuffleMapStage (24)
               +- PhotonShuffleExchangeSink (23)
                  +- PhotonProject (22)
                     +- PhotonGroupingAgg (21)
                        +- PhotonShuffleExchangeSource (20)
                           +- PhotonShuffleMapStage (19)
                              +- PhotonShuffleExchangeSink (18)
                                 +- PhotonGroupingAgg (17)
                                    +- PhotonBroadcastNestedLoopJoin Cross BuildRight (16)
                                       :- PhotonGroupingAgg (10)
                                       :  +- PhotonUnion (9)
                                       :     :- PhotonProject (4)
                                       :     :  +- PhotonFilter (3)
                                       :     

In [0]:
quality_summary.show()

+---------+-------+---------+-----------+------------------+-----------------------+----------------+
|wine_type|quality|num_wines|avg_alcohol|avg_residual_sugar|avg_alcohol_sugar_ratio|pct_high_quality|
+---------+-------+---------+-----------+------------------+-----------------------+----------------+
|      red|      5|  2724000|        9.9|             2.529|                  4.449|             0.0|
|      red|      6|  2552000|      10.63|             2.477|                   4.86|             0.0|
|      red|      7|   796000|     11.466|             2.721|                  4.895|           100.0|
|      red|      8|    72000|     12.094|             2.578|                  5.415|           100.0|
|    white|      5|  5828000|      9.809|             7.335|                  3.204|             0.0|
|    white|      6|  8792000|     10.575|             6.442|                  3.687|             0.0|
|    white|      7|  3520000|     11.368|             5.186|                  4.24

In [0]:
sql_high_segment.show()

+---------+-------+---------+------------------+------------------+
|wine_type|quality|num_wines|       avg_alcohol|avg_residual_sugar|
+---------+-------+---------+------------------+------------------+
|    white|      9|    16000|12.624999999999998|               2.5|
|      red|      8|    64000|12.368750000000889|           2.65625|
|    white|      8|   540000|12.209629629628767| 4.697777777777816|
|      red|      7|   552000|11.961352657005365|2.7398550724638926|
|    white|      7|  2308000|  12.0947429231664| 3.985441941074224|
+---------+-------+---------+------------------+------------------+



In [0]:
#lazy
print("Step 1: Define a few transformations on df_transformed...\n")

demo_df = (
    df_transformed
        .filter(F.col("quality") >= 7)         # transformation 1 (lazy)
        .withColumn("quality_plus_one", F.col("quality") + 1)  # transformation 2 (lazy)
        .select("wine_type", "quality", "quality_plus_one")    # transformation 3 (lazy)
)

print("transformations defined")
print("no data has been fetched or computed yet (lazy)")
print("you won't see any spark job in the UI for this cell.\n")


Step 1: Define a few transformations on df_transformed...

Transformations defined.
No data has been fetched or computed yet (lazy).
You won't see any Spark job in the UI for this cell.



In [0]:
#actions
print("Step 2: Trigger actions on the transformed DataFrame...\n")

print("Action 1: show()")
demo_df.show(5)

print("\nAction 2: count()")
print("Row count:", demo_df.count())


Step 2: Trigger actions on the transformed DataFrame...

Action 1: show()
+---------+-------+----------------+
|wine_type|quality|quality_plus_one|
+---------+-------+----------------+
|    white|      7|               8|
|    white|      7|               8|
|    white|      8|               9|
|    white|      8|               9|
|    white|      7|               8|
+---------+-------+----------------+
only showing top 5 rows

Action 2: count()
Row count: 5108000
