In [36]:
import warnings
warnings.filterwarnings("ignore")

In [37]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

In [3]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [4]:
#now setting up the spark session 
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext # to acess  RDD
sc.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/19 09:45:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
transaction_file = "/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data/data_skew/transactions.parquet"
df_transactions = spark.read.parquet(transaction_file)

In [6]:
df_transactions.show(5,False)

+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|cust_id   |start_date|end_date  |txn_id         |date      |year|month|day|expense_type |amt   |city       |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
|C0YDPQWPBJ|2010-07-01|2018-12-01|TZ5SMKZY9S03OQJ|2018-10-07|2018|10   |7  |Entertainment|10.42 |boston     |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYIAPPNU066CJ5R|2016-03-27|2016|3    |27 |Motor/Travel |44.34 |portland   |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TETSXIK4BLXHJ6W|2011-04-11|2011|4    |11 |Entertainment|3.18  |chicago    |
|C0YDPQWPBJ|2010-07-01|2018-12-01|TQKL1QFJY3EM8LO|2018-02-22|2018|2    |22 |Groceries    |268.97|los_angeles|
|C0YDPQWPBJ|2010-07-01|2018-12-01|TYL6DFP09PPXMVB|2010-10-16|2010|10   |16 |Entertainment|2.66  |chicago    |
+----------+----------+----------+---------------+----------+----+-----+---+-------------+------+-----------+
only showi

In [7]:
customer_data = "/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data/data_skew/customers.parquet"
df_customer = spark.read.parquet(customer_data)

In [8]:
df_customer.show(5,False)

+----------+-------------+---+------+----------+-----+-----------+
|cust_id   |name         |age|gender|birthday  |zip  |city       |
+----------+-------------+---+------+----------+-----+-----------+
|C007YEYTX9|Aaron Abbott |34 |Female|7/13/1991 |97823|boston     |
|C00B971T1J|Aaron Austin |37 |Female|12/16/2004|30332|chicago    |
|C00WRSJF1Q|Aaron Barnes |29 |Female|3/11/1977 |23451|denver     |
|C01AZWQMF3|Aaron Barrett|31 |Male  |7/9/1998  |46613|los_angeles|
|C01BKUFRHA|Aaron Becker |54 |Male  |11/24/1979|40284|san_diego  |
+----------+-------------+---+------+----------+-----+-----------+
only showing top 5 rows


### Spark Query Plan?
![spark-execution](/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data/spark-execution.png)

* lets see trough this step by step first the parsed ast goes trough the syntactical check 
* after that it goes trough the metedata chechk with catalog store
* now our logical plan will be split in to multiple physical plan which will be judged based on the costmodel to get a final selected pysical plan to genrate the RDD

## Query Planning for Narrow Transformation
- `filter` rows where `city='boston'`
- `add` a new column: adding `first_name` and `last_name`
- `alter` an exisitng column: adding 5 to `age` column
- `select` relevant columns

In [9]:
df_narrow_transformation = (
    df_customer.filter(F.col("city")=="boston")
    .withColumn("First_name",F.split("name"," ")[0])
    .withColumn("Last_name",F.split("name"," ")[1])
    .withColumn("age",F.col("age")+F.lit(5))
    .select("cust_id","first_name","last_name","age","gender","birthday")
)
df_narrow_transformation.show(5,False)
df_narrow_transformation.explain()

