## Turn off AQE

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

## checking AQE status

In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

Out[2]: 'false'

## Read data from the DBFS

In [0]:
from pyspark.sql.functions import *

In [0]:
df = spark.read.format('csv')\
        .option('inferSchema',True)\
            .option('header', True)\
                .load("dbfs:/FileStore/tables/bigmart_sales.csv")

In [0]:
df.display()

## 128mb is the default size. 

In [0]:
df.rdd.getNumPartitions()

Out[5]: 1

## Changing default partition size to 128kb

In [0]:
spark.conf.set("spark.sql.files.maxPartitionBytes", 131072)

#### no of partitions = size of file / partition size
#### no of partitions = 874kb / 128kb = 7

In [0]:
df.rdd.getNumPartitions()

Out[8]: 7

## Changing the default partition  size to 128mb

In [0]:
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728)

## Repartitioning

In [0]:
df = df.repartition(10)

In [0]:
df.rdd.getNumPartitions()

Out[11]: 10

## if have 4 core in executor then you can do 4 tasks at a time. 
## task is what partition we provide to do

## Get Partition Info
#### By this you will get to know which partition holding which data

In [0]:
df = df.withColumn("partition_id", spark_partition_id())
df.display()

## Data Writing

In [0]:
df.write.format('parquet')\
    .mode('append')\
        .option("path","/FileStore/tables/parquetWrite")\
            .save()

### in the above we have 10 partition.

## new data reading

In [0]:
df_new = spark.read.format("parquet")\
            .load("/FileStore/tables/parquetWrite")

### Here it will all the 10 files to get the whole data

In [0]:
df_new.display()

### Now we have to read the data by using the filter condition. we think it will read the data from particular file that contains the filter condition. but no it will read all the 10 files bcz spark doesn't know where the "Tier 1" data is stored in which partition.

In [0]:
df_new = spark.read.format("parquet")\
            .load("/FileStore/tables/parquetWrite")
df_new = df_new.filter(col("Outlet_Location_Type") == "Tier 1")

In [0]:
df_new.display()

# Scanning Optimization
### Here we can pruning the data based on the prunning condition to stop reading all the files and get the data from those partition where "Tier 1" is stored

In [0]:
df.write.format("parquet")\
            .mode("append")\
                .partitionBy('Outlet_Location_Type')\
                    .option("path","/FileStore/tables/parquetWriteOpt")\
                        .save()

#### in the above code it will create the folder based the distinct Outlet_Location_Type 
#### in the below it will go to the Tier 1 mention folder and read those data only

In [0]:
df_new = spark.read.format("parquet")\
            .load("/FileStore/tables/parquetWriteOpt")
df_new = df_new.filter(col("Outlet_Location_Type") == 'Tier 1')
df_new.display()

# Joins Optimization

#### 4 Executors are there.
#### Fact- 1GB--> 8Partitions
#### Dim- 1mb--> 1partition
#### if the AQE is not enabled then we will get 200 partitions
#### joins are wide transformation. Here we perform Shuffling


#### If one table is 1GB and other is very less (1-10mb), Here we don't need 200 partitions and shuffling
#### First Driver node will store the small file to it's JVM memory then it broadcast the small table to each node and save in executors memory.

In [0]:
# Big DataFrame 
df_transactions = spark.createDataFrame([ 
    (1, "US", 100), 
    (2, "IN", 200), 
    (3, "UK", 150), 
    (4, "US", 80), 
], ["id", "country_code", "amount"])

# Small DataFrame 
df_countries = spark.createDataFrame([ 
    ("US", "United States"), 
    ("IN", "India"), 
    ("UK", "United Kingdom"), 
], ["country_code", "country_name"])

In [0]:
df_transactions.display()

id,country_code,amount
1,US,100
2,IN,200
3,UK,150
4,US,80


In [0]:
df_countries.display()

country_code,country_name
US,United States
IN,India
UK,United Kingdom


##### Normal Inner join where it creates 200 partition

In [0]:
df_join = df_transactions.join(df_countries, df_transactions['country_code']== df_countries['country_code'], "inner")

In [0]:
df_join.display()

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
4,US,80,US,United States
2,IN,200,IN,India
3,UK,150,UK,United Kingdom


#### Broadcast join

In [0]:
df_join_opt = df_transactions.join(broadcast(df_countries), df_transactions['country_code']== df_countries['country_code'], "inner")

