# Imports & Configuration

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [3]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [41]:
spark = SparkSession.builder.master("local[3]").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Reading File

In [5]:
transactions_file = "../data/data_skew/transactions.parquet"
df_transactions = spark.read.parquet(transactions_file)

                                                                                

In [6]:
df_transactions.show(5, False)


+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|cust_id   |start_date|end_date  |txn_id         |date      |year|month|day|expense_type |amt   |city       |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|10   |7  |Entertainment|10.42 |boston     |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|3    |27 |Motor/Travel |44.34 |portland   |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|4    |11 |Entertainment|3.18  |chicago    |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|2    |22 |Groceries    |268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|10   |16 |Entertainment|2.66  |chicago    |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showing top 5 rows

In [7]:
customers_file = "../data/data_skew/customers.parquet"
df_customers = spark.read.parquet(customers_file)

In [8]:
df_customers.show(5, False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows



In [9]:
df_customers.explain(True)

== Parsed Logical Plan ==
Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Optimized Logical Plan ==
Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/afaqueahmad/Documents/youtube/spark-experiments/data/data_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cust_id:string,name:string,age:string,gender:string,birthday:string,zip:string,city:string>



# Simple Operations
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an existing column: adding 5 to the `age` column
- `select` relevant columns

### Why is a filter step present despite predicate pushdown? 

This is largely due to the way `Spark's Catalyst Optimizer` works. Specifically, it comes from two separate stages of query optimization: logical planning and physical planning.

- The **logical planning** phase is where Spark's Catalyst optimizer simplifies the logical plan (which represents the user's query) by applying various rule-based optimizations and transformations. This includes predicate pushdown, where filter conditions are moved as close to the data source as possible.

- The **physical planning** phase is where the logical plan is translated into one or more physical plans, which can actually be executed on the cluster. This includes operations like file `scans`, `filters`, `projections`, etc.

In this case, the optimizer has pushed the predicate (`F.col("city") == "boston"`) down to the scan of the Parquet file (`PushedFilters: [IsNotNull(city), EqualTo(city,boston)]`). This means the Parquet reader can use the statistics stored in the file (such as per-row-group min/max values) to skip blocks that cannot contain matching rows, which reduces the amount of data read.

The physical plan, however, also keeps the same condition as a `Filter` operator (`+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))`), which is evaluated again on the rows the scan returns. This ensures that only matching rows are processed in the subsequent stages of the query.

It might seem **redundant**, but remember that not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Therefore, **even if a predicate is pushed down to the data source, Spark still includes the predicate in the physical plan** to cover cases where the data source might not have been able to fully apply the predicate. This is Spark's way of making sure the correct data is always returned, no matter the capabilities of the data source.

Moreover, predicate pushdown to the data source doesn't guarantee that all non-matching rows will be filtered out during the data source read. It just minimizes the amount of data that needs to be read. So, the Filter operation in Spark's physical plan ensures the correct application of the filter condition.
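To make this concrete, here is a toy model in plain Python (not Spark's or Parquet's actual code). It assumes each hypothetical `RowGroup` carries min/max statistics for `city`, as Parquet row groups do. Statistics can only rule out whole row groups, so the rows that survive the scan still have to pass through the `Filter` that Spark keeps in the physical plan.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    """Hypothetical row group holding only a `city` column."""
    cities: list

    @property
    def min_city(self) -> str:
        return min(self.cities)

    @property
    def max_city(self) -> str:
        return max(self.cities)


def might_contain(group: RowGroup, value: str) -> bool:
    """Statistics-based check: could this row group contain `value` at all?"""
    return group.min_city <= value <= group.max_city


def scan_with_pushdown(groups, value):
    """Skip whole row groups using min/max stats; return every row of the rest."""
    rows = []
    for group in groups:
        if might_contain(group, value):
            rows.extend(group.cities)  # no row-level filtering happens here
    return rows


groups = [
    RowGroup(["boston", "chicago", "denver"]),          # range includes "boston"
    RowGroup(["los_angeles", "new_york", "portland"]),  # skipped: "boston" < min
    RowGroup(["austin", "boston", "seattle"]),          # range includes "boston"
]

scanned = scan_with_pushdown(groups, "boston")
# The post-scan Filter (the one Spark keeps in the physical plan) removes the rest.
result = [city for city in scanned if city == "boston"]

print(len(scanned), result)  # 6 ['boston', 'boston']
```

Only 6 of the 9 rows are read, but 4 of those 6 still fail the predicate, which is why the `Filter` step is not actually redundant.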

---

### Is the redundant filter still going to happen if predicate pushdown was successful? 

If the data source is able to fully apply the filter predicates thanks to predicate pushdown, applying the filter again within Spark might seem **redundant**. However, there are some important points to consider:

1. **Guaranteed Correctness**: Not all data sources can fully handle all types of filters during predicate pushdown. By including the filter in the physical plan, Spark ensures that the correct data is returned even if the data source doesn't fully apply the filter. 

2. **Overhead is Minimal**: When predicate pushdown is successful, the filter operation in the physical plan typically operates on a significantly smaller dataset (since most of the unnecessary data has already been filtered out). So the overhead of this "redundant" filtering is usually quite small.

3. **No Assumptions**: Spark's Catalyst optimizer doesn't make assumptions about the data source's ability to handle pushed-down predicates. The optimizer aims to generate plans that return correct results across a wide range of scenarios. For file sources such as Parquet, Spark gets no feedback from the data source about whether a pushed-down filter was fully applied, so it keeps the filter operation in the physical plan as well.

To summarize, even though it might seem redundant to include the filter operation in the physical plan when predicates are pushed down, this design choice helps ensure the **correctness** of the query results across all scenarios and data sources. The overhead of this "redundant" operation is usually small, especially when predicate pushdown is successful. It is more of a **fail-safe mechanism** to ensure data integrity and correctness.

---

In [10]:
df_narrow_transform = (
    df_customers
    .filter(F.col("city") == "boston")
    .withColumn("first_name", F.split("name", " ").getItem(0))
    .withColumn("last_name", F.split("name", " ").getItem(1))
    .withColumn("age", F.col("age") + F.lit(5))
    .select("cust_id", "first_name", "last_name", "age", "gender", "birthday")
)

df_narrow_transform.show(5, False)
df_narrow_transform.explain(True)

+----------+----------+---------+----+------+---------+
|cust_id   |first_name|last_name|age |gender|birthday |
+----------+----------+---------+----+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39.0|Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59.0|Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29.0|Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27.0|Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57.0|Female|7/9/1962 |
+----------+----------+---------+----+------+---------+
only showing top 5 rows

== Parsed Logical Plan ==
'Project ['cust_id, 'first_name, 'last_name, 'age, 'gender, 'birthday]
+- Project [cust_id#67, name#68, (cast(age#69 as double) + cast(5 as double)) AS age#129, gender#70, birthday#71, zip#72, city#73, first_name#110, last_name#119]
   +- Project [cust_id#67, name#68, age#69, gender#70, birthday#71, zip#72, city#73, first_name#110, split(name#68,  , -1)[1] AS last_name#119]
      +- Project [cust_id#67, name#68, age#69, gender#70, birthday#7

In [46]:
df_customers.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- city: string (nullable = true)



### In what cases will predicate pushdown not work?

Cases where filter pushdown may not work:

1. Using UDFs: Spark treats UDFs as black boxes, so it doesn't push down filters whose condition calls a UDF.
2. Complex Data Types: Spark's Parquet data source generally does not push down filters on elements of complex types such as arrays and maps, because these nested structures are not something the Parquet reader can easily filter on. Filters on nested struct fields are a partial exception: since Spark 3.1 they can be pushed down to Parquet and ORC (controlled by `spark.sql.optimizer.nestedPredicatePushdownFileSources`).

Here's an example:

Let's say you have a DataFrame with a column called `metadata`, and this column is a map type with keys and values that are strings. You might want to filter this DataFrame where the `metadata` map has a specific key-value pair:

```python
df.filter(df.metadata.getItem("key") == "value").show()
```

The `filter` operation in this case involves a complex data type (`MapType`). When you call `explain()` on this DataFrame, you would see the predicate in the `Filter` operator and under `DataFilters`, but not under `PushedFilters`, which means it has not been pushed down to the data source:

```python
df.filter(df.metadata.getItem("key") == "value").explain()
```

You would see something like this in the output:

```
== Physical Plan ==
*(1) Filter (metadata#123[key] = value)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#122,metadata#123] Batched: true, DataFilters: [(metadata#123[key] = value)], Format: Parquet, ...
```

In such cases, the filtering operation will be performed within Spark after the data is read into memory, which may increase memory usage and potentially impact performance.

------------------------------------------------

3. Unsupported Expressions: 

In Spark, the `Parquet` data source generally cannot push down filters that wrap the column in a `.cast`, such as the string-to-int cast below. The reason is that casting changes the column's data type, so the predicate no longer applies to the values as they are stored in the file, and the Parquet data source may not be able to evaluate it correctly.

You would see the filter on the cast column in the physical plan, but it wouldn't appear in the `PushedFilters` part of the `FileScan` operation. This indicates that the filter hasn't been pushed down to the Parquet data source.

So while Spark can still perform the filter operation correctly after reading the data into memory, the lack of filter pushdown means that it might need to read more data from the Parquet file than it would otherwise. This can impact performance.
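One concrete way a pushed-down cast could go wrong: `age` is stored as a string, so any min/max statistics for it are ordered as strings, not numbers. The toy sketch below (plain Python, hypothetical values, not Spark's actual logic) shows that reusing those string statistics for `cast(age as int) > 50` would wrongly skip a row group that contains a match.

```python
# Toy model: "age" is stored as a string, so the statistics kept for it
# are ordered lexicographically (character by character), not numerically.
ages = ["100", "45", "9", "38"]  # hypothetical values in one row group

min_str, max_str = min(ages), max(ages)
print(min_str, max_str)  # 100 9

# Naively reusing the string max for the predicate cast(age as int) > 50:
# int("9") = 9 <= 50, so the row group would look safe to skip...
naive_can_skip = int(max_str) <= 50

# ...but evaluating the cast row by row shows a matching row would be lost.
actual_matches = [a for a in ages if int(a) > 50]
print(naive_can_skip, actual_matches)  # True ['100']
```

Evaluating the cast after the data is read, as Spark does, avoids this problem at the cost of reading more data.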

Note: This behavior may vary based on the data source. For example, if you're working with a JDBC data source connected to a database that supports SQL-like operations, a filter such as `F.col("name").like("John%")` could potentially be pushed down to the database. This underlines the fact that what counts as an "unsupported expression" depends on the specifics of the data source.
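The three cases above share one pattern: a predicate is pushable only if it can be translated into something the source understands, typically a bare column compared with a literal. The sketch below is a toy model in plain Python; the classes `Col`, `Cast`, `Udf`, `MapGet`, `Compare` and the function `translate` are hypothetical, loosely inspired by how Spark converts Catalyst predicates into data-source filters. It is not Spark's code, which recognizes many more shapes.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Cast:
    child: "Expr"
    to_type: str


@dataclass(frozen=True)
class Udf:
    func_name: str
    child: "Expr"


@dataclass(frozen=True)
class MapGet:
    child: "Expr"
    key: str


Expr = Union[Col, Cast, Udf, MapGet]


@dataclass(frozen=True)
class Compare:
    op: str       # "=", ">", ...
    left: Expr
    value: object  # a literal


def translate(pred: Compare) -> Optional[tuple]:
    """Return a source-level filter, or None if it must stay in Spark's Filter."""
    if isinstance(pred.left, Col):  # only bare column-vs-literal comparisons
        return (pred.op, pred.left.name, pred.value)
    return None  # Cast / Udf / MapGet: the source cannot evaluate these


predicates = {
    "city = 'boston'": Compare("=", Col("city"), "boston"),
    "cast(age as int) > 50": Compare(">", Cast(Col("age"), "int"), 50),
    "my_udf(name) = 'x'": Compare("=", Udf("my_udf", Col("name")), "x"),
    "metadata['key'] = 'value'": Compare("=", MapGet(Col("metadata"), "key"), "value"),
}

pushed = {text: translate(p) for text, p in predicates.items()}
for text, source_filter in pushed.items():
    print(f"{text:28} -> {source_filter}")
```

Only the `city` predicate becomes a source filter; the other three would be evaluated by Spark after the scan, matching what the `PushedFilters` field shows for such predicates.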

In [48]:
df_customer_gt_50 = (
    df_customers
    .filter(F.col("age").cast("int") > 50)
)
df_customer_gt_50.show(5, False)
df_customer_gt_50.explain(True)

+----------+--------------+---+------+----------+-----+------------+
|cust_id   |name          |age|gender|birthday  |zip  |city        |
+----------+--------------+---+------+----------+-----+------------+
|C01BKUFRHA|Aaron Becker  |54 |Male  |11/24/1979|40284|san_diego   |
|C01WMZQ7PN|Aaron Brady   |51 |Female|8/20/1994 |52204|philadelphia|
|C021567NJZ|Aaron Briggs  |57 |Male  |3/10/1990 |22008|philadelphia|
|C02JNTM46B|Aaron Chambers|51 |Male  |1/6/2001  |63337|new_york    |
|C030A69V1L|Aaron Clarke  |55 |Male  |4/28/1999 |77176|philadelphia|
+----------+--------------+---+------+----------+-----+------------+
only showing top 5 rows

== Parsed Logical Plan ==
'Filter (cast('age as int) > 50)
+- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Analyzed Logical Plan ==
cust_id: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Filter (cast(age#69 as int) > 50)
+- Relation [cust_id#67,name#68,age#69,gend

# Repartition

In [37]:
df_transactions.rdd.getNumPartitions()

12

In [39]:
df_transactions.repartition(24).explain(True)

== Parsed Logical Plan ==
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Repartition 24, true
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(24), REPARTITION_BY_NUM, [id=#412]
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Location: 

## Why doesn't `.coalesce()` explicitly show the partitioning scheme?

- The Spark query plan does not show a partitioning scheme such as `RoundRobinPartitioning` when you use `coalesce(3)` because that scheme belongs to an `Exchange` (shuffle) operator, and `coalesce` does not introduce one: the physical plan below has a `Coalesce 3` node directly above the scan.

- Instead of reshuffling all data, `coalesce` merges existing partitions into fewer ones, minimizing data movement. No row is redistributed by a partitioning function, so there is no partitioning scheme to report in the plan.
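A toy model in plain Python of the difference, assuming 12 input partitions as in `df_transactions` with 4 hypothetical rows each. It is a simplification, not Spark's implementation: real round-robin repartitioning starts each input partition at a pseudo-random offset, and real `coalesce` groups partitions with a locality-aware coalescer rather than the contiguous blocks used here.

```python
from itertools import cycle


def repartition_round_robin(partitions, n):
    """Full shuffle: deal every row to the n output partitions in turn."""
    out = [[] for _ in range(n)]
    targets = cycle(range(n))
    for part in partitions:
        for row in part:
            out[next(targets)].append(row)
    return out


def coalesce(partitions, n):
    """No shuffle: concatenate whole input partitions into n groups."""
    out = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        out[i * n // len(partitions)].extend(part)  # contiguous blocks of inputs
    return out


def outputs_holding(out, input_id):
    """Which output partitions received rows from input partition `input_id`?"""
    return sorted({i for i, part in enumerate(out) for (p, _) in part if p == input_id})


partitions = [[(p, r) for r in range(4)] for p in range(12)]  # (partition, row) ids

rr = repartition_round_robin(partitions, 24)
co = coalesce(partitions, 3)

print([len(p) for p in rr])  # 24 partitions of 2 rows each
print([len(p) for p in co])  # 3 partitions of 16 rows each
print(outputs_holding(rr, 0), outputs_holding(co, 0))  # [0, 1, 2, 3] [0]
```

With round-robin, the rows of a single input partition end up scattered across several outputs (a shuffle). With `coalesce`, each input partition stays intact inside exactly one output partition.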


In [40]:
df_transactions.coalesce(3).explain(True)

== Parsed Logical Plan ==
Repartition 3, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string
Repartition 3, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Repartition 3, false
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
Coalesce 3
+- *(1) ColumnarToRow
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/afaqueahmad/Documents/youtube/spar

# Joins

In [14]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [15]:
df_joined = (
    df_transactions.join(
        df_customers,
        how="inner",
        on="cust_id"
    )
)

In [28]:
df_joined.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(cust_id))
:- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
+- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== Analyzed Logical Plan ==
cust_id: string, start_date: string, end_date: string, txn_id: string, date: string, year: string, month: string, day: string, expense_type: string, amt: string, city: string, name: string, age: string, gender: string, birthday: string, zip: string, city: string
Project [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, name#68, age#69, gender#70, birthday#71, zip#72, city#73]
+- Join Inner, (cust_id#0 = cust_id#67)
   :- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet
   +- Relation [cust_id#67,name#68,age#69,gender#70,birthday#71,zip#72,city#73] parquet

== O
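Because `spark.sql.autoBroadcastJoinThreshold` is set to `-1`, neither side is broadcast automatically, so Spark has to shuffle both DataFrames by `cust_id` before joining them (typically as a sort-merge join). The plan output is cut off above, so here is a toy model in plain Python of the shuffle step only. Assumptions: CRC32 stands in for Spark's Murmur3 hash, `NUM_PARTITIONS = 4` stands in for `spark.sql.shuffle.partitions`, the rows are hypothetical, and the per-partition join is a simple hash lookup rather than Spark's sort-merge.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # stand-in for spark.sql.shuffle.partitions (200 by default)


def partition_of(key: str) -> int:
    """Deterministic key -> partition mapping (CRC32 here; Spark uses Murmur3)."""
    return zlib.crc32(key.encode()) % NUM_PARTITIONS


def shuffle(rows, key_index=0):
    """Route every row to the partition chosen by its join key."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_of(row[key_index])].append(row)
    return parts


def shuffle_join(left, right):
    """Inner join on the first column, done independently per partition."""
    left_parts, right_parts = shuffle(left), shuffle(right)
    joined = []
    for pid in range(NUM_PARTITIONS):
        # Equal keys hash to the same pid on both sides, so each partition
        # can be joined without looking at any other partition.
        lookup = defaultdict(list)
        for rrow in right_parts.get(pid, []):
            lookup[rrow[0]].append(rrow)
        for lrow in left_parts.get(pid, []):
            for rrow in lookup.get(lrow[0], []):
                joined.append(lrow + rrow[1:])
    return joined


# Hypothetical (cust_id, txn_id, amt) and (cust_id, name) rows.
transactions = [("C1", "T1", 10.42), ("C2", "T2", 44.34), ("C1", "T3", 3.18), ("C3", "T4", 2.66)]
customers = [("C1", "Aaron Abbott"), ("C2", "Aaron Austin")]

for row in sorted(shuffle_join(transactions, customers)):
    print(row)
```

The inner join drops `C3` because it has no matching customer. The key property is that co-partitioning by `cust_id` lets every partition be joined in isolation.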

# GroupBy

In [18]:
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- city: string (nullable = true)



### GroupBy Count

In [29]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [30]:
df_city_counts.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, count(1) AS count#998L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#10], [city#10, count(1) AS count#998L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, count(1) AS count#998L]
+- Project [city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(1)], output=[city#10, count#998L])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#330]
      +- HashAggregate(keys=[city#10], functions=[partial_count(1)], output=[city#10, count#1002L])
         +- Fil
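The physical plan aggregates twice: a `partial_count` inside each input partition before the `Exchange`, and a final `count` after it. Below is a toy model of that two-phase pattern in plain Python, with hypothetical data. CRC32 stands in for Spark's hash function and 4 buckets stand in for the 200 partitions shown in the plan. The payoff is that only one `(city, partial count)` record per city per input partition crosses the shuffle, instead of every row.

```python
import zlib
from collections import Counter, defaultdict

SHUFFLE_PARTITIONS = 4  # the plan above uses 200 (spark.sql.shuffle.partitions)


def partial_count(partition):
    """Before the Exchange: HashAggregate(partial_count) inside one input partition."""
    return Counter(partition)


def exchange(partials):
    """hashpartitioning(city): send each (city, partial count) to one bucket by key."""
    buckets = defaultdict(list)
    for partial in partials:
        for city, cnt in partial.items():
            buckets[zlib.crc32(city.encode()) % SHUFFLE_PARTITIONS].append((city, cnt))
    return buckets


def final_count(bucket):
    """After the Exchange: HashAggregate(count) adds up the partial counts."""
    totals = Counter()
    for city, cnt in bucket:
        totals[city] += cnt
    return totals


# Hypothetical city values spread across three input partitions.
input_partitions = [
    ["boston", "chicago", "boston"],
    ["boston", "denver"],
    ["chicago", "chicago", "denver"],
]

partials = [partial_count(p) for p in input_partitions]
shuffled_records = sum(len(p) for p in partials)  # records crossing the Exchange
total_rows = sum(len(p) for p in input_partitions)

result = Counter()
for bucket in exchange(partials).values():
    result.update(final_count(bucket))  # each city lives in exactly one bucket

print(shuffled_records, total_rows)  # 6 8
print(dict(sorted(result.items())))  # {'boston': 3, 'chicago': 3, 'denver': 2}
```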

### GroupBy Count Distinct 

In [33]:
df_txn_per_city = (
    df_transactions
    .groupBy("city")
    .agg(F.countDistinct("txn_id").alias("txn_count"))
)

In [34]:
df_txn_per_city.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, 'count(distinct 'txn_id) AS txn_count#1033]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, txn_count: bigint
Aggregate [city#10], [city#10, count(distinct txn_id#3) AS txn_count#1033L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, count(distinct txn_id#3) AS txn_count#1033L]
+- Project [txn_id#3, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(distinct txn_id#3)], output=[city#10, txn_count#1033L])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#376]
      +- HashAggre
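A distinct count cannot be split up the same way: per-partition distinct counts cannot simply be added, because the same `txn_id` may appear in more than one input partition. Spark therefore typically adds aggregation steps keyed on both `city` and `txn_id` to de-duplicate before counting (those steps are not visible in the truncated plan above). A toy illustration in plain Python with hypothetical data:

```python
from collections import defaultdict

# Each input partition holds (city, txn_id) pairs; "T1" appears in both partitions.
input_partitions = [
    [("boston", "T1"), ("boston", "T2")],
    [("boston", "T1"), ("chicago", "T3")],
]

# Wrong: count distinct values per partition, then add the partial results.
naive = defaultdict(int)
for part in input_partitions:
    per_part = defaultdict(set)
    for city, txn in part:
        per_part[city].add(txn)
    for city, txns in per_part.items():
        naive[city] += len(txns)

# Right: de-duplicate (city, txn_id) pairs first, then count per city.
distinct_pairs = {pair for part in input_partitions for pair in part}
correct = defaultdict(int)
for city, _ in distinct_pairs:
    correct[city] += 1

print(dict(naive))    # {'boston': 3, 'chicago': 1}
print(dict(sorted(correct.items())))  # {'boston': 2, 'chicago': 1}
```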

In [35]:
df_txn_amt_city = (
    df_transactions
    .groupBy("city")
    .agg(F.sum("amt").alias("txn_amt"))
)

In [36]:
df_txn_amt_city.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, sum('amt) AS txn_amt#1053]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, txn_amt: double
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#1053]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, sum(cast(amt#9 as double)) AS txn_amt#1053]
+- Project [amt#9, city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[sum(cast(amt#9 as double))], output=[city#10, txn_amt#1053])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [id=#389]
      +- HashAggregate(keys=[city#10], func
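Because `amt` is a `string` column, the analyzed plan wraps it in `cast(amt#9 as double)` before summing. The toy model below (plain Python, hypothetical values) assumes non-ANSI semantics (`spark.sql.ansi.enabled=false`, the Spark 3.x default), where a string that does not parse becomes `NULL` and `SUM` skips it. With ANSI mode enabled the behavior differs, and an invalid string typically raises an error instead. Python's `float()` parsing only approximates Spark's cast rules.

```python
import math


def cast_to_double(value):
    """Toy non-ANSI string -> double cast: None or unparseable input becomes None (NULL)."""
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def sql_sum(values):
    """Like SQL SUM: skips NULLs and returns NULL when there is no non-NULL input."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


amts = ["10.42", "44.34", "n/a", None, "3.18"]  # hypothetical raw string amounts
txn_amt = sql_sum(cast_to_double(a) for a in amts)
print(round(txn_amt, 2))  # 57.94
```

The invalid `"n/a"` value silently disappears from the total, and the cast runs on every row. Storing `amt` as a numeric type would avoid both.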