+----------+----------+---------+---+------+---------+
|cust_id   |first_name|last_name|age|gender|birthday |
+----------+----------+---------+---+------+---------+
|C007YEYTX9|Aaron     |Abbott   |39 |Female|7/13/1991|
|C08XAQUY73|Aaron     |Lambert  |59 |Female|11/5/1966|
|C094P1VXF9|Aaron     |Lindsey  |29 |Male  |9/21/1990|
|C097SHE1EF|Aaron     |Lopez    |27 |Female|4/18/2001|
|C0DTC6436T|Aaron     |Schwartz |57 |Female|7/9/1962 |
+----------+----------+---------+---+------+---------+
only showing top 5 rows
== Physical Plan ==
*(1) Project [cust_id#46, split(name#47,  , -1)[0] AS first_name#76, split(name#47,  , -1)[1] AS last_name#77, (cast(age#48 as bigint) + 5) AS age#78L, gender#49, birthday#50]
+- *(1) Filter (isnotnull(city#52) AND (city#52 = boston))
   +- *(1) ColumnarToRow
      +- FileScan parquet [cust_id#46,name#47,age#48,gender#49,birthday#50,city#52] Batched: true, DataFilters: [isnotnull(city#52), (city#52 = boston)], Format: Parquet, Location: InMemoryFileIndex(1 

##### Analysis of the physical Plan
* for any transformation filescan is the first stage and also we have to keep in mind about the **DataFilter** at file scan stage
* next beacuse parque is in columanr format its converting into row for ease of operations
* applying our explicit filter 
* and project is where we do on in column operations 

### lets see how wide transformations work
1.repartitions
2.joins
3.joins
4.groupBY


In [10]:
df_transactions.rdd.getNumPartitions()

12

In [12]:
df_transactions.repartition(24).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(24), REPARTITION_BY_NUM, [plan_id=80]
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cust_id:string,start_date:string,end_date:string,txn_id:string,date:string,year:string,mon...




### analysis of the query plan
* it is scaning the parquet file as usual we are not seeing any datafilters and its reading whole schema beacuse we are reaptitioning the whole thing
* using RoundRobing partioon hash(key)%Number for shuffle partioons


In [15]:
df_transactions.rdd.getNumPartitions()#we are not storing it in any new data frame that why the transformation is not tobe seen

12

In [16]:
df_transactions.coalesce(5).explain()

== Physical Plan ==
Coalesce 5
+- *(1) ColumnarToRow
   +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cust_id:string,start_date:string,end_date:string,txn_id:string,date:string,year:string,mon...




## analysis of the pysical query plan
### Why doesn't `.coalesce()` explicitly show the partitioning scheme?

`.coalesce` doesn't show the partitioning scheme e.g. `RoundRobinPartitioning` because: 
- The operation only minimizes data movement by merging into fewer partitions, it doesn't do any shuffling.
- Because no shuffling is done, the partitioning scheme remains the same as the original DataFrame and Spark doesn't include it explicitly in it's plan as the partitioning scheme is unaffected by `.coalesce`

### lets see the query plan for joins


In [17]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [19]:
df_joined = (
df_transactions.join(
    df_customer,how="inner",on="cust_id"
)
)

In [21]:
df_joined.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [cust_id#0, start_date#1, end_date#2, txn_id#3, date#4, year#5, month#6, day#7, expense_type#8, amt#9, city#10, name#47, age#48, gender#49, birthday#50, zip#51, city#52]
   +- SortMergeJoin [cust_id#0], [cust_id#46], Inner
      :- Sort [cust_id#0 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(cust_id#0, 200), ENSURE_REQUIREMENTS, [plan_id=122]
      :     +- Filter isnotnull(cust_id#0)
      :        +- FileScan parquet [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] Batched: true, DataFilters: [isnotnull(cust_id#0)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/bhushanchowdary/Documents/GitHub/pyspark/Optimization/data..., PartitionFilters: [], PushedFilters: [IsNotNull(cust_id)], ReadSchema: struct<cust_id:string,start_date:string,end_date:string,txn_id:string,date:string,year:string,mon...
      +- Sort [cust_id#46 ASC NULLS

### analysis 
*before the hashpartiton spark engine cheks 2 times filter the hasesh and sorts and dose the same thing for next


In [22]:
df_transactions.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- txn_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- expense_type: string (nullable = true)
 |-- amt: string (nullable = true)
 |-- city: string (nullable = true)



In [23]:
df_city_counts = (
    df_transactions
    .groupBy("city")
    .count()
)

In [24]:
df_city_counts.explain(True)

== Parsed Logical Plan ==
'Aggregate ['city], ['city, 'count(1) AS count#98]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Analyzed Logical Plan ==
city: string, count: bigint
Aggregate [city#10], [city#10, count(1) AS count#98L]
+- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Optimized Logical Plan ==
Aggregate [city#10], [city#10, count(1) AS count#98L]
+- Project [city#10]
   +- Relation [cust_id#0,start_date#1,end_date#2,txn_id#3,date#4,year#5,month#6,day#7,expense_type#8,amt#9,city#10] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[city#10], functions=[count(1)], output=[city#10, count#98L])
   +- Exchange hashpartitioning(city#10, 200), ENSURE_REQUIREMENTS, [plan_id=139]
      +- HashAggregate(keys=[city#10], functions=[partial_count(1)], output=[city#10, count#112L])
         +- File

#### Why the Explicit Filter Step is present despite predicate push down
### Why is a filter step present despite predicate pushdown? 

This is largely due to the way `Spark's Catalyst Optimizer` works. Specifically, due to two separate stages of the query optimization process: Physical Planning and Logical Planning.

- **Logical Planning**: Catalyst optimizer simplifies the unresolved logical plan (which represents the user's query) by applying various rule-based optimizations. This includes `predicate pushdown`, `projection pushdown` where filter conditions and column projections are moved as close to the data source as possible.

- **Physical Planning** phase is where the logical plan is translated into one or more physical plans, which can actually be executed on the cluster. This includes operations like file `scans`, `filters`, `projections`, etc.

In this case, during the logical planning phase, the predicate (`F.col("city") == "boston"`) has been pushed down and will be applied during the scan of the Parquet file (`PushedFilters: [IsNotNull(city), EqualTo(city,boston)]`), thus improving performance.

Now, during the physical planning phase, the same filter condition (`+- *(1) Filter (isnotnull(city#73) AND (city#73 = boston))`) is applied again to the data that's been loaded into memory. This is because of the following reasons:

1. **Guaranteed Correctness:** It might seem **redundant**, but remember that not all data sources can handle pushed-down predicates, and not all predicates can be pushed down. Therefore, **even if a predicate is pushed down to the data source, Spark still includes the predicate in the physical plan** to cover cases where the data source might not have been able to fully apply the predicate. This is Spark's way of making sure the correct data is always returned, no matter the capabilities of the data source.

2. **No Assumptions**: Spark's Catalyst optimizer doesn't make assumptions about the data source's ability to handle pushed-down predicates. The optimizer aims to generate plans that return correct results across a wide range of scenarios. Even if the filter is pushed down, Spark does not have the feedback from data source whether the pushdown was successful or not, so it includes the filter operation in the physical plan as well.

It is more of a **fail-safe mechanism** to ensure data **integrity** and **correctness**.

---

### In what cases will predicate pushdown not work?

2 Examples where **filter pushdown** will not work:

1. **Complex Data Types**: Spark's Parquet data source does not push down filters that involve **complex types**, such as **arrays**, **maps**, and **structs**. This is because these complex data types can have complicated nested structures that the Parquet reader cannot easily filter on.

Here's an example:

```
root
 |-- Name: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+-----------------------------+
|Name      |properties                   |
+----------+-----------------------------+
|Afaque    |[eye -> black, hair -> black]|
|Naved     |[eye ->, hair -> brown]      |
|Ali       |[eye -> black, hair -> red]  |
|Amaan     |[eye -> grey, hair -> grey]  |
|Omaira    |[eye -> , hair -> brown]     |
+----------+-----------------------------+
```

```python
df.filter(df.properties.getItem("eye") == "brown").show()
```

```
== Physical Plan ==
*(1) Filter (metadata#123[key] = value)
+- *(1) ColumnarToRow
   +- FileScan parquet [id#122,metadata#123] Batched: true, DataFilters: [(metadata#123[key] = value)], Format: Parquet, ...
```

------------------------------------------------

3. Unsupported Expressions: 

In Spark, `Parquet` data source does not support pushdown for filters involving a `.cast` operation. The reason for this behaviour is as follows:
- `.cast` changes the datatype of the column, and the Parquet data source may not be able to perform the filter operation correctly on the cast data.

**Note**: This behavior may vary based on the data source. For example, if you're working with a JDBC data source connected to a database that supports SQL-like operations, the `.cast` filter could potentially be pushed down to the database.

In [25]:
spark.stop()