In [0]:
df_join_opt.display()

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
2,IN,200,IN,India
3,UK,150,UK,United Kingdom
4,US,80,US,United States


## Using SQL

In [0]:
# Big DataFrame 
df_transactions = spark.createDataFrame([ 
    (1, "US", 100), 
    (2, "IN", 200), 
    (3, "UK", 150), 
    (4, "US", 80), 
], ["id", "country_code", "amount"])

# Small DataFrame 
df_countries = spark.createDataFrame([ 
    ("US", "United States"), 
    ("IN", "India"), 
    ("UK", "United Kingdom"), 
], ["country_code", "country_name"])

In [0]:
df_transactions.createOrReplaceTempView('trans')
df_countries.createOrReplaceTempView('country')

In [0]:
df_sql =  spark.sql('''select * from trans t
join country c on t.country_code = t.country_code''')

In [0]:
df_sql.display()

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
1,US,100,IN,India
1,US,100,UK,United Kingdom
2,IN,200,US,United States
2,IN,200,IN,India
2,IN,200,UK,United Kingdom
3,UK,150,US,United States
3,UK,150,IN,India
3,UK,150,UK,United Kingdom
4,US,80,US,United States


## join optimization using SQL HINTs
##### Sometimes it apply sometimes not

In [0]:
df_sql_opt =  spark.sql('''select  /* broadcast(country) */
                        *
from trans t
join country c 
on t.country_code = t.country_code''')

In [0]:
df_sql_opt.display()

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
1,US,100,IN,India
1,US,100,UK,United Kingdom
2,IN,200,US,United States
2,IN,200,IN,India
2,IN,200,UK,United Kingdom
3,UK,150,US,United States
3,UK,150,IN,India
3,UK,150,UK,United Kingdom
4,US,80,US,United States


# Caching and Persistence

## Spark Pool Memory
#### 1. Storage memory
#### 2. Executor memory(all the df and calculation is done here)

### DISK_AND_MEMORY
### DISK_ONLY
### MEMORY_ONLY

##### df.persist(storageLevel.DISK_AND_MEMORY) --> df.cache()

In [0]:
from pyspark.sql.functions import *

In [0]:
df = spark.read.format('csv')\
        .option('inferSchema', True)\
            .option('header',True)\
                .load("/FileStore/tables/bigmart_sales.csv")\
                    .cache()

In [0]:
df2 = df.filter(col("Outlet_Location_Type") == 'Tier 1')

In [0]:
df3 = df.filter(col("Outlet_Location_Type") == 'Tier 2')

In [0]:
df3.display()

##### To remove Cache data

In [0]:
df.unpersist()

Out[12]: DataFrame[Item_Identifier: string, Item_Weight: double, Item_Fat_Content: string, Item_Visibility: double, Item_Type: string, Item_MRP: double, Outlet_Identifier: string, Outlet_Establishment_Year: int, Outlet_Size: string, Outlet_Location_Type: string, Outlet_Type: string, Item_Outlet_Sales: double]

In [0]:
from pyspark.storagelevel import StorageLevel

In [0]:
df.persist(StorageLevel.MEMORY_ONLY)

Out[14]: DataFrame[Item_Identifier: string, Item_Weight: double, Item_Fat_Content: string, Item_Visibility: double, Item_Type: string, Item_MRP: double, Outlet_Identifier: string, Outlet_Establishment_Year: int, Outlet_Size: string, Outlet_Location_Type: string, Outlet_Type: string, Item_Outlet_Sales: double]

## Dynamic Resource Allocation

In [0]:
spark-submit \
    --conf spark.dynamicAllocation.enabled=true \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=2 \
    --conf spark.dynamicAllocation.initialExecutors=4 \
    --conf spark.dynamicAllocation.maxExecutors=10 \
    my_spark_job.py

# AQE (Adaptive Query Execution)

#### - Dynamically Coalesce the partitions
#### - Optimizing the join the strategy during runtime
#### - Optimize the skewness

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

Out[2]: 'false'

In [0]:
from pyspark.sql.functions import *

In [0]:
df = spark.read.format('csv')\
            .option('inferSchema', True)\
                .option('header',True)\
                    .load("/FileStore/bigmart_sales.csv")

In [0]:
df.rdd.getNumPartitions()

Out[5]: 1

In [0]:
df_new = df.groupBy("Item_Fat_Content").count()

