In [0]:
input = "/FileStore/tables/train.csv"

## **Adaptive Query Execution**

In Apache Spark, adaptive query execution (AQE) is a feature that re-optimizes a query's execution plan while the query is running, using statistics collected at runtime from completed shuffle stages. AQE can improve the performance of Spark SQL queries by:

1.   Coalescing small post-shuffle partitions into fewer, larger ones.
2.   Converting a sort-merge join into a broadcast join when one side turns out to be small enough at runtime.
3.   Splitting skewed partitions in sort-merge joins so that no single task dominates the stage.

Predicate and projection pushdown and static partition pruning are not part of AQE; the Catalyst optimizer performs them at planning time whether or not AQE is enabled.

AQE has been enabled by default since Spark 3.2.0 and can be tuned through the spark.sql.adaptive.* configuration settings. It does not guarantee the best plan for every query, so monitor query performance and adjust the settings as needed.
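As a minimal sketch of what tuning these settings can look like, the snippet below applies a small set of AQE options to an existing session and reads them back. The configuration keys are standard Spark SQL settings; the helper name `apply_aqe_settings` and the chosen values are assumptions made for illustration, not recommendations.

```python
# Hypothetical helper: the keys are real Spark SQL settings,
# the values are illustrative only.
AQE_SETTINGS = {
    # Master switch for adaptive query execution
    "spark.sql.adaptive.enabled": "true",
    # Merge small post-shuffle partitions
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Split skewed partitions in sort-merge joins
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Target partition size used when coalescing or splitting
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "64MB",
}


def apply_aqe_settings(spark, settings=AQE_SETTINGS):
    """Set each option on the session and return the values read back."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return {key: spark.conf.get(key) for key in settings}
```

Calling `apply_aqe_settings(spark)` on an active SparkSession returns a dictionary of the settings as the session now reports them, which is a quick way to confirm that they took effect.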

In [0]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("AQE Example").getOrCreate()

# Enable AQE by setting spark.sql.adaptive.enabled to true
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Read a dataframe from a CSV file
df = spark.read.csv(input, header=True, inferSchema=True)

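# Note: select() with a comparison returns a single boolean column;
# it does not filter rows (use df.filter(...) for that)
df1 = df.select(df["purchase"]>10000)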

df1.explain()

== Physical Plan ==
*(1) Project [(purchase#2261 > 10000) AS (purchase > 10000)#2274]
+- FileScan csv [Purchase#2261] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Purchase:int>




In [0]:
# Register the dataframe as a temporary view
df.createOrReplaceTempView("data")

# Run a SQL query against the view (AQE is enabled for the session)
result = spark.sql("SELECT * FROM data WHERE Purchase > 10000")

# Show the physical execution plan
result.explain()

== Physical Plan ==
*(1) Filter (isnotnull(Purchase#2261) AND (Purchase#2261 > 10000))
+- FileScan csv [User_ID#2250,Product_ID#2251,Gender#2252,Age#2253,Occupation#2254,City_Category#2255,Stay_In_Current_City_Years#2256,Marital_Status#2257,Product_Category_1#2258,Product_Category_2#2259,Product_Category_3#2260,Purchase#2261] Batched: false, DataFilters: [isnotnull(Purchase#2261), (Purchase#2261 > 10000)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/train.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Purchase), GreaterThan(Purchase,10000)], ReadSchema: struct<User_ID:int,Product_ID:string,Gender:string,Age:string,Occupation:int,City_Category:string...




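This example enables AQE, reads a CSV file into a dataframe, registers it as a temporary view, runs a SQL query against that view, and prints the physical plan with the explain() method. The plan shows the filter being pushed down to the CSV scan (see PushedFilters), which is the work of the Catalyst optimizer rather than AQE. Because this query contains no shuffle, AQE has nothing to re-optimize here; its effect shows up in queries with joins or aggregations.

AQE does not change the query itself or its result; it only adjusts the execution plan based on statistics gathered at runtime.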
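To check whether a given plan is actually under AQE's control, one option is to capture the text that explain() prints and look for an AdaptiveSparkPlan node, which open-source Spark 3.x places at the root of adaptive plans. The sketch below assumes that explain() prints to Python's standard output, as it does in PySpark; the helper names `explain_to_string` and `uses_adaptive_plan` are hypothetical.

```python
import io
from contextlib import redirect_stdout


def explain_to_string(df):
    """Capture the text that df.explain() prints to stdout."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def uses_adaptive_plan(df):
    """Return True if the printed physical plan contains an AdaptiveSparkPlan node."""
    return "AdaptiveSparkPlan" in explain_to_string(df)
```

For instance, `uses_adaptive_plan(df.groupBy("Gender").count())` inspects an aggregation, which requires a shuffle and is therefore the kind of query AQE can re-optimize. The exact plan text varies across Spark versions and runtimes, so treat the string match as a heuristic.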