In [1]:
import findspark
findspark.init('/Users/abhinavjha/spark', edit_rc=True)

In [2]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

24/05/27 19:53:00 WARN Utils: Your hostname, Abhinavs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.208 instead (on interface en0)
24/05/27 19:53:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/27 19:53:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
transaction_file = '/Users/abhinavjha/spark-experiments/data/data_skew/transactions.parquet'
transaction_df = spark.read.parquet(transaction_file)

In [5]:
transaction_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|    chicago|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [5]:
customers_file = '/Users/abhinavjha/spark-experiments/data/data_skew/customers.parquet'
customers_df = spark.read.parquet(customers_file)

In [7]:
customers_df.show(5)

+----------+-------------+---+------+----------+-----+-----------+
|   cust_id|         name|age|gender|  birthday|  zip|       city|
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9| Aaron Abbott| 34|Female| 7/13/1991|97823|     boston|
|C00B971T1J| Aaron Austin| 37|Female|12/16/2004|30332|    chicago|
|C00WRSJF1Q| Aaron Barnes| 29|Female| 3/11/1977|23451|     denver|
|C01AZWQMF3|Aaron Barrett| 31|  Male|  7/9/1998|46613|los_angeles|
|C01BKUFRHA| Aaron Becker| 54|  Male|11/24/1979|40284|  san_diego|
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



## Narrow Transformations (Shuffling not required)

1. filter
2. Select, add, alter columns
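A transformation is narrow when each output partition depends on exactly one input partition, so no rows need to move between partitions. The pure-Python sketch below models that idea, assuming a DataFrame is just a list of partitions (lists of dict rows, taken from the sample output). The hypothetical `narrow_pipeline` applies the same filter / split / add-5 / select steps as the next cell to one partition at a time. This is only an illustration of the concept, not how Spark executes the plan.

```python
# A toy "DataFrame": a list of partitions, each a list of row dicts.
# Ages are strings, mirroring the cast(age as double) seen in the plan below.
partitions = [
    [{"name": "Aaron Abbott", "age": "34", "city": "boston"},
     {"name": "Aaron Austin", "age": "37", "city": "chicago"}],
    [{"name": "Aaron Lambert", "age": "54", "city": "boston"}],
]


def narrow_pipeline(partition):
    """Filter, derive columns and select, using only the rows of this one partition."""
    out = []
    for row in partition:
        if row["city"] != "boston":          # filter
            continue
        first, last = row["name"].split(" ")[:2]  # split name into two parts
        out.append({
            "first_name": first,
            "last_name": last,
            "age": float(row["age"]) + 5,    # string age becomes a double
            "city": row["city"],
        })
    return out


# Each partition is processed independently: no row crosses a partition boundary.
result = [narrow_pipeline(p) for p in partitions]
print(len(result))  # 2: same number of partitions as the input
```

Because every partition is handled in isolation, Spark can chain such steps inside a single stage without a shuffle.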

In [12]:
df_narrow_transformations = (
    customers_df
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    # age is stored as a string, so Spark casts it to double before adding 5 (hence 39.0 below)
    .withColumn("age", F.col("age") + F.lit(5))
    .select(["first_name", "last_name", "age", "gender", "city"])
)

df_narrow_transformations.show(5)
df_narrow_transformations.explain(True)

+----------+---------+----+------+------+
|first_name|last_name| age|gender|  city|
+----------+---------+----+------+------+
|     Aaron|   Abbott|39.0|Female|boston|
|     Aaron|  Lambert|59.0|Female|boston|
|     Aaron|  Lindsey|29.0|  Male|boston|
|     Aaron|    Lopez|27.0|Female|boston|
|     Aaron| Schwartz|57.0|Female|boston|
+----------+---------+----+------+------+
only showing top 5 rows

== Parsed Logical Plan ==
'Project ['first_name, 'last_name, 'age, 'gender, 'city]
+- Project [cust_id#68, name#69, (cast(age#70 as double) + cast(5 as double)) AS age#364, gender#71, birthday#72, zip#73, city#74, first_name#345, last_name#354]
   +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, first_name#345, split(name#69,  , -1)[1] AS last_name#354]
      +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, split(name#69,  , -1)[0] AS first_name#345]
         +- Filter (city#74 = boston)
            +- Relation [cust_id#68,na

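## Wide Transformations (Shuffling is required)
1. Repartition
2. Coalesce (listed for comparison: unlike repartition, it normally avoids a full shuffle, as discussed below)
3. Joins (a broadcast join avoids shuffling the larger dataset)
4. GroupBy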

In [13]:
df_narrow_transformations.rdd.getNumPartitions()

1

In [14]:
# explain() prints the plan and returns None, so its result should not be assigned to new_df
df_narrow_transformations.repartition(12).explain(True)

== Parsed Logical Plan ==
Repartition 12, true
+- Project [first_name#345, last_name#354, age#364, gender#71, city#74]
   +- Project [cust_id#68, name#69, (cast(age#70 as double) + cast(5 as double)) AS age#364, gender#71, birthday#72, zip#73, city#74, first_name#345, last_name#354]
      +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, first_name#345, split(name#69,  , -1)[1] AS last_name#354]
         +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, split(name#69,  , -1)[0] AS first_name#345]
            +- Filter (city#74 = boston)
               +- Relation [cust_id#68,name#69,age#70,gender#71,birthday#72,zip#73,city#74] parquet

== Analyzed Logical Plan ==
first_name: string, last_name: string, age: double, gender: string, city: string
Repartition 12, true
+- Project [first_name#345, last_name#354, age#364, gender#71, city#74]
   +- Project [cust_id#68, name#69, (cast(age#70 as double) + cast(5 as double)) AS age#36

In [None]:
# repartition(n) performs a full shuffle to redistribute rows evenly across n partitions.
# When no partitioning columns are given, Spark assigns rows to partitions with RoundRobinPartitioning.
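Round-robin partitioning deals rows out to the target partitions in turn, which is why it produces evenly sized partitions regardless of the data's values. A minimal pure-Python sketch of the idea follows; `round_robin_partition` is a hypothetical helper, and Spark's actual implementation differs in details (for example, each input partition starts dealing at a different position).

```python
def round_robin_partition(rows, num_partitions):
    """Assign rows to partitions in turn: row 0 -> p0, row 1 -> p1, ..., wrapping around."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


rows = list(range(30))
parts = round_robin_partition(rows, 12)
print([len(p) for p in parts])  # [3, 3, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2]
```

Partition sizes differ by at most one row, which is the property that makes `repartition(n)` useful for rebalancing skewed partitions.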

In [None]:
# coalesce(n) is used to decrease the number of partitions. Unlike repartition, coalesce does not shuffle:
# it merges existing partitions into fewer ones (a narrow dependency), which is why its plan shows
# "Repartition 5, false". Requesting more partitions than currently exist leaves the count unchanged.

# In an extreme case such as coalesce(1), the upstream computation may run in far fewer tasks than intended.
# The PySpark docs suggest calling repartition() instead, which adds a shuffle but keeps upstream work parallel.
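To contrast with repartition, the sketch below models coalesce as grouping whole existing partitions into fewer output partitions, so no partition's rows are split up or redistributed. The helper `coalesce_partitions` is hypothetical; it assigns contiguous ranges of input partitions to each output partition, while Spark's real coalescer can also take data locality into account.

```python
def coalesce_partitions(partitions, num_partitions):
    """Merge existing partitions into fewer groups without splitting any of them."""
    if num_partitions >= len(partitions):
        return partitions  # coalesce never increases the partition count
    groups = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        # Contiguous ranges of input partitions end up in the same output partition.
        groups[i * num_partitions // len(partitions)].extend(part)
    return groups


# 12 input partitions with 2 rows each, like new_df after repartition(12) (rows are made up).
partitions = [[f"p{i}-r{j}" for j in range(2)] for i in range(12)]
coalesced = coalesce_partitions(partitions, 5)
print([len(g) for g in coalesced])  # [6, 4, 6, 4, 4]
```

The output partitions are not perfectly balanced, which is the trade-off coalesce makes for avoiding a shuffle.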

In [15]:

new_df = df_narrow_transformations.repartition(12)

In [16]:
new_df.rdd.getNumPartitions()

12

In [20]:
new_df.coalesce(5).explain(True)

== Parsed Logical Plan ==
Repartition 5, false
+- Repartition 12, true
   +- Project [first_name#345, last_name#354, age#364, gender#71, city#74]
      +- Project [cust_id#68, name#69, (cast(age#70 as double) + cast(5 as double)) AS age#364, gender#71, birthday#72, zip#73, city#74, first_name#345, last_name#354]
         +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, first_name#345, split(name#69,  , -1)[1] AS last_name#354]
            +- Project [cust_id#68, name#69, age#70, gender#71, birthday#72, zip#73, city#74, split(name#69,  , -1)[0] AS first_name#345]
               +- Filter (city#74 = boston)
                  +- Relation [cust_id#68,name#69,age#70,gender#71,birthday#72,zip#73,city#74] parquet

== Analyzed Logical Plan ==
first_name: string, last_name: string, age: double, gender: string, city: string
Repartition 5, false
+- Repartition 12, true
   +- Project [first_name#345, last_name#354, age#364, gender#71, city#74]
      +- Project [cus

In [21]:
# Broadcast join

print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))


10485760b
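The threshold is printed in bytes: 10485760 = 10 × 1024 × 1024, i.e. 10 MB. The sketch below models the automatic broadcast decision as a simple size comparison, assuming Spark compares the optimizer's estimated size of a join side (the sizeInBytes statistic the next cell tries to read) against this threshold. `would_auto_broadcast` is a hypothetical helper, and an explicit broadcast() hint bypasses this check entirely.

```python
THRESHOLD_BYTES = 10 * 1024 * 1024  # 10485760, the value printed above


def would_auto_broadcast(estimated_size_bytes, threshold_bytes):
    """Simplified size check: broadcast only if the estimate fits under a non-negative threshold."""
    return 0 <= estimated_size_bytes <= threshold_bytes


print(THRESHOLD_BYTES)                                    # 10485760
print(would_auto_broadcast(2_000_000, THRESHOLD_BYTES))   # True
print(would_auto_broadcast(50_000_000, THRESHOLD_BYTES))  # False
print(would_auto_broadcast(2_000_000, -1))                # False: -1 disables auto-broadcast
```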


In [22]:
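new_df.cache().foreach(lambda x: x)  # run an action so the cached data is materialized
catalyst_plan = new_df._jdf.queryExecution().logical()
# The next call fails (see the Py4JError below): this Spark version has no executePlan() overload that
# takes only a LogicalPlan. Reading the stats from the DataFrame's own QueryExecution avoids that call, e.g.
# new_df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
spark._jsparkSession.sessionState().executePlan(catalyst_plan).optimizedPlan().stats().sizeInBytes()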

                                                                                

Py4JError: An error occurred while calling o147.executePlan. Trace:
py4j.Py4JException: Method executePlan([class org.apache.spark.sql.catalyst.plans.logical.Repartition]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)



In [23]:
customers_df.show(5)

+----------+-------------+---+------+----------+-----+-----------+
|   cust_id|         name|age|gender|  birthday|  zip|       city|
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9| Aaron Abbott| 34|Female| 7/13/1991|97823|     boston|
|C00B971T1J| Aaron Austin| 37|Female|12/16/2004|30332|    chicago|
|C00WRSJF1Q| Aaron Barnes| 29|Female| 3/11/1977|23451|     denver|
|C01AZWQMF3|Aaron Barrett| 31|  Male|  7/9/1998|46613|los_angeles|
|C01BKUFRHA| Aaron Becker| 54|  Male|11/24/1979|40284|  san_diego|
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



In [24]:
transaction_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|    chicago|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [25]:
customers_df.count()

5000

In [26]:
transaction_df.count()

39790092

In [28]:
from pyspark.sql.functions import broadcast
joined_df = transaction_df.join(broadcast(customers_df),how="inner", on="cust_id")

In [29]:
joined_df.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [cust_id])
:- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
+- ResolvedHint (strategy=broadcast)
   +- Relation [cust_id#68,name#69,age#70,gender#71,birthday#72,zip#73,city#74] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, name#69, age#70, gender#71, birthday#72, zip#73, city#74]
+- Join Inner, (cust_id#0 = cust_id#68)
   :- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
   +- ResolvedHint (strategy=broadcast)
      +- Relation 

In [None]:
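# If one of the datasets is much smaller than the other, a broadcast hash join (BHJ) is usually the better choice.
# In a BHJ, the smaller dataset is hashed and broadcast to every executor that holds partitions of the bigger
# dataset. Each partition of the bigger dataset is then joined locally against that hash table. Because every
# executor already has the whole small table, the bigger dataset does not need to be shuffled, and no sort or
# merge step is required.

# Spark broadcasts a table automatically when its estimated size does not exceed
# spark.sql.autoBroadcastJoinThreshold (10485760 bytes, i.e. 10 MB, as printed above):
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", <size_in_bytes>)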
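The broadcast hash join described above can be sketched in pure Python, assuming the small side fits in memory and the large side is a list of partitions. `broadcast_hash_join` and the sample rows are hypothetical: the point is that one hash table is built from the small side and every large-side partition probes it locally, so the large side never moves.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner join each large-side partition against a hash table built from the small side."""
    # Build one hash table from the small side; this is what gets "broadcast".
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    # Each partition of the large side probes the same table locally: no shuffle of the large side.
    output = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in table.get(row[key], []):
                joined.append({**row, **match})
        output.append(joined)
    return output


customers = [{"cust_id": "C1", "name": "Ada"}, {"cust_id": "C2", "name": "Bob"}]
transactions = [
    [{"cust_id": "C1", "amt": 10.0}, {"cust_id": "C3", "amt": 5.0}],
    [{"cust_id": "C2", "amt": 7.5}, {"cust_id": "C1", "amt": 1.0}],
]
result = broadcast_hash_join(transactions, customers, "cust_id")
print([len(p) for p in result])  # [1, 2]
```

The output keeps the large side's partitioning (two partitions in, two out), which reflects why no exchange is needed on that side.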

In [8]:
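# Shuffle sort merge join
# Setting the threshold to -1 disables automatic broadcast joins, so Spark falls back to sort-merge join.
# An explicit broadcast() hint (as used for joined_df above) is still honored.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.preferSortMergeJoin", True)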


In [30]:
joined_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+--------+---+------+---------+-----+------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|    name|age|gender| birthday|  zip|  city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+--------+---+------+---------+-----+------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|Ada Lamb| 32|Female|9/29/2005|22457|denver|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Gr

In [9]:
sort_merge_join = transaction_df.join(customers_df, how="inner", on="cust_id")

## Shuffle Sort Merge Join

1. It involves three phases:
    1.1 Shuffle
    1.2 Sort
    1.3 Merge

- Both datasets are shuffled so that records with the same key end up in the same partition.
- To decide which partition a record goes to, Spark hashes its join key, so equal keys from both datasets always land in the same partition.
- After shuffling, the records in each partition are sorted by the join key.
- The join is then performed by walking through both sorted sides in a single pass, like the merge step of merge sort. When the current keys differ, only the side with the smaller key advances, so each record is compared only with records that can share its key instead of with every record on the other side.
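The three phases above can be sketched in pure Python. This is a simplified model under stated assumptions: datasets are lists of dict rows, Python's built-in `hash()` stands in for Spark's own Murmur3-based partitioning hash, and `shuffle_by_key`, `merge_join` and `sort_merge_join` are hypothetical helpers, not Spark APIs.

```python
def shuffle_by_key(rows, key, num_partitions):
    """Phase 1 (shuffle): hash-partition rows so equal keys land in the same partition."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def merge_join(left, right, key):
    """Phase 3 (merge): inner-join two lists already sorted by key in one forward pass."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1          # left key is smaller: it cannot match anything further right
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, pair it with every equal key on the left.
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                for r in right[j:j_end]:
                    out.append({**left[i], **r})
                i += 1
            j = j_end
    return out


def sort_merge_join(left_rows, right_rows, key, num_partitions=3):
    left_parts = shuffle_by_key(left_rows, key, num_partitions)
    right_parts = shuffle_by_key(right_rows, key, num_partitions)
    result = []
    for lp, rp in zip(left_parts, right_parts):
        lp = sorted(lp, key=lambda r: r[key])   # Phase 2 (sort) within each partition
        rp = sorted(rp, key=lambda r: r[key])
        result.extend(merge_join(lp, rp, key))
    return result


txns = [{"cust_id": "C2", "amt": 7.5}, {"cust_id": "C1", "amt": 10.0},
        {"cust_id": "C3", "amt": 5.0}, {"cust_id": "C1", "amt": 1.0}]
customers = [{"cust_id": "C1", "name": "Ada"}, {"cust_id": "C2", "name": "Bob"}]
joined = sort_merge_join(txns, customers, "cust_id")
print(sorted((r["cust_id"], r["amt"], r["name"]) for r in joined))
# [('C1', 1.0, 'Ada'), ('C1', 10.0, 'Ada'), ('C2', 7.5, 'Bob')]
```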


In [10]:
sort_merge_join.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [cust_id])
:- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
+- Relation [cust_id#68,name#69,age#70,gender#71,birthday#72,zip#73,city#74] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, name#69, age#70, gender#71, birthday#72, zip#73, city#74]
+- Join Inner, (cust_id#0 = cust_id#68)
   :- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
   +- Relation [cust_id#68,name#69,age#70,gender#71,birthday#72,zip#73,city#74] parquet

== Optimi

## Group By

In [6]:
transaction_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|    chicago|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [7]:
city_count = transaction_df.groupBy("city").count()

In [8]:
city_count.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count(1) AS count#94L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#10], [city#10, count(1) AS count#94L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, count(1) AS count#94L]
+- Project [city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(1)], output=[city#10, count#94L])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [plan_id=27]
      +- HashAggregate(keys=[city#10], functions=[partial_count(1)], output=[city#10, count#98L])
         +- FileSc
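Read bottom-up, the physical plan above aggregates in two phases: a HashAggregate computing partial_count(1) inside each input partition, an Exchange hashpartitioning(city#10, 200) that shuffles those partial results by city, and a final HashAggregate that adds them up. The pure-Python sketch below mirrors that shape; the helper names and sample rows are hypothetical, and Python's `hash()` stands in for Spark's partitioning hash.

```python
from collections import Counter


def partial_count(partition):
    # Map side: count rows per city inside one input partition (like partial_count(1)).
    return Counter(row["city"] for row in partition)


def exchange(partials, num_partitions):
    # Shuffle: send each (city, partial count) pair to the partition chosen by hashing the city.
    shuffled = [[] for _ in range(num_partitions)]
    for counts in partials:
        for city, n in counts.items():
            shuffled[hash(city) % num_partitions].append((city, n))
    return shuffled


def final_count(partition):
    # Reduce side: add up the partial counts for each city (like count(1)).
    totals = Counter()
    for city, n in partition:
        totals[city] += n
    return totals


input_partitions = [
    [{"city": "boston"}, {"city": "chicago"}, {"city": "boston"}],
    [{"city": "chicago"}, {"city": "denver"}],
]
partials = [partial_count(p) for p in input_partitions]
result = {}
for part in exchange(partials, num_partitions=4):
    result.update(final_count(part))  # each city lives in exactly one shuffled partition
print(sorted(result.items()))  # [('boston', 2), ('chicago', 2), ('denver', 1)]
```

Only one record per city per input partition crosses the shuffle, which is why the partial aggregation step reduces the amount of data exchanged.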

## Aggregate

In [9]:
transaction_df.show(5)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|   cust_id|start_date|  end_date|         txn_id|      date|year|month|day| expense_type|   amt|       city|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|   10|  7|Entertainment| 10.42|     boston|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|    3| 27| Motor/Travel| 44.34|   portland|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|    4| 11|Entertainment|  3.18|    chicago|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|    2| 22|    Groceries|268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|   10| 16|Entertainment|  2.66|    chicago|
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [10]:
txn_amt_agg = transaction_df.groupby('city').agg(F.sum('amt').alias('txn_amt'))

In [11]:
txn_amt_agg.show()



+-------------+--------------------+
|         city|             txn_amt|
+-------------+--------------------+
|    san_diego| 3.297982686000007E8|
|      chicago|3.2988120044000095E8|
|       denver| 3.298814956400023E8|
|       boston| 3.301009563300014E8|
|      seattle|3.3019513776999813E8|
|  los_angeles| 3.303879507600016E8|
|     new_york| 3.298224222899999E8|
|san_francisco| 3.302435954000013E8|
| philadelphia| 3.303539707999984E8|
|     portland| 3.309791199400005E8|
+-------------+--------------------+



                                                                                

In [12]:
txn_amt_agg.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, sum('amt) AS txn_amt#156]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, txn_amt: double
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#156]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#156]
+- Project [amt#9, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[sum(cast(amt#9 as double))], output=[city#10, txn_amt#156])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [plan_id=106]
      +- HashAggregate(keys=[city#10], func

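- functions=[sum(cast(amt#9 as double))]: amt is stored as a string (see the schema in the analyzed plan), so Spark casts it to double before summing.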
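The cast to double also explains the stray digits in the sums above: 3.297982686000007E8 is 329798268.6000007, although the amounts shown have at most two decimal places. Floating-point addition accumulates rounding error. The small pure-Python illustration below uses made-up values; `decimal.Decimal` plays the role that a Spark DecimalType cast would, which is an assumption of this sketch rather than something the notebook does.

```python
from decimal import Decimal

# Amounts stored as strings, like the amt column (values here are made up).
amounts = ["0.10"] * 10

float_total = sum(float(a) for a in amounts)      # what a cast to double does
decimal_total = sum(Decimal(a) for a in amounts)  # exact base-10 arithmetic

print(float_total)    # 0.9999999999999999
print(decimal_total)  # 1.00
```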

## Count Distinct

In [15]:
city_count_df = transaction_df.groupby('cust_id').agg(F.countDistinct('city').alias('city_count'))

In [16]:
city_count_df.show()



+----------+----------+
|   cust_id|city_count|
+----------+----------+
|CPP8BY8U93|        10|
|CYB8BX9LU1|        10|
|CFRT841CCD|        10|
|CA0TSNMYDK|        10|
|COZ8NONEVZ|        10|
|C46OCVH3WG|        10|
|C1QF29WCA6|        10|
|CTJBQB0OJ1|        10|
|CD0DXL8XTM|        10|
|CADBQ5OL5C|        10|
|CUCQ9LBQWW|        10|
|C3NH8CDGWM|        10|
|CEEPXNQ9NQ|        10|
|C7ALJDG81A|        10|
|CUDKFKPAFB|        10|
|C2L2984OZK|        10|
|CDDRDAEY13|        10|
|CIZT509YVA|        10|
|CSTJ6YYXE3|        10|
|CW1X1V0PRG|        10|
+----------+----------+
only showing top 20 rows



                                                                                

In [17]:
city_count_df.explain(True)

== Parsed Logical Plan ==
'Aggregate ['cust_id], ['cust_id, 'count(distinct 'city) AS city_count#211]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, city_count: bigint
Aggregate [cust_id#0], [cust_id#0, count(distinct city#10) AS city_count#211L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [cust_id#0], [cust_id#0, count(distinct city#10) AS city_count#211L]
+- Project [cust_id#0, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[cust_id#0], functions=[count(distinct city#10)], output=[cust_id#0, city_count#211L])
   +- Exchange hashpartitioning(cust_id#0, 200), ENSURE_REQUIREMENTS, [plan_id
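The visible part of the plan shows an Exchange hashpartitioning(cust_id#0, 200), so all rows for a customer meet in one partition, where their distinct cities can be counted. The pure-Python sketch below models only that step: `count_distinct_by_key` and the sample rows are hypothetical, and Spark's full plan for distinct aggregates (truncated above) may include additional aggregation steps that this sketch does not reproduce.

```python
from collections import defaultdict


def count_distinct_by_key(rows, key, value, num_partitions=4):
    # Shuffle: hash-partition by the grouping key so all rows of one customer meet in one partition.
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append((row[key], row[value]))
    # Within each partition, collect the set of distinct values per key and count it.
    result = {}
    for part in partitions:
        seen = defaultdict(set)
        for k, v in part:
            seen[k].add(v)
        result.update({k: len(vs) for k, vs in seen.items()})
    return result


rows = [
    {"cust_id": "C1", "city": "boston"},
    {"cust_id": "C1", "city": "boston"},
    {"cust_id": "C1", "city": "denver"},
    {"cust_id": "C2", "city": "chicago"},
]
print(sorted(count_distinct_by_key(rows, "cust_id", "city").items()))  # [('C1', 2), ('C2', 1)]
```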

### Why is a filter step present despite predicate pushdown?
- This comes down to how Spark's Catalyst optimizer works, specifically to two separate stages of query planning: logical planning followed by physical planning.

Logical Planning: after the analyzer resolves the user's query into an analyzed logical plan, the Catalyst optimizer rewrites it using rule-based optimizations. These include predicate pushdown and column pruning, which move filter conditions and column projections as close to the data source as possible.

Physical Planning: the optimized logical plan is translated into one or more physical plans that can actually be executed on the cluster, built from operators such as file scans, filters and projections. For file sources like the Parquet files read here, this is also where the pushed-down conditions are handed to the reader as scan filters.

In this case, the predicate (F.col("city") == "boston") is pushed down to the Parquet scan (PushedFilters: [IsNotNull(city), EqualTo(city,boston)]), so the reader can skip data that cannot match, which improves performance.

However, the physical plan applies the same condition again after the scan (+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))), to the rows actually loaded into memory. There are two reasons for this:

Guaranteed Correctness: It might seem redundant, but not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Even when a predicate is pushed down, the source may apply it only partially. Parquet, for example, mainly uses pushed filters to skip entire row groups whose min/max statistics rule out a match, so a row group that is read can still contain non-matching rows. Keeping the filter in the physical plan guarantees correct results regardless of the data source's capabilities.

No Assumptions: For file sources such as Parquet, Spark treats pushed filters as a best-effort optimization and does not assume that every returned row satisfies them, so it re-evaluates the filter after the scan.

It is more of a fail-safe mechanism to ensure data integrity and correctness.
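A simplified pure-Python model of why the post-scan Filter is still needed: the pushed filter can only skip whole row groups using min/max statistics, so a group that passes may still contain other cities. The row groups, statistics and `scan_with_pushdown` helper are hypothetical and only mimic the idea; they are not Parquet's actual reader logic.

```python
# Each "row group" carries min/max statistics for the city column (values are made up).
row_groups = [
    {"min": "boston", "max": "chicago", "rows": ["boston", "chicago", "boston"]},
    {"min": "denver", "max": "seattle", "rows": ["denver", "portland", "seattle"]},
    {"min": "chicago", "max": "los_angeles", "rows": ["los_angeles", "chicago"]},
]


def scan_with_pushdown(groups, value):
    """Pushed filter: skip whole groups whose [min, max] range cannot contain the value."""
    for g in groups:
        if g["min"] <= value <= g["max"]:
            yield from g["rows"]


scanned = list(scan_with_pushdown(row_groups, "boston"))
print(scanned)   # ['boston', 'chicago', 'boston']: the first group passed but contains chicago

# The residual Filter in the physical plan removes what the statistics could not rule out.
filtered = [city for city in scanned if city == "boston"]
print(filtered)  # ['boston', 'boston']
```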