In [0]:
df_new.display()

Item_Fat_Content,count
low fat,112
Low Fat,5089
LF,316
Regular,2889
reg,117


In [0]:
df_new.rdd.getNumPartitions()

Out[11]: 200

### in the above wide transformation spark will create the default partition as 200
### Let's enable the AQE

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "true")

In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

Out[10]: 'true'

In [0]:
df = spark.read.format('csv')\
            .option('inferSchema', True)\
                .option('header',True)\
                    .load("/FileStore/bigmart_sales.csv")

In [0]:
df.rdd.getNumPartitions()

Out[13]: 1

#### in the above it coalesce the 200 partitions to 1 partition bcz here data size is very small
#### in the case of join AQE will use broadcast join automatically if one table is large and one is small.

# Dynamic Partition Pruning

### df1(3 partitions) and df2(1 partition)
#### df1.filter("col1") then join(), here it will directly read the data from the file where is partitioned on the basis of the pruning condition.
#### df2.filter("col1") then join(), Here it will take this partition filter that is applied on df2 and it will pass this filter to df1 as well during the runtime. it will simply broadcast it to df1

##### Turning off AQE, Dynamic partition pruning and AutoBroadcast

In [0]:
spark.conf.set("spark.sql.adaptive.enabled","false")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled","false")
spark.conf.set("park.sql.autoBroadcastJoinThresold", -1)

In [0]:
df = spark.read.format('csv')\
            .option('inferSchema', True)\
                .option('header',True)\
                    .load("/FileStore/bigmart_sales.csv")
df = df.limit(8)

In [0]:
df.display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales
FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138
DRC01,5.92,Regular,0.019278216,Soft Drinks,48.2692,OUT018,2009,Medium,Tier 3,Supermarket Type2,443.4228
FDN15,17.5,Low Fat,0.016760075,Meat,141.618,OUT049,1999,Medium,Tier 1,Supermarket Type1,2097.27
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38
NCD19,8.93,Low Fat,0.0,Household,53.8614,OUT013,1987,High,Tier 3,Supermarket Type1,994.7052
FDP36,10.395,Regular,0.0,Baking Goods,51.4008,OUT018,2009,Medium,Tier 3,Supermarket Type2,556.6088
FDO10,13.65,Regular,0.012741089,Snack Foods,57.6588,OUT013,1987,High,Tier 3,Supermarket Type1,343.5528
FDP10,,Low Fat,0.127469857,Snack Foods,107.7622,OUT027,1985,Medium,Tier 3,Supermarket Type3,4022.7636


#### Preparing the partitioned Data

In [0]:
df.write.format("parquet")\
        .mode("append")\
            .partitionBy("Outlet_Type")\
                .option("path","/FileStore/DPP/bigmart_sales")\
                    .save()

#### Non partitioned data


In [0]:
df.write.format("parquet")\
        .mode("append")\
                .option("path","/FileStore/DPP/NonPartitionedbigmart_sales")\
                    .save()

#### Dataframes

In [0]:
df1 = spark.read.format("parquet")\
                .load("/FileStore/DPP/bigmart_sales")

In [0]:
df2 = spark.read.format("parquet")\
                .load("/FileStore/DPP/NonPartitionedbigmart_sales")

#### Joines

In [0]:
df_join = df1.join(df2.filter(col("Outlet_Type") == "Grocery Store"), df1['Item_Identifier'] == df2["Item_Identifier"], 'inner')

In [0]:
df_join.display()

Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Item_Outlet_Sales,Outlet_Type,Item_Identifier.1,Item_Weight.1,Item_Fat_Content.1,Item_Visibility.1,Item_Type.1,Item_MRP.1,Outlet_Identifier.1,Outlet_Establishment_Year.1,Outlet_Size.1,Outlet_Location_Type.1,Outlet_Type.1,Item_Outlet_Sales.1
FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,732.38,Grocery Store,FDX07,19.2,Regular,0.0,Fruits and Vegetables,182.095,OUT010,1998,,Tier 3,Grocery Store,732.38


In [0]:
df_join.rdd.getNumPartitions()

Out[35]: 4

#### In the above join it has read the 4 partitions and now let's enabled the dynamic Partition Pruning

In [0]:
spark.conf.set("spark.sql.adaptive.enabled","false")
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled","true")
spark.conf.set("park.sql.autoBroadcastJoinThresold", 5 * 1024 * 1024)

