# Parquet Basics and Layout Strategies

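This notebook demonstrates a fundamental Parquet layout optimization. It writes the same synthetic sales dataset twice: once as flat, unpartitioned files and once partitioned by `year`/`month`. It then compares the time taken by a `month = 6` query against each layout and inspects the physical plans to show where partition pruning applies.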

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Attach to the standalone Spark cluster. getOrCreate() returns the session that
# is already running when this cell is executed again, rather than building a new one.
session_builder = (
    SparkSession.builder
    .appName("Parquet Basics")
    .master("spark://spark-master:7077")
)
spark = session_builder.getOrCreate()

print(
    f"Spark {spark.version} ready!",
    f"Master: {spark.sparkContext.master}",
    sep="\n",
)

In [None]:
# Generate sample sales data.
# Every column except `amount` is derived deterministically from `id`. `amount`
# comes from rand(), which is seeded so the dataset (and therefore the files
# written and timed below) is reproducible across kernel restarts.
N_ROWS = 1_000_000
SEED = 42

df = (
    spark.range(0, N_ROWS)
    .select(
        (col("id") % 100000).alias("customer_id"),
        (col("id") % 10000).alias("product_id"),
        # Offsets 0..364 from 2024-01-01. 2024 is a leap year, so every sale
        # falls between 2024-01-01 and 2024-12-30 (i.e. a single `year` value).
        expr("date_add('2024-01-01', cast(id % 365 as int))").alias("sale_date"),
        (rand(seed=SEED) * 1000).alias("amount"),
        (col("id") % 5).cast("int").alias("region_id"),
    )
    .withColumn(
        "region",
        when(col("region_id") == 0, "US")
        .when(col("region_id") == 1, "EU")
        .when(col("region_id") == 2, "ASIA")
        .when(col("region_id") == 3, "LATAM")
        .otherwise("OTHER"),
    )
    .withColumn("year", year("sale_date"))
    .withColumn("month", month("sale_date"))
)

df.show(5)

generated_rows = df.count()
assert generated_rows == N_ROWS, (generated_rows, N_ROWS)
print(f"Generated {generated_rows:,} rows")

In [None]:
# Baseline layout: one flat directory of Parquet files with no partition
# sub-directories. mode="overwrite" replaces output left by a previous run.
df.write.parquet("s3a://warehouse/sales_unpartitioned", mode="overwrite")
print("✓ Unpartitioned written")

In [None]:
# Partitioned layout: one sub-directory per (year, month) pair, e.g.
# .../sales_partitioned/year=2024/month=6/. The partition values are encoded
# in the directory names, which is what lets readers prune by year/month.
df.write.parquet(
    "s3a://warehouse/sales_partitioned",
    mode="overwrite",
    partitionBy=["year", "month"],
)
print("✓ Partitioned written")

In [None]:
# Compare query performance: run the same `month = 6` filter against both layouts.
# Each query is timed N_TIMING_RUNS times and the fastest run is kept. This damps
# one-off costs (JVM warm-up, first file listing) that would otherwise be charged
# to whichever query happens to run first.
import time

N_TIMING_RUNS = 3


def _timed_count(frame, predicate, runs=N_TIMING_RUNS):
    """Time ``frame.filter(predicate).count()`` over several runs.

    Args:
        frame: DataFrame to query.
        predicate: SQL filter expression passed to ``DataFrame.filter``.
        runs: number of timed executions (must be >= 1).

    Returns:
        ``(row_count, best_seconds)``: the count from the last run and the
        fastest elapsed time measured with ``time.perf_counter``.

    Raises:
        ValueError: if ``runs`` is less than 1.
    """
    if runs < 1:
        raise ValueError(f"runs must be >= 1, got {runs}")
    best_seconds = float("inf")
    row_count = None
    for _ in range(runs):
        start = time.perf_counter()  # monotonic, high-resolution; unlike time.time()
        row_count = frame.filter(predicate).count()
        best_seconds = min(best_seconds, time.perf_counter() - start)
    return row_count, best_seconds


unpart = spark.read.parquet("s3a://warehouse/sales_unpartitioned")
part = spark.read.parquet("s3a://warehouse/sales_partitioned")

count_unpart, t_unpart = _timed_count(unpart, "month = 6")
count_part, t_part = _timed_count(part, "month = 6")

# Both directories were written from the same `df`, so the filtered counts must match.
assert count_unpart == count_part, (count_unpart, count_part)

print(f"Unpartitioned: {count_unpart:,} rows in {t_unpart:.3f}s")
print(f"Partitioned:   {count_part:,} rows in {t_part:.3f}s")
print(f"Speedup: {t_unpart/t_part:.2f}x")

In [None]:
# Show physical plans for the same filter on each layout. In the FileScan node,
# compare where the `month` predicate lands: under PartitionFilters for the
# partitioned read (directory pruning) versus only as a data/pushed filter for
# the unpartitioned read.
plan_targets = (
    ("\nUnpartitioned plan:", unpart),
    ("\nPartitioned plan:", part),
)
for header, frame in plan_targets:
    print(header)
    frame.filter("month = 6").explain()