In [2]:
import pyspark
from pyspark.sql import (
    functions as f,
    SparkSession,
    types as t
)

conf = pyspark.SparkConf().setAll([('spark.sql.optimizer.dynamicPartitionPruning.enabled', 'true')])
spark = SparkSession.builder.appName("partition_pruning").config(conf=conf).getOrCreate()

table_schema = t.StructType([
    t.StructField("date", t.StringType(), True),
    t.StructField("name", t.StringType(), True),
    t.StructField("region", t.IntegerType(), True),
    t.StructField("price", t.IntegerType(), True)])

csv_file_path = "file:///home/jovyan/work/sample/ecommerce_order.csv"
df = spark.read.schema(table_schema).csv(csv_file_path)

# write the file with the partition
df.write\
    .partitionBy("region")\
    .mode("overwrite")\
    .parquet("/home/jovyan/work/output/partition_pruning")

In [3]:
# read the parquet file
read_df = spark.read.parquet("/home/jovyan/work/output/partition_pruning")
sales_total_df = read_df.where("region==2")\
                    .agg(
                        f.round(
                            f.sum("price"),
                            2
                        ).alias("sales"))

In [4]:
sales_total_df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet  (1)


(1) Scan parquet 
Output [2]: [price#19, region#20]
Batched: true
Location: InMemoryFileIndex [file:/home/jovyan/work/output/partition_pruning]
PartitionFilters: [isnotnull(region#20), (region#20 = 2)]
ReadSchema: struct<price:int>

(2) Project
Output [1]: [price#19]
Input [2]: [price#19, region#20]

(3) HashAggregate
Input [1]: [price#19]
Keys: []
Functions [1]: [partial_sum(price#19)]
Aggregate Attributes [1]: [sum#33L]
Results [1]: [sum#34L]

(4) Exchange
Input [1]: [sum#34L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=34]

(5) HashAggregate
Input [1]: [sum#34L]
Keys: []
Functions [1]: [sum(price#19)]
Aggregate Attributes [1]: [sum(price#19)#26L]
Results [1]: [round(sum(price#19)#26L, 2) AS sales#27L]

(6) AdaptiveSparkPlan
Output [1]: [sales#27L]
Arguments: isFinalPlan=false




In [5]:
sales_total_df.show()

+-------+
|  sales|
+-------+
|4868952|
+-------+



In [6]:
# read dimension table
csv_file_path = "file:///home/jovyan/work/sample/ecommerce_region.csv"
region_df = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .csv(csv_file_path)

In [7]:
# before broadcasting
joined_df = read_df.join(region_df,
               read_df.region == region_df.region_id,
               "inner")\
        .where(region_df.city == "San Francisco")

In [8]:
joined_df.show()

+----------+----------------+-----+------+---------+-------------+
|      date|            name|price|region|region_id|         city|
+----------+----------------+-----+------+---------+-------------+
|2022-04-03|    Tory Delgado| 2158|     1|        1|San Francisco|
|2022-05-24|   Jene Franklin| 3643|     1|        1|San Francisco|
|2022-05-28|     Kasey Wolfe| 1236|     1|        1|San Francisco|
|2022-10-07|  Walton Kennedy| 2381|     1|        1|San Francisco|
|2022-10-06|Lakiesha Jimenez| 2629|     1|        1|San Francisco|
|2022-06-13| Piedad Williams| 1670|     1|        1|San Francisco|
|2022-12-13|    Elvina Grant| 2459|     1|        1|San Francisco|
|2022-10-27|   Cristie Stone| 3325|     1|        1|San Francisco|
|2022-07-03|     Lacy Flores| 1013|     1|        1|San Francisco|
|2022-01-01|   Kathey Little| 2293|     1|        1|San Francisco|
|2022-04-13|        Fe Reyes| 2438|     1|        1|San Francisco|
|2022-06-23|   Apryl Holland| 3003|     1|        1|San Franci

In [9]:
# after broadcasting
# broadcasting을 하면 파일을 하나만 읽어온다.
read_df = spark.read.parquet("/home/jovyan/work/output/partition_pruning")
joined_df = read_df.join(f.broadcast(region_df),
               read_df.region == region_df.region_id,
               "inner")\
        .where(region_df.city == "San Francisco")

In [10]:
joined_df.show()

+----------+----------------+-----+------+---------+-------------+
|      date|            name|price|region|region_id|         city|
+----------+----------------+-----+------+---------+-------------+
|2022-04-03|    Tory Delgado| 2158|     1|        1|San Francisco|
|2022-05-24|   Jene Franklin| 3643|     1|        1|San Francisco|
|2022-05-28|     Kasey Wolfe| 1236|     1|        1|San Francisco|
|2022-10-07|  Walton Kennedy| 2381|     1|        1|San Francisco|
|2022-10-06|Lakiesha Jimenez| 2629|     1|        1|San Francisco|
|2022-06-13| Piedad Williams| 1670|     1|        1|San Francisco|
|2022-12-13|    Elvina Grant| 2459|     1|        1|San Francisco|
|2022-10-27|   Cristie Stone| 3325|     1|        1|San Francisco|
|2022-07-03|     Lacy Flores| 1013|     1|        1|San Francisco|
|2022-01-01|   Kathey Little| 2293|     1|        1|San Francisco|
|2022-04-13|        Fe Reyes| 2438|     1|        1|San Francisco|
|2022-06-23|   Apryl Holland| 3003|     1|        1|San Franci

In [11]:
joined_df.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (6)
+- BroadcastHashJoin Inner BuildRight (5)
   :- Scan parquet  (1)
   +- BroadcastExchange (4)
      +- Filter (3)
         +- Scan csv  (2)


(1) Scan parquet 
Output [4]: [date#105, name#106, price#107, region#108]
Batched: true
Location: InMemoryFileIndex [file:/home/jovyan/work/output/partition_pruning]
PartitionFilters: [isnotnull(region#108), dynamicpruningexpression(region#108 IN dynamicpruning#155)]
ReadSchema: struct<date:string,name:string,price:int>

(2) Scan csv 
Output [2]: [region_id#59, city#60]
Batched: false
Location: InMemoryFileIndex [file:/home/jovyan/work/sample/ecommerce_region.csv]
PushedFilters: [IsNotNull(city), EqualTo(city,San Francisco), IsNotNull(region_id)]
ReadSchema: struct<region_id:int,city:string>

(3) Filter
Input [2]: [region_id#59, city#60]
Condition : ((isnotnull(city#60) AND (city#60 = San Francisco)) AND isnotnull(region_id#59))

(4) BroadcastExchange
Input [2]: [region_id#59, city#60]
Arguments: HashedRe