In [0]:
df = spark.read.format('csv')\
            .option('inferSchema', True)\
                .option('header',True)\
                    .load("/FileStore/bigmart_sales.csv")
df = df.limit(8)

In [0]:
df.write.format("parquet")\
        .mode("append")\
            .partitionBy("Item_Identifier")\
                .option("path","/FileStore/DPP/bigmart_sales1")\
                    .save()

In [0]:
df.write.format("parquet")\
        .mode("append")\
                .option("path","/FileStore/DPP/NonPartitionedbigmart_sales1")\
                    .save()

In [0]:
df1 = spark.read.format("parquet")\
                .load("/FileStore/DPP/bigmart_sales1")

In [0]:
df2 = spark.read.format("parquet")\
                .load("/FileStore/DPP/NonPartitionedbigmart_sales1")

In [0]:
df_join = df1.join(df2.filter(col("Item_Identifier") == "FDA15"), df1['Item_Identifier'] == df2["Item_Identifier"], 'inner')

In [0]:
df_join.display()

Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,Item_Outlet_Sales,Item_Identifier,Item_Identifier.1,Item_Weight.1,Item_Fat_Content.1,Item_Visibility.1,Item_Type.1,Item_MRP.1,Outlet_Identifier.1,Outlet_Establishment_Year.1,Outlet_Size.1,Outlet_Location_Type.1,Outlet_Type.1,Item_Outlet_Sales.1
9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138,FDA15,FDA15,9.3,Low Fat,0.016047301,Dairy,249.8092,OUT049,1999,Medium,Tier 1,Supermarket Type1,3735.138


#### Now it has read the 1 partition/file data

In [0]:
df_join.rdd.getNumPartitions()

Out[48]: 1

# Broadcast Variable

#### Here all the values are being used from the executor's memory instead of sending again and again we ahve broadcast it. So it will be there till the application completes or till the application ends

In [0]:
from pyspark.sql.functions import *

In [0]:
df = spark.createDataFrame(
    [
        ("1001",),
        ("1002",),
        ("1004",)
    ], ["product_id"]
)

product_dict = {
    "1001": "iphone",
    "1002": "Samsung",
    "1003": "Pixel"
}

In [0]:
# Broadcasting the dictionary Variable
broad_vr = spark.sparkContext.broadcast(product_dict)

In [0]:
broad_vr.value

Out[6]: {'1001': 'iphone', '1002': 'Samsung', '1003': 'Pixel'}

In [0]:
def mymap(x):
    return broad_vr.value.get(x)

In [0]:
mymap_udf = udf(mymap)

In [0]:
df_with_names = df.withColumn("product_name", mymap_udf("product_id"))
df_with_names.display()

product_id,product_name
1001,iphone
1002,Samsung
1004,


# Salting -- OOM(Out Of Memory)

##### Suppose you a grocery store data and customer -> A, it is having 80% of your data and you have using a wide transformation(groupBy). afte that it will partition the data according to the Customer id and customer-> A would be the biggest partition of 1.5 GB and your executor memory is 1GB. So it will not fit in the executor memory.

##### In Salting we have to break the big partion into the small small partition based on some random number.

In [0]:
data = [("A", 100), ("A", 200), ("A", 300), ("B", 400), ("C", 500)]
df = spark.createDataFrame(data,["user_id","purchase"])

In [0]:
df.display()

user_id,purchase
A,100
A,200
A,300
B,400
C,500


##### Adding Salt Column

In [0]:
df = df.withColumn("salt_col", floor(rand()*3))

In [0]:
df.display()

user_id,purchase,salt_col
A,100,0
A,200,0
A,300,2
B,400,2
C,500,0


##### Creating concat col on original grouby col and salt col to create a new grouby col

In [0]:
df = df.withColumn("user_id_salt", concat(col("user_id"),lit("-"),col("salt_col")))
df.display()

user_id,purchase,salt_col,user_id_salt
A,100,0,A-0
A,200,0,A-0
A,300,2,A-2
B,400,2,B-2
C,500,0,C-0


##### Applying GroupBy on new col

In [0]:
df = df.groupBy("user_id_salt").agg(sum("purchase"))
df.display()

user_id_salt,sum(purchase)
A-0,300
A-2,300
B-2,400
C-0,500


# Delta Lake Optimization

##### Here we are using the optimization techniques in the disk level not in the memory level