In [1]:
from pyspark.sql import SparkSession
import random

# Create Spark Session with AQE enabled
spark = SparkSession.builder \
    .appName("AQE Demo") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .getOrCreate()

# Create sample datasets
# Sales data with skewed distribution
def create_sales_data():
    data = []
    # Create normal distribution
    for i in range(1000000):
        store_id = random.randint(1, 100)
        amount = random.uniform(10, 1000)
        data.append((store_id, amount))

    # Add skewed data for store_id 1
    for i in range(5000000):
        data.append((1, random.uniform(10, 1000)))

    return spark.createDataFrame(data, ["store_id", "amount"])

# Store information
def create_store_data():
    data = [(i, f"Store_{i}", random.choice(["NY", "CA", "TX", "FL"]))
            for i in range(1, 101)]
    return spark.createDataFrame(data, ["store_id", "store_name", "state"])

# Create our datasets
sales_df = create_sales_data()
stores_df = create_store_data()

# Execute a query that will benefit from AQE
result = sales_df.join(stores_df, "store_id") \
    .groupBy("state") \
    .agg({"amount": "sum"}) \
    .orderBy("state")

# Force execution and show results
result.explain(True)  # Shows the adaptive plan
result.show()


== Parsed Logical Plan ==
'Sort ['state ASC NULLS FIRST], true
+- Aggregate [state#6], [state#6, sum(amount#1) AS sum(amount)#19]
   +- Project [store_id#0L, amount#1, store_name#5, state#6]
      +- Join Inner, (store_id#0L = store_id#4L)
         :- LogicalRDD [store_id#0L, amount#1], false
         +- LogicalRDD [store_id#4L, store_name#5, state#6], false

== Analyzed Logical Plan ==
state: string, sum(amount): double
Sort [state#6 ASC NULLS FIRST], true
+- Aggregate [state#6], [state#6, sum(amount#1) AS sum(amount)#19]
   +- Project [store_id#0L, amount#1, store_name#5, state#6]
      +- Join Inner, (store_id#0L = store_id#4L)
         :- LogicalRDD [store_id#0L, amount#1], false
         +- LogicalRDD [store_id#4L, store_name#5, state#6], false

== Optimized Logical Plan ==
Sort [state#6 ASC NULLS FIRST], true
+- Aggregate [state#6], [state#6, sum(amount#1) AS sum(amount)#19]
   +- Project [amount#1, state#6]
      +- Join Inner, (store_id#0L = store_id#4L)
         :- Filter isno