In [None]:
# PySpark ETL Case Study with Performance Tuning

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, pandas_udf
import pandas as pd

# ---------------------------------------------
# Step 1: SparkSession with tuned configurations
# ---------------------------------------------
# Every key/value pair below is applied to the builder, in order, before the
# session is created.
# NOTE(review): getOrCreate() returns an already-running session if one exists;
# in that case only runtime SQL settings take effect (memory sizes etc. do not).
# NOTE(review): spark.shuffle.service.enabled assumes an external shuffle service
# is running on every worker, and dynamic allocation is ignored in local mode —
# confirm both against the target cluster.
SPARK_CONFIGS = {
    "spark.sql.shuffle.partitions": "4",
    "spark.executor.memory": "2g",
    "spark.driver.memory": "2g",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
}

session_builder = SparkSession.builder.appName("SalesETL_PerformanceOptimized")
for conf_key, conf_value in SPARK_CONFIGS.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

# -------------------------
# Step 2: Load raw data
# -------------------------
# Stand-in for a raw CSV source: 100,000 synthetic sales rows built on the driver.
# sale_id runs 1..100000; product cycles over 5 values and region over 3.
data = [
    (sale_id, f"Product{sale_id % 5}", f"Region{sale_id % 3}", sale_id * 10.5)
    for sale_id in range(1, 100001)
]
df_raw = spark.createDataFrame(data, ["sale_id", "product", "region", "amount"])

# ------------------------------
# Step 3: Filter early + prune columns
# ------------------------------
# Keep only sales above 200 and drop the unused `region` column. True source-level
# predicate pushdown only happens for readers such as Parquet/ORC/JDBC; with
# in-memory data the filter simply runs as early as possible in the plan.
df_filtered = df_raw.where(col("amount") > 200).select("sale_id", "product", "amount")

# ------------------------------
# Step 4: Hash-repartition by product
# ------------------------------
# Full shuffle into 8 partitions keyed on `product`. Only 5 distinct products
# exist, so at most 5 of the 8 partitions receive rows; the gain is that each
# product's rows are co-located ahead of the per-product groupBy.
df_partitioned = df_filtered.repartition(8, "product")

# ------------------------------
# Step 5: Cache and materialise
# ------------------------------
# cache() is lazy; count() is the action that fills the cache and reports the size.
df_cached = df_partitioned.cache()
print(f"Filtered Count: {df_cached.count()}")

# -------------------------------------------
# Step 6: Apply Aggregation with Vectorized UDF
# -------------------------------------------
@pandas_udf("double")
def discount_udf(amount: pd.Series) -> pd.Series:
    """Vectorised UDF applying a 10% discount (scales each amount by 0.9).

    Receives a batch of `amount` values as a pandas Series and returns a
    Series of the same length with the discounted values.
    """
    discounted = amount.mul(0.9)
    return discounted

# Attach the discounted value as a new column, computed by the vectorised UDF.
discounted_col = discount_udf(col("amount"))
transformed_df = df_cached.withColumn("discounted_amount", discounted_col)

# Per-product total and mean of the discounted amount. These are lazy
# transformations: nothing executes until an action is called on agg_df.
sales_metrics = [
    sum("discounted_amount").alias("total_sales"),
    avg("discounted_amount").alias("avg_sales"),
]
agg_df = transformed_df.groupBy("product").agg(*sales_metrics)



Filtered Count: 99981


In [None]:
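# Notes (planning only; not executable code):
# - Three-layer flow: raw CSV files >> clean >> transformation (4)
# - Split into 3 files: one that creates the Spark session, one that defines the class/object, and import the session from its own file
# - Memory sizes noted: 10g, 2g (presumably executor/driver memory — TODO confirm)
# - Then: store files, run analysis, delete (TODO: clarify what gets deleted)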

In [11]:
import pandas as pd
# ------------------------------
# Step 7: Save as Partitioned Parquet (Columnar + Partitioning)
# ------------------------------
# Writing is the action that actually executes the lazy Step 6 aggregation.
# agg_df has one row per product (it comes from groupBy("product")), so
# partitionBy("product") yields one `product=<value>` directory per product.
# mode("overwrite") keeps this cell safe to re-run.
OUTPUT_PATH = "sales_agg_parquet"
agg_df.write.mode("overwrite").partitionBy("product").parquet(OUTPUT_PATH)

# Shut down the session; this also releases the data cached in Step 5.
spark.stop()

In [9]:
import pandas as pd

# Record the installed pandas version for reproducibility; pandas is required
# by the pandas_udf / Arrow path used in this notebook.
pandas_version = pd.__version__
print(pandas_version)


2.3.0


In [None]:
# # PySpark Performance Optimization: Concepts & Case Study

# This document explains key PySpark performance optimization techniques followed by a unified ETL case study applying all of them.

# ---

# ## 🔹 1. Push-Down Filters / Predicate Pushdown

# ### **What**: Filters are pushed to the data source to minimize data read.

# ### **Why**: Reduces IO by only reading relevant rows.

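# ### **When**: Applied automatically by sources that support it (e.g. Parquet, ORC, JDBC). It does not apply to in-memory DataFrames such as those built with `createDataFrame`.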

# ### **If Not Used**: Entire data is loaded into Spark before filtering, leading to performance hits.

# ---

# ## 🔹 2. EXPLAIN and queryExecution

# ### **What**: Tools to inspect Spark's logical and physical execution plans.

# ### **Why**: Helps developers understand and tune query performance.

# ### **When**: Use before heavy queries or when optimizing joins, filters.

# ### **If Not Used**: You may end up with non-performant plans unknowingly.

# ---

# ## 🔹 3. repartition() vs coalesce()

# ### **repartition()**: Changes the number of partitions (up or down) via a full shuffle. It is expensive, but produces evenly sized partitions and can hash-partition by column(s).

# ### **coalesce()**: Decreases partitions by merging existing ones without a full shuffle (use before writing to reduce small files). Note that it can also lower the parallelism of the preceding stage.

# ---

# ## 🔹 4. persist() and cache()

# ### **What**: Store intermediate results in memory/disk for reuse.

# ### **Why**: Avoid recomputation in iterative workloads.

# ### **When**: Use when a DataFrame is reused multiple times.

# ### **If Not Used**: The DataFrame's full lineage is recomputed on every action, wasting time and cluster resources.

# ---

# ## 🔹 5. Columnar Storage Optimization

# ### **What**: Use columnar formats like Parquet.

# ### **Why**: Enables predicate pushdown and column pruning.

# ### **When**: Always for disk-based storage.

# ### **If Not Used**: Slower reads, higher IO.

# ---

# ## 🔹 6. Partitioning & Caching

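# ### **What**: On-disk partitioning (e.g. `partitionBy`) organizes files by column values; caching keeps frequently read data in memory.

# ### **Why**: Filters on a partition column let Spark skip (prune) irrelevant directories, and cached data avoids repeated reads.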

# ### **When**: Large, often-read datasets (by columns like region, date).

# ---

# ## 🔹 7. Shuffle Operations: Wide vs Narrow

# * **Wide**: Causes shuffles (e.g., joins, groupBy).
# * **Narrow**: No shuffle (e.g., map, filter).

# ### **Multijoins**: Use broadcast joins where possible.

# ### **Window Functions**: Avoid excessive shuffles, partition wisely.

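# ### **Shuffling Optimization**: Reduce the data volume before a shuffle (filter rows and select columns early), and size the number of shuffle partitions to the data.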

# ### **Partitioning Strategies**: Hash vs range, depends on data skew.

# ### **CI/CD for Spark**: Use modular PySpark code, test with small data, lint and deploy.

# ### **Configs** (example values — tune per cluster):

# ```properties
# spark.executor.memory = 4g
# spark.driver.memory = 2g
# spark.default.parallelism = <2-3 x total executor cores>
# spark.sql.shuffle.partitions = 8
# ```

# ---

# ## 🔹 8. groupByKey() vs reduceByKey()

# ### **groupByKey()**: Shuffles every value for each key across the network, then applies logic. High shuffle and memory cost.

# ### **reduceByKey()**: Combines values per key within each partition (map-side) before the shuffle, so much less data is moved. More efficient for aggregations.

# ---

# ## 🔹 9. Configuring spark.sql.shuffle.partitions

# ### **What**: Sets default number of partitions for shuffle operations.

# ### **Why**: Controls parallelism & file output size.

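# ### **Best Practice**: Tune the default (200) to the data size: use fewer partitions for small data and more for very large shuffles. With Adaptive Query Execution (on by default since Spark 3.2), Spark can also coalesce small shuffle partitions at runtime.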

# ---

# ## 🔹 10. Avoid Python UDFs

# ### **Why**: Slow. Rows are serialized to a separate Python worker process and processed one row at a time, and the optimizer cannot see inside the UDF.

# ### **Fix**: Use native Spark functions or Pandas UDF.

# ---

# ## 🔹 11. Pandas UDFs (Vectorized)

# ### **Why**: Uses Arrow to process batches of data efficiently.

# ### **When**: Custom logic needed with performance.

# ---

# ## 🔹 12. Arrow Optimization

# ### **What**: Enables fast conversion between Spark and Pandas using Apache Arrow.

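# ### **When**: `spark.sql.execution.arrow.pyspark.enabled` speeds up `.toPandas()` and `createDataFrame(pandas_df)`. Pandas UDFs always use Arrow, regardless of this setting.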

# ---

# # ✅ Case Study: Sales ETL Pipeline with Tuning

# ### **Business Use Case**

# Process and optimize a sales dataset for discounted product-level analytics.

# ---

# ### **Pipeline Steps**

# 1. **Load**: Simulate or load sales data (CSV/Parquet).
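# 2. **Push-down Filter**: Filter rows where `amount > 200` as early as possible. True source-level pushdown only happens when reading Parquet/ORC/JDBC; with simulated in-memory data, the filter simply runs early in the plan.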
# 3. **Column Pruning**: Select only required columns.
# 4. **Repartition**: Repartition by `product` before groupBy.
# 5. **Cache**: Cache filtered data.
# 6. **Pandas UDF**: Apply discount logic.
# 7. **Aggregation**: `groupBy(product)` and aggregate.
# 8. **Write**: Save results to Parquet partitioned by `product`.

# ---

# ### **Configurations Used**

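# ```python
# .config("spark.sql.shuffle.partitions", "4")
# .config("spark.executor.memory", "2g")
# .config("spark.driver.memory", "2g")
# .config("spark.sql.execution.arrow.pyspark.enabled", "true")
# .config("spark.dynamicAllocation.enabled", "true")
# .config("spark.shuffle.service.enabled", "true")
# ```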

# ---

# ### **Expected Outcome**

# * Efficient shuffle operations.
# * Minimal I/O.
# * Fast reuse of the cached filtered data (no recomputation).
# * Optimized write (partitioned Parquet).

# ---

# This unified pipeline reflects how PySpark performance best practices come together in real-world ETL jobs.
