In [0]:
loony_user_credentials = spark.read.format("csv").option("inferSchema", True).option("header", True).option("sep", ",").load()

loony_user_credentials.display()
"""
Output
------

User name | Password | Access key ID | Secret access key | Console login link

"""

In [0]:
from pyspark.sql.functions import col

# Look up loony_user's row once and pull both keys from it
loony_user_row = loony_user_credentials.where(col('User name') == 'loony_user') \
                                       .select('Access key ID', 'Secret access key') \
                                       .collect()[0]

access_key = loony_user_row['Access key ID']
secret_key = loony_user_row['Secret access key']

# Caution: printing credentials exposes them in the notebook output
print('Access key', access_key)
print('Secret key', secret_key)

In [0]:
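import urllib.parse

# safe="" also percent-encodes "/" characters, which can appear in AWS secret keys
encoded_secret_key = urllib.parse.quote(secret_key, "")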

print('Encoded secret key', encoded_secret_key)

In [0]:
aws_s3_bucket = "atl-mgmt-de-dev"

mount_name = "/mnt/atl-mgmt-de-dev/rshukla"

#sourceURI = "s3n://{0}:{1}@{2}".format(access_key, encoded_secret_key, aws_s3_bucket)
sourceURI = "s3://atl-mgmt-de-dev/rshukla/OASonD/"

dbutils.fs.mount(sourceURI, mount_name)

In [0]:
%fs

ls "/mnt/atl-mgmt-de-dev/rshukla/databricks_files"

In [0]:
# Auto Loader (cloudFiles) incrementally picks up new CSV files from the mounted folder
video_games_data = spark.readStream.format("cloudFiles") \
                        .option("cloudFiles.format", "csv") \
                        .option("header", "true") \
                        .option("cloudFiles.inferColumnTypes", "true") \
                        .option("cloudFiles.schemaLocation", "dbfs:/FileStore/schema/video_games_schema") \
                        .option("cloudFiles.schemaHints", "Year_of_Release int") \
                        .load("dbfs:/mnt/atl-mgmt-de-dev/rshukla/databricks_files/*")
  
video_games_data.display()

In [0]:
%fs

ls /mnt/atl-mgmt-de-dev/


## Diagnosing and Mitigating Performance Problems


Performance bottlenecks usually come from one of the following issues:  
* Serialization
* Skew  
* Spill
* Shuffle
* Memory


##### Serialization  
* All data sent over the network or written to disk is serialized
* Data stored in memory may also be kept in serialized form
* The default Java serializer has mediocre performance
* The Kryo serializer (`spark.serializer` set to `org.apache.spark.serializer.KryoSerializer`) has been shown to be up to 10x faster than Java serialization

Efficient Data Structures ::  
- Using more efficient data structures can make serialization faster
- Prefer simpler data structures  
  -- Use primitive data types  
  -- Use arrays rather than containers
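This is a rough, pure-Python illustration of why primitive arrays serialize more cheaply than containers of objects. It is not Spark code. Python's `pickle` stands in for a JVM serializer, and the 10,000 values are made up. The same floats are serialized once as a list of per-record dicts and once as a flat `array('d')`.

```python
import pickle
from array import array

def serialized_sizes(values):
    """Compare serialized byte counts for a container-heavy vs. a primitive layout."""
    # Container-heavy: one dict object per record, with keys repeated per record
    as_dicts = [{"id": i, "value": v} for i, v in enumerate(values)]
    # Primitive: one contiguous array of C doubles (8 bytes each)
    as_array = array("d", values)
    return len(pickle.dumps(as_dicts)), len(pickle.dumps(as_array))

dict_bytes, array_bytes = serialized_sizes([i * 0.5 for i in range(10_000)])
print(f"list of dicts: {dict_bytes} bytes, array('d'): {array_bytes} bytes")
```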

Broadcast Variables :: Use shared, read-only broadcast variables where possible
* Functions that Spark ships to executors carry copies of every variable they reference (their "closures")
* That means one copy per task, all sent from the driver. Several tasks may run on the same worker node, and each task still gets its own copy of the variables
* Broadcast variables are shared, read-only variables
* Only one copy is kept per node, not one per task
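This is a toy count of how many copies of a referenced lookup table get shipped to executors. It assumes a hypothetical layout of 12 tasks on 3 worker nodes, and the function name is illustrative only. In PySpark the real mechanism is `spark.sparkContext.broadcast(lookup)`, with tasks reading the shared copy through `.value`.

```python
def variable_copies(task_to_node):
    """Return (copies shipped as closures, copies shipped as a broadcast variable).

    task_to_node maps each task id to the worker node it runs on.
    """
    closure_copies = len(task_to_node)                  # closures: one copy per task
    broadcast_copies = len(set(task_to_node.values()))  # broadcast: one copy per node
    return closure_copies, broadcast_copies

# Hypothetical layout: 12 tasks spread round-robin over 3 worker nodes
tasks = {task_id: f"node-{task_id % 3}" for task_id in range(12)}
closure_copies, broadcast_copies = variable_copies(tasks)
print(closure_copies, broadcast_copies)  # 12 3
```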



##### Skew
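When reading files, Spark by default splits the input into partitions of up to 128 MB (`spark.sql.files.maxPartitionBytes`). This gives a fairly even distribution of data.  
Transformations, such as a groupBy or join on an unevenly distributed key, may change the partitions so that one partition holds significantly more records than the others.  
This uneven distribution of records across partitions is called "SKEW".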

* A certain amount of skew in your partition sizes can be ignored
* Large skews can result in spills or out-of-memory errors
* A stage takes as long as its longest-running task, so one oversized partition slows down the whole stage
* Large partitions may not fit in the available memory, which is what causes spills to disk

###### Mitigating Skew  
-- Enable adaptive query execution (Spark 3.x)  
-- Use skew hints to help Spark optimize queries  
-- Salt the skewed column with a random number to create a better distribution of data  
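Here is a minimal sketch of salting, written in plain Python so it runs without a cluster. It makes these assumptions:
* `zlib.crc32` stands in for Spark's Murmur3 hash partitioner.
* The data is a made-up hot key (`USA`) plus 100 light keys.
* It uses 8 partitions and 8 salt values.

Appending a random suffix turns the single hot key into 8 distinct keys that are spread across partitions.

```python
import random
import zlib
from collections import Counter

def partition_of(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for Spark's hash partitioner (Spark itself uses Murmur3)
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def partition_sizes(keys, num_partitions):
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]

def salt(keys, num_salts, seed=0):
    # Append a random suffix so one hot key becomes num_salts distinct keys
    rng = random.Random(seed)
    return [f"{k}#{rng.randrange(num_salts)}" for k in keys]

# Skewed toy data: one hot key plus 100 light keys (10 rows each)
keys = ["USA"] * 9_000 + [f"C{i % 100}" for i in range(1_000)]

before = partition_sizes(keys, 8)
after = partition_sizes(salt(keys, num_salts=8), 8)
print("largest partition before salting:", max(before))
print("largest partition after salting: ", max(after))
```

In a real salted join, the other (smaller) side has to be replicated once per salt value so the salted keys still match. A salted aggregation needs a second aggregation to combine the partial results for each original key.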


##### Spill  
Spill refers to the act of moving data from memory to disk, and then back again into memory
* For large partitions, the data may not fit in memory
* The data is then spilled to disk (written to disk and later read back into memory)
###### Mitigating Spills
-- Allocate more memory to cluster machines
-- Mitigate skew that causes spills
-- Work with smaller partition sizes by increasing the number of partitions
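This is back-of-the-envelope arithmetic for "increase the number of partitions": divide the data size by a target partition size. The 128 MiB default target mirrors the read-partition size mentioned under Skew. The function name is hypothetical. The calculation assumes the data is evenly distributed, so skewed keys still need the fixes described above.

```python
import math

MiB = 1024**2
GiB = 1024**3

def partitions_for(total_bytes: int, target_partition_bytes: int = 128 * MiB) -> int:
    """Smallest partition count that keeps each partition at or below the target size."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))

print(partitions_for(10 * GiB))            # 80 partitions of 128 MiB
print(partitions_for(10 * GiB, 64 * MiB))  # 160 smaller partitions
```

The result could be passed to `df.repartition(n)` or used as the value of `spark.sql.shuffle.partitions`.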


##### Shuffle
A shuffle occurs when we perform a "wide transformation" on our data: Spark exchanges (redistributes) partitions across the cluster.  
A shuffle requires Spark to write intermediate results to disk and send them over the network, so these operations are not purely in-memory.
* Side effect of a wide transformation  
  --Wide Transformation : A single input partition contributes to many output partitions
* Aggregations and joins
* Shuffles require expensive writes to disk and network I/O
###### Mitigating Shuffle
* Reduce network I/O with fewer larger workers
* Reduce data processed by filtering data, removing unnecessary columns
* Denormalize the data (in case of joins)
* Pre-shuffle the data for joins using bucketing
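This is a toy model of a shuffle in plain Python. Every record is routed to the output partition that owns its key, so one input partition can feed several output partitions (the "wide" part). Every routed record counts as shuffle I/O. `zlib.crc32` is a stand-in for Spark's hash function, and the records are made up. In this example, filtering before the shuffle halves the number of records moved.

```python
import zlib

def hash_partition(key: str, num_partitions: int) -> int:
    # Deterministic stand-in for Spark's Murmur3-based hash partitioning
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def shuffle(input_partitions, num_output):
    """Route every (key, value) record to the output partition that owns its key."""
    outputs = [[] for _ in range(num_output)]
    fan_out = []  # how many output partitions each input partition feeds
    moved = 0     # records written to shuffle files and sent over the network
    for records in input_partitions:
        targets = set()
        for key, value in records:
            p = hash_partition(key, num_output)
            outputs[p].append((key, value))
            targets.add(p)
            moved += 1
        fan_out.append(len(targets))
    return outputs, fan_out, moved

inputs = [
    [("POL", 1), ("USA", 2), ("GER", 3), ("FRA", 4)],
    [("USA", 5), ("POL", 6), ("ITA", 7), ("GBR", 8)],
]
_, fan_out, moved_all = shuffle(inputs, num_output=4)

# Filtering before the wide transformation shrinks what has to be shuffled
filtered = [[r for r in part if r[0] in {"POL", "USA"}] for part in inputs]
_, _, moved_filtered = shuffle(filtered, num_output=4)
print(fan_out, moved_all, moved_filtered)
```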

##### Memory
* Memory for caching data
  -- RDDs are stored here by default
* Memory for shuffles
  -- Data is buffered when transferring to other machines
* Memory for tasks
  -- Heap space for computations
  
###### Mitigating Memory
* Allocate memory based on type of job
* Shuffle intensive jobs need more shuffle memory  
  -- Large joins but few computations
* Computation intensive jobs need more cache memory  
  -- Machine Learning algorithms
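This is a sketch of how Spark's unified memory manager splits an executor heap. It assumes the documented defaults: 300 MB reserved, `spark.memory.fraction` = 0.6 and `spark.memory.storageFraction` = 0.5. Check the documentation for your Spark version.
* Execution memory serves shuffles, joins, sorts and aggregations.
* Storage memory serves caching.

The boundary between them is soft. Either side can borrow the other's free space, and `storageFraction` is the cached share protected from eviction.

```python
RESERVED_MB = 300  # memory Spark reserves for its own internals

def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into Spark's unified memory regions (defaults assumed)."""
    usable = (heap_mb - RESERVED_MB) * memory_fraction  # shared execution + storage pool
    storage = usable * storage_fraction                 # cached data protected from eviction
    execution = usable - storage                        # initial share for shuffles, joins, sorts
    user = (heap_mb - RESERVED_MB) - usable             # user data structures, UDF objects
    return {"usable": usable, "storage": storage, "execution": execution, "user": user}

print(unified_memory_mb(8192))
```

Under these assumptions, an 8 GiB (8192 MB) heap leaves about 4735 MB for execution plus storage. A shuffle-heavy job might lower `spark.memory.storageFraction`, while a cache-heavy job such as iterative ML might raise it.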


##### Memory Partitions vs Disk Partitions
Partitions are the basic units of parallelism: each Spark task operates on the data in a single partition
* Memory Partitions - allow for parallel processing on large datasets
* Disk Partitions - write data out to disk in nested folders, so that related data is grouped together in a single folder. We often partition in memory first and then write the data out.  
  -- Disk partitioning helps reduce disk reads and writes for certain operations (e.g. filters on the partition column)


##### Disk Partitioning 


In [0]:
from pyspark.sql.functions import col

olympics_data = spark.read.format("csv").option("header", True).option("inferSchema", True).load("s3://atl-mgmt-de-dev/rshukla/OASonD/summer.csv")

olympics_data.display()

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1896,Athens,Aquatics,Swimming,"HAJOS, Alfred",HUN,Men,100M Freestyle,Gold
1896,Athens,Aquatics,Swimming,"HERSCHMANN, Otto",AUT,Men,100M Freestyle,Silver
1896,Athens,Aquatics,Swimming,"DRIVAS, Dimitrios",GRE,Men,100M Freestyle For Sailors,Bronze
1896,Athens,Aquatics,Swimming,"MALOKINIS, Ioannis",GRE,Men,100M Freestyle For Sailors,Gold
1896,Athens,Aquatics,Swimming,"CHASAPIS, Spiridon",GRE,Men,100M Freestyle For Sailors,Silver
1896,Athens,Aquatics,Swimming,"CHOROPHAS, Efstathios",GRE,Men,1200M Freestyle,Bronze
1896,Athens,Aquatics,Swimming,"HAJOS, Alfred",HUN,Men,1200M Freestyle,Gold
1896,Athens,Aquatics,Swimming,"ANDREOU, Joannis",GRE,Men,1200M Freestyle,Silver
1896,Athens,Aquatics,Swimming,"CHOROPHAS, Efstathios",GRE,Men,400M Freestyle,Bronze
1896,Athens,Aquatics,Swimming,"NEUMANN, Paul",AUT,Men,400M Freestyle,Gold


In [0]:
spark.conf.get("spark.sql.adaptive.enabled")

In [0]:
# We are about to repartition the DataFrame by a column. Turn AQE off so that the in-memory repartitioning actually happens;
# otherwise Spark may decide that repartitioning is not needed for such a small dataset
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.get("spark.sql.adaptive.enabled")

In [0]:
# This olympic dataset is not partitioned on disk
olympics_data_filtered = olympics_data.filter('Country = "POL"')
olympics_data_filtered.explain()

# The plan shows that the entire dataset on disk has to be scanned

In [0]:
olympics_data_filtered.display()

# In the Spark UI: Spark Jobs -> View (stages) -> SQL tab -> olympics_data_filtered.display() under Completed Queries
# -> expand all details in the query plan visualization -> "size of files read": 2.5 MiB
# ======> the entire file had to be read to perform the filter above (for larger files this can be a significant performance issue)

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1924,Paris,Cycling,Cycling Track,"LANGE, Jozef",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"LAZARSKI, Jan",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"STANKIEWICZ, Tomasz",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"SZYMCZYK, Franciszek",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Equestrian,Jumping,"KROLIKIEWICZ, Adam",POL,Men,Individual,Bronze
1928,Amsterdam,Athletics,Athletics,"KONOPACKA, Halina",POL,Women,Discus Throw,Gold
1928,Amsterdam,Equestrian,Eventing,"ANTONIEWICZ, Michal",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Eventing,"DE ROMMEL (BARON), Karol",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Eventing,"TRENKWALD, Jozef",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Jumping,"ANTONIEWICZ, Michal",POL,Men,Team,Silver


In [0]:
olympics_data.repartition(col("Country")).write.mode("overwrite").parquet("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_repartitioned")

In [0]:
dbutils.fs.ls("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_repartitioned")

In [0]:
repartitioned_data = spark.read.parquet("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_repartitioned")
repartitioned_data.display()

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1956,Melbourne / Stockholm,Aquatics,Swimming,"TEN ELSEN, Eva-Maria",EUA,Women,200M Breaststroke,Bronze
1956,Melbourne / Stockholm,Aquatics,Swimming,"HAPPE-KREY, Ursula",EUA,Women,200M Breaststroke,Gold
1956,Melbourne / Stockholm,Athletics,Athletics,"STUBNICK, Christa",EUA,Women,100M,Silver
1956,Melbourne / Stockholm,Athletics,Athletics,"RICHTZENHAIN, Klaus",EUA,Men,1500M,Silver
1956,Melbourne / Stockholm,Athletics,Athletics,"STUBNICK, Christa",EUA,Women,200M,Silver
1956,Melbourne / Stockholm,Athletics,Athletics,"HAAS, Karl-Friedrich",EUA,Men,400M,Silver
1956,Melbourne / Stockholm,Athletics,Athletics,"FÜTTERER, Heinz",EUA,Men,4X100M Relay,Bronze
1956,Melbourne / Stockholm,Athletics,Athletics,"GERMAR, Manfred",EUA,Men,4X100M Relay,Bronze
1956,Melbourne / Stockholm,Athletics,Athletics,"KNÖRZER, Lothar",EUA,Men,4X100M Relay,Bronze
1956,Melbourne / Stockholm,Athletics,Athletics,"POHL, Leonard (Leo)",EUA,Men,4X100M Relay,Bronze


In [0]:
repartitioned_data_filtered = repartitioned_data.filter('Country = "POL"')
repartitioned_data_filtered.explain()

In [0]:
repartitioned_data_filtered.display()
# size of files read: 777.2 KiB, so the read is now 777.2 KiB instead of the whole 2.5 MiB file

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1924,Paris,Cycling,Cycling Track,"LANGE, Jozef",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"LAZARSKI, Jan",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"STANKIEWICZ, Tomasz",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Cycling,Cycling Track,"SZYMCZYK, Franciszek",POL,Men,Team Pursuit (4000M),Silver
1924,Paris,Equestrian,Jumping,"KROLIKIEWICZ, Adam",POL,Men,Individual,Bronze
1928,Amsterdam,Athletics,Athletics,"KONOPACKA, Halina",POL,Women,Discus Throw,Gold
1928,Amsterdam,Equestrian,Eventing,"ANTONIEWICZ, Michal",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Eventing,"DE ROMMEL (BARON), Karol",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Eventing,"TRENKWALD, Jozef",POL,Men,Team,Bronze
1928,Amsterdam,Equestrian,Jumping,"ANTONIEWICZ, Michal",POL,Men,Team,Silver


In [0]:
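# This time the data is also explicitly partitioned by Country on disk (partitionBy). Compare this write with the repartition-only write above;
# the two cells below list the resulting directories for each approach.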
olympics_data.repartition(col("Country")).write.partitionBy('Country').mode("overwrite").parquet("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_partitioned")

In [0]:
dbutils.fs.ls("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_partitioned")

In [0]:
dbutils.fs.ls("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_repartitioned")

In [0]:
olympics_data_partitioned = spark.read.parquet("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data_partitioned")

In [0]:
partitioned_data_filtered = olympics_data_partitioned.filter('Country = "POL"')
partitioned_data_filtered.explain()

In [0]:
partitioned_data_filtered.display()
# size of files read: 11.7 KiB (only the Country=POL folder is read)

Year,City,Sport,Discipline,Athlete,Gender,Event,Medal,Country
1924,Paris,Cycling,Cycling Track,"LANGE, Jozef",Men,Team Pursuit (4000M),Silver,POL
1924,Paris,Cycling,Cycling Track,"LAZARSKI, Jan",Men,Team Pursuit (4000M),Silver,POL
1924,Paris,Cycling,Cycling Track,"STANKIEWICZ, Tomasz",Men,Team Pursuit (4000M),Silver,POL
1924,Paris,Cycling,Cycling Track,"SZYMCZYK, Franciszek",Men,Team Pursuit (4000M),Silver,POL
1924,Paris,Equestrian,Jumping,"KROLIKIEWICZ, Adam",Men,Individual,Bronze,POL
1928,Amsterdam,Athletics,Athletics,"KONOPACKA, Halina",Women,Discus Throw,Gold,POL
1928,Amsterdam,Equestrian,Eventing,"ANTONIEWICZ, Michal",Men,Team,Bronze,POL
1928,Amsterdam,Equestrian,Eventing,"DE ROMMEL (BARON), Karol",Men,Team,Bronze,POL
1928,Amsterdam,Equestrian,Eventing,"TRENKWALD, Jozef",Men,Team,Bronze,POL
1928,Amsterdam,Equestrian,Jumping,"ANTONIEWICZ, Michal",Men,Team,Silver,POL



##### Data Skipping and Z-ordering
* Data Skipping :: uses file-level statistics to avoid scanning irrelevant data during Spark operations  
* Z-ordering :: a technique to colocate related information in the same set of files. Z-order clustering makes data skipping more effective.  
  -- Databricks Runtime uses these features to dramatically reduce the amount of data that needs to be scanned for highly selective queries   
  -- Data skipping and Z-ordering are implemented in Delta Lake, and they allow Delta Lake to sift through petabytes of data in seconds.


###### Data Skipping
* Delta tables keep track of simple statistics on table columns  
  -- Minimum and maximum values stored in each column  
  -- Statistics are tracked per data file, a granularity that matches I/O granularity, so whole files can be skipped
* Leverage these statistics at query planning time to avoid unnecessary I/O
* Every lookup query consults these statistics
* Delta uses these statistics to determine which files can be safely skipped
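This is a toy version of min/max data skipping. The per-file statistics are made up, and `FileStats` and `files_to_scan` are hypothetical names; Delta records comparable statistics in its transaction log. A query for `Year = 2000` only needs the files whose `[min, max]` range contains 2000. The last file spans every year. That is what unclustered data tends to look like, and it is why Z-ordering helps.

```python
from dataclasses import dataclass

@dataclass
class FileStats:
    path: str
    min_year: int
    max_year: int

def files_to_scan(files, year):
    """Keep only files whose [min, max] range could contain the requested year."""
    return [f.path for f in files if f.min_year <= year <= f.max_year]

# Hypothetical per-file statistics
stats = [
    FileStats("part-000.parquet", 1896, 1936),
    FileStats("part-001.parquet", 1948, 1984),
    FileStats("part-002.parquet", 1988, 2012),
    FileStats("part-003.parquet", 1896, 2012),  # unclustered file: range too wide to skip
]
print(files_to_scan(stats, 2000))  # ['part-002.parquet', 'part-003.parquet']
```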

###### Z-Ordering 
The performance of data skipping can be improved by structuring the data efficiently
* Cluster data so related data is colocated: related rows end up in the same set of files
* For data lookups, file hits are minimized and data skipping is maximized
* Reduces the amount of data read from disk, thus improving performance
* Uses locality-preserving Z-order curves to map data: Z-order clustering is a mathematical technique that maps data using locality-preserving Z-order curves
* Maps multi-dimensional data to one-dimensional values in a way that preserves locality, so that similar data is colocated
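This is a minimal sketch of a Z-order (Morton) value for two integer columns. The bits of `x` and `y` are interleaved, so sorting by the result keeps points that are close in both dimensions close together in one dimension. This is the textbook curve. Delta's `OPTIMIZE ... ZORDER BY` first maps column values to ranges, so it may differ in detail. After sorting, the first four points form the 2x2 corner block.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Z-order (Morton) value: x bits go to even positions, y bits to odd positions."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z

# Sorting 2-D points by their Z-value keeps points that are close in both dimensions close in 1-D
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: interleave_bits(*p))
print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```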
  

In [0]:
from pyspark.sql.functions import col

olympics_data = spark.read.format("csv").option("header", True).option("inferSchema", True).load("s3://atl-mgmt-de-dev/rshukla/OASonD/summer.csv")

olympics_data.display()

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1896,Athens,Aquatics,Swimming,"HAJOS, Alfred",HUN,Men,100M Freestyle,Gold
1896,Athens,Aquatics,Swimming,"HERSCHMANN, Otto",AUT,Men,100M Freestyle,Silver
1896,Athens,Aquatics,Swimming,"DRIVAS, Dimitrios",GRE,Men,100M Freestyle For Sailors,Bronze
1896,Athens,Aquatics,Swimming,"MALOKINIS, Ioannis",GRE,Men,100M Freestyle For Sailors,Gold
1896,Athens,Aquatics,Swimming,"CHASAPIS, Spiridon",GRE,Men,100M Freestyle For Sailors,Silver
1896,Athens,Aquatics,Swimming,"CHOROPHAS, Efstathios",GRE,Men,1200M Freestyle,Bronze
1896,Athens,Aquatics,Swimming,"HAJOS, Alfred",HUN,Men,1200M Freestyle,Gold
1896,Athens,Aquatics,Swimming,"ANDREOU, Joannis",GRE,Men,1200M Freestyle,Silver
1896,Athens,Aquatics,Swimming,"CHOROPHAS, Efstathios",GRE,Men,400M Freestyle,Bronze
1896,Athens,Aquatics,Swimming,"NEUMANN, Paul",AUT,Men,400M Freestyle,Gold


In [0]:
olympics_data.count()

In [0]:
olympics_data.write.partitionBy("Country").format("delta").mode("overwrite").save("s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data")

In [0]:
spark.sql("CREATE TABLE zone_marketing_dev.olympics USING DELTA LOCATION 's3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data'")

In [0]:
%sql

SELECT * FROM zone_marketing_dev.olympics

Year,City,Sport,Discipline,Athlete,Country,Gender,Event,Medal
1896,Athens,Athletics,Athletics,"FLACK, Edwin",AUS,Men,1500M,Gold
1896,Athens,Athletics,Athletics,"FLACK, Edwin",AUS,Men,800M,Gold
1900,Paris,Aquatics,Swimming,"LANE, Frederick C.V.",AUS,Men,200M Freestyle,Gold
1900,Paris,Aquatics,Swimming,"LANE, Frederick C.V.",AUS,Men,200M Obstacle Event,Gold
1900,Paris,Athletics,Athletics,"ROWLEY, Stanley",AUS,Men,100M,Bronze
1900,Paris,Athletics,Athletics,"ROWLEY, Stanley",AUS,Men,200M,Bronze
1900,Paris,Athletics,Athletics,"ROWLEY, Stanley",AUS,Men,60M,Bronze
1920,Antwerp,Aquatics,Swimming,"BEAUREPAIRE, Frank E.",AUS,Men,1500M Freestyle,Bronze
1920,Antwerp,Aquatics,Swimming,"BEAUREPAIRE, Frank E.",AUS,Men,4X200M Freestyle Relay,Silver
1920,Antwerp,Aquatics,Swimming,"HAY, Henry",AUS,Men,4X200M Freestyle Relay,Silver


In [0]:
%sql

SELECT COUNT(*) FROM zone_marketing_dev.olympics

count(1)
31165


In [0]:
%sql

SELECT min(Year), max(Year) FROM zone_marketing_dev.olympics

--Year spans 1896-2012, so a filter on a single Year value is a highly selective query. Note that the data is partitioned on disk by Country, not by Year

min(Year),max(Year)
1896,2012


In [0]:
%sql

--Let's run a filter and grouping operation of medium complexity
SELECT count(*) as NumMedal, Gender, Medal FROM zone_marketing_dev.olympics WHERE year = 2000 GROUP BY Gender, Medal

NumMedal,Gender,Medal
370,Men,Gold
370,Men,Silver
386,Men,Bronze
299,Women,Bronze
297,Women,Silver
293,Women,Gold


In [0]:
%sql

OPTIMIZE zone_marketing_dev.olympics ZORDER BY (Year)

path,metrics
s3://atl-mgmt-de-dev/rshukla/OASonD/olympics_data,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 148, List(minCubeSize(107374182400), List(0, 0), List(148, 874396), 0, List(0, 0), 0, null), 0, 148, 148, false)"


In [0]:
%sql
SELECT count(*) as NumMedal, Gender, Medal FROM zone_marketing_dev.olympics WHERE year = 2000 GROUP BY Gender, Medal

NumMedal,Gender,Medal
370,Men,Gold
370,Men,Silver
386,Men,Bronze
299,Women,Bronze
297,Women,Silver
293,Women,Gold


In [0]:
flight_data = spark.read.format("csv").option("header", True).option("inferSchema", True).load("/databricks-datasets/asa/airlines/2005.csv")
flight_data.display()

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2005,1,28,5,1603.0,1605,1741.0,1759,UA,541,N935UA,158.0,174,131.0,-18.0,-2.0,BOS,ORD,867,4,23,0,,0,0,0,0,0,0
2005,1,29,6,1559.0,1605,1736.0,1759,UA,541,N941UA,157.0,174,136.0,-23.0,-6.0,BOS,ORD,867,6,15,0,,0,0,0,0,0,0
2005,1,30,7,1603.0,1610,1741.0,1805,UA,541,N342UA,158.0,175,131.0,-24.0,-7.0,BOS,ORD,867,9,18,0,,0,0,0,0,0,0
2005,1,31,1,1556.0,1605,1726.0,1759,UA,541,N326UA,150.0,174,129.0,-33.0,-9.0,BOS,ORD,867,11,10,0,,0,0,0,0,0,0
2005,1,2,7,1934.0,1900,2235.0,2232,UA,542,N902UA,121.0,152,106.0,3.0,34.0,ORD,BOS,867,5,10,0,,0,0,0,0,0,0
2005,1,3,1,2042.0,1900,9.0,2232,UA,542,N904UA,147.0,152,97.0,97.0,102.0,ORD,BOS,867,3,47,0,,0,23,0,0,0,74
2005,1,4,2,2046.0,1900,2357.0,2232,UA,542,N942UA,131.0,152,100.0,85.0,106.0,ORD,BOS,867,5,26,0,,0,46,0,0,0,39
2005,1,5,3,,1900,,2232,UA,542,000000,,152,,,,ORD,BOS,867,0,0,1,B,0,0,0,0,0,0
2005,1,6,4,2110.0,1900,8.0,2223,UA,542,N920UA,118.0,143,101.0,105.0,130.0,ORD,BOS,867,2,15,0,,0,16,0,0,0,89
2005,1,7,5,1859.0,1900,2235.0,2223,UA,542,N340UA,156.0,143,96.0,12.0,-1.0,ORD,BOS,867,4,56,0,,0,0,0,0,0,0


In [0]:
flight_data.count()

In [0]:
flight_data.write.partitionBy("Origin").format("delta").mode("overwrite").save("s3://atl-mgmt-de-dev/rshukla/OASonD/flights_data")

spark.sql("CREATE TABLE zone_marketing_dev.flights_data USING DELTA LOCATION 's3://atl-mgmt-de-dev/rshukla/OASonD/flights_data'")

In [0]:
%sql

SELECT min(DayofMonth), max(DayofMonth) FROM zone_marketing_dev.flights_data

min(DayofMonth),max(DayofMonth)
1,31


In [0]:
%sql
--The Delta table is partitioned on disk by Origin, but Z-ordering has not been applied yet, so rows with the same DayofMonth are not co-located in the same set of files

SELECT mean(ArrDelay) as Delay, Dest, Month FROM zone_marketing_dev.flights_data WHERE DayofMonth = 31 GROUP BY Month, Dest

Delay,Dest,Month
7.575757575757576,OMA,7
-0.3564356435643564,ONT,5
2.765151515151515,CLE,10
5.363636363636363,JAX,10
12.0,ACY,1
2.25,EKO,7
-6.333333333333333,OME,1
22.75,MLU,10
12.8,LNK,3
-1.4285714285714286,RDM,1


In [0]:
%sql

DESCRIBE FORMATTED zone_marketing_dev.flights_data;

col_name,data_type,comment
Year,int,
Month,int,
DayofMonth,int,
DayOfWeek,int,
DepTime,string,
CRSDepTime,int,
ArrTime,string,
CRSArrTime,int,
UniqueCarrier,string,
FlightNum,int,


In [0]:
%sql

OPTIMIZE zone_marketing_dev.flights_data ZORDER BY (DayofMonth)

path,metrics
s3://atl-mgmt-de-dev/rshukla/OASonD/flights_data,"List(282, 12159, List(9665, 6741669, 378419.21276595746, 282, 106714218), List(6461, 394598, 19503.970885763632, 12159, 237148782), 286, List(minCubeSize(107374182400), List(0, 0), List(12163, 237174805), 0, List(12159, 237148782), 282, null), 1, 12163, 4, false)"


In [0]:
%sql

SELECT mean(ArrDelay) as Delay, Dest, Month FROM zone_marketing_dev.flights_data WHERE DayofMonth = 31 GROUP BY Month, Dest

Delay,Dest,Month
2.765151515151515,CLE,10
5.363636363636363,JAX,10
7.575757575757576,OMA,7
12.8,LNK,3
-0.3564356435643564,ONT,5
-1.4285714285714286,RDM,1
-6.333333333333333,OME,1
2.25,EKO,7
-10.4,TWF,1
12.0,ACY,1



##### Bucketing to Optimize Joins
- Bucketing is a data partitioning technique that pre-shuffles and (optionally) pre-sorts data during writes.  
Join operations are often "wide transformations", which means they involve shuffles. A shuffle means reading data from disk and writing data out to disk.

* Specify the columns to be used for bucketing
* Based on the column values, data is allocated to a predefined number of buckets
* The shuffling and sorting happen once, at write time, before we perform later operations on the data

###### Benefits of Bucketing
* Improves performance of join operations
* Spark is able to figure out the right bucket where the join records live
* Avoids shuffles of tables participating in the join
* Specify number of buckets based on the data that we're working with
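This plain-Python sketch shows why two tables bucketed the same way can be joined without a shuffle. Matching keys always land in the same bucket number, so bucket *i* of one side only needs bucket *i* of the other. The bucket function (`customer_id % 15`) is a simplification: Spark hashes the bucket column (Murmur3) before taking the modulo. The rows are made up, and the right side is assumed to have unique keys.

```python
from collections import defaultdict

NUM_BUCKETS = 15  # same bucket count as bucketBy(15, "customer_id")

def bucket_of(customer_id: int) -> int:
    # Simplified bucket id; Spark applies a Murmur3 hash before the modulo
    return customer_id % NUM_BUCKETS

def bucketize(rows):
    buckets = defaultdict(list)
    for customer_id, value in rows:
        buckets[bucket_of(customer_id)].append((customer_id, value))
    return buckets

def bucketed_join(left_rows, right_rows):
    """Join bucket i only with bucket i: matching keys are guaranteed to share a bucket."""
    left, right = bucketize(left_rows), bucketize(right_rows)
    out = []
    for b in range(NUM_BUCKETS):
        lookup = dict(right.get(b, []))  # assumes unique keys on the right side
        out.extend((cid, lv, lookup[cid]) for cid, lv in left.get(b, []) if cid in lookup)
    return sorted(out)

left = [(cid, cid * 0.1) for cid in range(1, 40)]
right = [(cid, cid * 0.2) for cid in range(20, 60)]
naive = sorted((l, lv, rv) for l, lv in left for r, rv in right if l == r)
assert bucketed_join(left, right) == naive  # same result, no cross-bucket comparisons
```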

In [0]:
from pyspark.sql.functions import col, rand

customer_id = spark.range(1, 15000000, 1, 15)
display(customer_id)

id
1
2
3
4
5
6
7
8
9
10


In [0]:
pct_invest_2019 = customer_id.select(col("id").alias("customer_id"), rand(1).alias("pct_investment_2019"))
display(pct_invest_2019)

customer_id,pct_investment_2019
1,0.6363787615254752
2,0.5993846534021868
3,0.134842710012538
4,0.076841639054609
5,0.8539211111755448
6,0.7167704217972344
7,0.2473902407597975
8,0.1367450741851369
9,0.3869569887491171
10,0.6051540605040805


In [0]:
pct_invest_2019.write.format("parquet").option("path", "s3://atl-mgmt-de-dev/rshukla/OASonD/pct_invest_2019_unbucketed").saveAsTable("zone_marketing_dev.pct_invest_2019_unbucketed")

In [0]:
pct_invest_2019.write.format("parquet").bucketBy(15, "customer_id").sortBy("pct_investment_2019")\
               .option("path", "s3://atl-mgmt-de-dev/rshukla/OASonD/pct_invest_2019_bucketed")\
               .saveAsTable("zone_marketing_dev.pct_invest_2019_bucketed")

In [0]:
pct_investment_2020 = customer_id.select(col("id").alias("customer_id"), rand(2).alias("pct_investment_2020"))
display(pct_investment_2020)

customer_id,pct_investment_2020
1,0.5311207224659675
2,0.2861372051669987
3,0.4944306372895662
4,0.4553707744971322
5,0.8792399632068049
6,0.3644632675391507
7,0.4501968242181094
8,0.4199726628902539
9,0.7051587870577706
10,0.0150881458780699


In [0]:
pct_investment_2020.write.format("parquet").bucketBy(15, "customer_id").sortBy("pct_investment_2020")\
               .option("path", "s3://atl-mgmt-de-dev/rshukla/OASonD/pct_invest_2020_bucketed")\
               .saveAsTable("zone_marketing_dev.pct_invest_2020_bucketed")

In [0]:
%sql

use zone_marketing_dev;

show tables like 'pct*'

database,tableName,isTemporary
zone_marketing_dev,pct_invest_2019_bucketed,False
zone_marketing_dev,pct_invest_2019_unbucketed,False
zone_marketing_dev,pct_invest_2020_bucketed,False


In [0]:
pct_2019_unbucketed = spark.table("zone_marketing_dev.pct_invest_2019_unbucketed")
pct_2019_bucketed = spark.table("zone_marketing_dev.pct_invest_2019_bucketed")
pct_2020_bucketed = spark.table("zone_marketing_dev.pct_invest_2020_bucketed")

In [0]:
pct_2019_unbucketed.join(pct_2019_bucketed, "customer_id").explain()

# Because one side of this join is unbucketed, both sides are repartitioned (to the default 200 shuffle partitions) and data is shuffled on both ends:
#Exchange hashpartitioning(customer_id#619860L, 200), ENSURE_REQUIREMENTS, [id=#643214]
#Exchange hashpartitioning(customer_id#619864L, 200), ENSURE_REQUIREMENTS, [id=#643221]

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", False)
spark.conf.get("spark.sql.adaptive.enabled")

display(pct_2019_unbucketed.join(pct_2019_bucketed, "customer_id"))

# Check this query in the SQL tab of the Spark UI: near the bottom of the plan, both sides are shuffled and repartitioned

customer_id,pct_investment_2019,pct_investment_2019.1
26,0.5486344061553862,0.5486344061553862
29,0.1095486574092523,0.1095486574092523
474,0.2165494135955735,0.2165494135955735
964,0.896007891594797,0.896007891594797
1677,0.5832904558746179,0.5832904558746179
1697,0.8628401150610894,0.8628401150610894
1806,0.8994038523880244,0.8994038523880244
1950,0.6653449812596528,0.6653449812596528
2040,0.1781838909062038,0.1781838909062038
2214,0.6911710404508692,0.6911710404508692


In [0]:
pct_2019_unbucketed.repartition(15, "customer_id").join(pct_2019_bucketed, "customer_id").explain()
# Repartitioning and shuffling now happen only on the unbucketed table, because it is repartitioned into the same 15 partitions as the bucketed table
#Exchange hashpartitioning(customer_id#619860L, 15), REPARTITION_WITH_NUM, [id=#643584] -> only one Exchange instead of two

In [0]:
display(pct_2019_unbucketed.repartition(15, "customer_id").join(pct_2019_bucketed, "customer_id"))

# In the SQL tab of the Spark UI, compared with the previous query, the Exchange operation now appears only on the left (unbucketed) table

customer_id,pct_investment_2019,pct_investment_2019.1
6,0.7167704217972344,0.7167704217972344
16,0.9643107647469809,0.9643107647469809
63,0.2970280250906274,0.2970280250906274
64,0.3992368306476186,0.3992368306476186
70,0.2519628438595569,0.2519628438595569
80,0.1001956565216214,0.1001956565216214
123,0.0644410787913165,0.0644410787913165
130,0.942723722436596,0.942723722436596
148,0.7218504473666008,0.7218504473666008
163,0.5427676535722059,0.5427676535722059


In [0]:
pct_2019_unbucketed.repartition("customer_id").join(pct_2019_bucketed, "customer_id").explain()

# repartition() without a number uses the default 200 shuffle partitions (spark.sql.shuffle.partitions).
# That no longer matches the right (bucketed) table's 15 buckets, so Spark also repartitions the right side to 200, hence two Exchanges:
#Exchange hashpartitioning(customer_id#619860L, 200), REPARTITION, [id=#647423]
#Exchange hashpartitioning(customer_id#619864L, 200), ENSURE_REQUIREMENTS, [id=#647430]  --note ENSURE_REQUIREMENTS here

In [0]:
pct_2019_bucketed.join(pct_2020_bucketed, "customer_id").explain()

# Both tables are bucketed on customer_id into 15 buckets, so the plan has no repartition/shuffle (Exchange) operation

In [0]:
display(pct_2019_bucketed.join(pct_2020_bucketed, "customer_id"))

# No Exchange appears for this query in the SQL tab of the Spark UI

customer_id,pct_investment_2019,pct_investment_2020
6,0.7167704217972344,0.3644632675391507
16,0.9643107647469809,0.091261354181826
63,0.2970280250906274,0.9409547568198008
64,0.3992368306476186,0.2326988506357845
70,0.2519628438595569,0.3222232236580341
80,0.1001956565216214,0.990574198197933
123,0.0644410787913165,0.1884219435268566
130,0.942723722436596,0.0575830742136311
148,0.7218504473666008,0.8206723867477947
163,0.5427676535722059,0.6064469726646279



### Optimizing Spark for Performance
* FIFO and Fair Scheduling
* Caching Frequently used data
* Improvements in Apache Spark 3.0

##### FIFO Scheduler
###### FIFO Scheduling ::   
- Say **Job A** comes in: **Job A** takes up all of the resources on the cluster. If **Job B** then comes in, **Job B** has to wait for **Job A** to complete before it gets to use the cluster's resources.   
- This can be problematic if Job A is a long-running job. 
- Even if **Job B** is much smaller, it still takes a long time to complete because of the wait time
- The FIFO scheduler can result in very long wait times

###### Fair Scheduler
- Resources are always allocated proportionally to all running jobs
- Short jobs do not have to wait for long jobs to finish before getting resources
- Can also specify job priorities
- Priorities are used as weights when allocating cluster resources
- Organizes jobs into scheduling pools :: a pool is a group of jobs that share a portion of the cluster's resources
- Divides resources fairly between pools 
- Can have a separate pool for each user, or separate pools for jobs at different priority levels
- Allocates minimum shares to pools :: so that jobs in a particular pool are never starved of resources
- Example: Job A comes in and gets all available resources  
  - Job B comes in and each job now gets, say, half  
  - Job C comes in and each job gets a third
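This is a toy discrete-time simulation that contrasts the two schedulers. It makes these assumptions:
* Two jobs are submitted at the same time on a hypothetical 4-slot cluster.
* Each task takes one time step.
* Pool weights and minimum shares are not modelled.

Job A has 100 tasks and Job B has 4.

```python
def simulate(jobs, slots, policy):
    """Toy scheduler: each task takes one time step; all jobs are submitted at t = 0."""
    remaining = dict(jobs)
    order = [name for name, _ in jobs]
    finish = {}
    t = 0
    while remaining:
        t += 1
        active = [n for n in order if n in remaining]
        alloc = {n: 0 for n in active}
        free = slots
        if policy == "FIFO":
            # Earlier jobs take every slot they can use; later jobs only get leftovers
            for n in active:
                alloc[n] = min(free, remaining[n])
                free -= alloc[n]
        else:
            # FAIR: hand out slots one at a time, round-robin across active jobs
            while free > 0 and any(alloc[n] < remaining[n] for n in active):
                for n in active:
                    if free > 0 and alloc[n] < remaining[n]:
                        alloc[n] += 1
                        free -= 1
        for n in active:
            remaining[n] -= alloc[n]
            if remaining[n] == 0:
                del remaining[n]
                finish[n] = t
    return finish

jobs = [("A", 100), ("B", 4)]  # long job A, short job B
print(simulate(jobs, slots=4, policy="FIFO"))  # {'A': 25, 'B': 26}
print(simulate(jobs, slots=4, policy="FAIR"))  # {'B': 2, 'A': 26}
```

Under FIFO, the short job B finishes at step 26, right after A. Under fair sharing, B finishes at step 2, and A finishes only one step later than it did under FIFO.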

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

schema = """total_bill float, tip float, sex string, smoker string, day string, time string, size string"""

tips_data_source_stream = spark.readStream.format("csv").option("header", True).schema(schema).load("s3://atl-mgmt-de-dev/rshukla/OASonD/tips_data_source_stream/")

In [0]:
print("Is this streaming data? ",tips_data_source_stream.isStreaming)

In [0]:
tips_data_source_stream.display()

total_bill,tip,sex,smoker,day,time,size
16.0,2.0,Male,Yes,Thur,Lunch,2
21.01,3.0,Male,Yes,Fri,Dinner,2
30.14,3.09,Female,Yes,Sat,Dinner,4
17.81,2.34,Male,No,Sat,Dinner,4
14.07,2.5,Male,No,Sun,Dinner,2
7.74,1.44,Male,Yes,Sat,Dinner,2
13.94,3.06,Male,No,Sun,Dinner,2
32.83,1.17,Male,Yes,Sat,Dinner,2
25.89,5.16,Male,Yes,Sat,Dinner,4
48.33,9.0,Male,No,Sat,Dinner,4


In [0]:
tips_data_source_stream.groupBy('day').agg({'tip': 'avg'}).display()

# Pool name (as shown in the Spark UI): 9105030381464080234

day,avg(tip)
Fri,2.734736856661345
Sat,2.9783333367771574
Sun,3.258441553487406
Thur,2.7714516078272173


In [0]:
tips_data_source_stream.groupBy('time').agg({'tip': 'avg'}).display()

# Both queries ran in parallel because the scheduling mode is FAIR; pool = 9105030381464080234

time,avg(tip)
Lunch,2.728088231647716
Dinner,3.09572222299046


In [0]:
##### fairscheduler.xml

<?xml version="1.0"?>
<allocation>
  <pool name="devPool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="prodPool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocation>

In [0]:
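# Note: spark.scheduler.allocation.file is read when the SparkContext starts, so it normally has to be set in the
# cluster's Spark config; setting it here as a local property may not load the pools defined in the XML file
#spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", "dbfs:/FileStore/fairscheduler.xml")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", "s3://atl-mgmt-de-dev/rshukla/OASonD/fairscheduler.xml")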

In [0]:
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "devPool")

tips_data_source_stream.groupBy('day').agg({'tip': 'avg'}).display()

## pool = devPool

day,avg(tip)
Sat,2.9783333367771574
Sun,3.258441553487406
Thur,2.7714516078272173
Fri,2.734736856661345


In [0]:
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "prodPool")

tips_data_source_stream.groupBy('time').agg({'tip': 'avg'}).display()

## pool = prodPool

time,avg(tip)
Dinner,3.09572222299046
Lunch,2.728088231647716


###### Caching
Reading and scanning the same data over and over again is inefficient. Caching it can give tremendous performance improvements.
* There are 2 caching techniques we can use in a Databricks notebook  
  -- Delta Cache :: works with Delta tables and also with data stored in Parquet files  
  -- Apache Spark Cache :: the caching option available in Apache Spark itself


- Fast reads by creating copies of remote files on the node's local storage
- Data is stored in a fast intermediate format that is extremely fast to read and work with
- Data is cached automatically whenever a file is fetched from a remote location
- Successive reads of the same data read the local files instead of reaching out to the remote file repository, which speeds up queries
- Automatically detects when data files are created or deleted, so the Delta cache can be used without any cache-management overhead
- Tables can be written, modified, and deleted without explicitly updating the cache
- Delta caching is always enabled on Delta Cache Accelerated worker types (when running Apache Spark on Databricks)
- The cache can be explicitly enabled on other worker types
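The automatic caching and change detection described above can be pictured as a read-through cache keyed by file path plus a file version. The sketch below is only a conceptual model under that assumption, not Databricks' implementation: `ReadThroughFileCache`, the `remote` dict standing in for S3, and the integer version standing in for whatever file metadata the real cache checks are all hypothetical.

```python
class ReadThroughFileCache:
    """Toy local cache for remote files, keyed by (path, version)."""

    def __init__(self, remote):
        self.remote = remote   # path -> (version, data); stands in for object storage
        self.local = {}        # (path, version) -> data; stands in for the worker's local disk
        self.remote_reads = 0  # how many times the remote store had to be read

    def read(self, path):
        version, _ = self.remote[path]   # check the file's current version
        key = (path, version)
        if key not in self.local:
            # Miss (first read, or the file changed): drop stale copies of this path,
            # then fetch from remote and cache the result automatically.
            for stale in [k for k in self.local if k[0] == path]:
                del self.local[stale]
            self.remote_reads += 1
            self.local[key] = self.remote[path][1]
        return self.local[key]


remote_store = {"part-00000.parquet": (1, b"original bytes")}
cache = ReadThroughFileCache(remote_store)
cache.read("part-00000.parquet")
cache.read("part-00000.parquet")          # served from the local copy
print(cache.remote_reads)                 # 1

remote_store["part-00000.parquet"] = (2, b"rewritten bytes")  # file overwritten remotely
print(cache.read("part-00000.parquet"))   # b'rewritten bytes'
print(cache.remote_reads)                 # 2
```

Because the stale entry is never served once the version changes, callers never have to invalidate the cache by hand, which is the "no cache management overhead" property in this toy form.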

###### Delta Cache
* Stored as local files on a worker node
* Applied to any Parquet table
* Triggered on first read if caching is enabled
* Force caching using **CACHE SELECT**
* Cache evicted automatically when files change; manually on cluster restart
* Can be enabled or disabled using configuration flags

###### Apache Spark Cache
* Stored as in-memory blocks
* Applied to any Dataframe or RDD
* Triggered manually, requires code changes
* Force caching using **.cache()** or **.persist()** followed by an action that materializes the data
* Cache evicted automatically in LRU (Least Recently Used) fashion
* Always available
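LRU eviction for the Apache Spark cache can be illustrated with a small block store built on `collections.OrderedDict`. This is a simplified sketch: the capacity and the block IDs (written in Spark's `rdd_<rddId>_<partition>` style) are hypothetical, and Spark's real memory store applies extra rules that are not modelled here.

```python
from collections import OrderedDict


class LRUBlockCache:
    """Toy in-memory block store that evicts the least recently used block first."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.blocks = OrderedDict()   # block_id -> size_mb, least recently used first

    def used_mb(self):
        return sum(self.blocks.values())

    def get(self, block_id):
        if block_id not in self.blocks:
            return False                     # miss: Spark would recompute or re-read
        self.blocks.move_to_end(block_id)    # mark as most recently used
        return True

    def put(self, block_id, size_mb):
        if size_mb > self.capacity_mb:
            return False                     # can never fit
        if block_id in self.blocks:
            self.blocks.move_to_end(block_id)
            return True
        while self.used_mb() + size_mb > self.capacity_mb:
            self.blocks.popitem(last=False)  # evict the least recently used block
        self.blocks[block_id] = size_mb
        return True


cache = LRUBlockCache(capacity_mb=100)
cache.put("rdd_1_0", 40)
cache.put("rdd_1_1", 40)
cache.get("rdd_1_0")          # rdd_1_0 becomes most recently used
cache.put("rdd_2_0", 40)      # needs room, so rdd_1_1 is evicted
print(list(cache.blocks))     # ['rdd_1_0', 'rdd_2_0']
```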

###### We can use Delta caching and Apache Spark caching at the same time

###### Properties to set (in the cluster's Spark config)
spark.databricks.io.cache.maxDiskUsage 50g   --> Specifies the disk space per node reserved for cached data, in bytes  
spark.databricks.io.cache.maxMetaDataCache 1g  --> Specifies the disk space per node reserved for cached metadata, in bytes  
spark.databricks.io.cache.compression.enabled false  --> Specifies whether cached data is stored in compressed format  

* Restart the cluster; under "Storage" in the Spark UI there will be an entry named "Parquet IO Cache"
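The descriptions above say the values are in bytes, while the examples use a `g` suffix. Assuming these settings follow Spark's usual size-string convention, where suffixes are binary multiples (1k = 1024 bytes), the conversion works as in this sketch; `size_to_bytes` is a hypothetical helper, not a Databricks or Spark API.

```python
import re

# Binary multiples, as in Spark's size strings (assumption for these Databricks settings).
_UNITS = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4, "p": 1024**5}


def size_to_bytes(value: str) -> int:
    """Convert a size string such as '50g' or '1gb' to a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([bkmgtp])b?\s*", value.lower())
    if not match:
        raise ValueError(f"unrecognised size string: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


print(size_to_bytes("50g"))   # 53687091200  (maxDiskUsage example)
print(size_to_bytes("1g"))    # 1073741824   (maxMetaDataCache example)
```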

In [0]:
from pyspark.sql.functions import col

flight_data = spark.read.format("csv").option("header", True).option("inferSchema", True).load("/databricks-datasets/asa/airlines/2005.csv")

In [0]:
flight_data.write.parquet("s3://atl-mgmt-de-dev/rshukla/OASonD/parquet_flight_data")

In [0]:
spark.sql("CREATE TABLE zone_marketing_dev.flight_data USING PARQUET LOCATION 's3://atl-mgmt-de-dev/rshukla/OASonD/parquet_flight_data'")

In [0]:
%sql
SELECT * FROM zone_marketing_dev.flight_data

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2005,2,26,6,1331.0,1330,1625.0,1615,B6,53,N537JB,174.0,165,156.0,10.0,1.0,JFK,TPA,1005,2,16,0,,0,0,0,0,0,0
2005,2,26,6,627.0,630,916.0,915,B6,63,N537JB,169.0,165,150.0,1.0,-3.0,JFK,TPA,1005,2,17,0,,0,0,0,0,0,0
2005,2,26,6,1032.0,1035,1340.0,1325,B6,65,N565JB,188.0,170,158.0,15.0,-3.0,JFK,TPA,1005,2,28,0,,0,0,0,15,0,0
2005,2,26,6,1051.0,1055,1831.0,1840,B6,192,N552JB,280.0,285,439.0,-9.0,-4.0,LAS,JFK,2248,4,17,0,,0,0,0,0,0,0
2005,2,26,6,1553.0,1555,2331.0,2345,B6,194,N528JB,278.0,290,446.0,-14.0,-2.0,LAS,JFK,2248,3,9,0,,0,0,0,0,0,0
2005,2,26,6,2325.0,2330,700.0,720,B6,196,N552JB,275.0,290,437.0,-20.0,-5.0,LAS,JFK,2248,3,15,0,,0,0,0,0,0,0
2005,2,26,6,1256.0,1250,2038.0,2040,B6,198,N553JB,282.0,290,442.0,-2.0,6.0,LAS,JFK,2248,3,17,0,,0,0,0,0,0,0
2005,2,26,6,1349.0,1355,1449.0,1500,B6,281,N523JB,60.0,65,43.0,-11.0,-6.0,LAS,LGB,231,4,13,0,,0,0,0,0,0,0
2005,2,26,6,1734.0,1740,1836.0,1845,B6,283,N583JB,62.0,65,41.0,-9.0,-6.0,LAS,LGB,231,5,16,0,,0,0,0,0,0,0
2005,2,26,6,609.0,600,855.0,910,B6,371,N547JB,166.0,190,145.0,-15.0,9.0,LGA,FLL,1076,6,15,0,,0,0,0,0,0,0


In [0]:
%python
spark.conf.set("spark.databricks.io.cache.enabled", False)

In [0]:
%sql

SELECT * FROM zone_marketing_dev.flight_data WHERE origin = "PHL"

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2005,2,6,7,1750.0,1800,1919.0,1945,CO,1600,N47332,89.0,105,56.0,-26.0,-10.0,PHL,CLE,363,9,24,0,,0,0,0,0,0,0
2005,2,1,2,550.0,550,852.0,830,CO,1577,N16339,242.0,220,217.0,22.0,0.0,PHL,IAH,1324,11,14,0,,0,0,0,22,0,0
2005,2,22,2,1523.0,1535,1805.0,1818,CO,1498,N24633,222.0,223,203.0,-13.0,-12.0,PHL,IAH,1324,10,9,0,,0,0,0,0,0,0
2005,2,23,3,1226.0,1225,1508.0,1459,CO,1877,N17229,222.0,214,200.0,9.0,1.0,PHL,IAH,1324,11,11,0,,0,0,0,0,0,0
2005,2,16,3,752.0,800,1042.0,1047,CO,1777,N36247,230.0,227,192.0,-5.0,-8.0,PHL,IAH,1324,12,26,0,,0,0,0,0,0,0
2005,2,21,1,558.0,600,926.0,832,CO,1577,N12225,268.0,212,204.0,54.0,-2.0,PHL,IAH,1324,6,58,0,,0,0,0,54,0,0
2005,2,17,4,2125.0,1732,47.0,1958,CO,1077,N24224,262.0,206,192.0,289.0,233.0,PHL,IAH,1324,5,65,0,,0,0,233,56,0,0
2005,2,12,6,1432.0,1440,1701.0,1710,CO,1210,N24715,209.0,210,186.0,-9.0,-8.0,PHL,IAH,1324,10,13,0,,0,0,0,0,0,0
2005,2,18,5,1535.0,1535,1811.0,1818,CO,1498,N14346,216.0,223,189.0,-7.0,0.0,PHL,IAH,1324,15,12,0,,0,0,0,0,0,0
2005,2,11,5,1037.0,1045,1322.0,1335,CO,1877,N47332,225.0,230,193.0,-13.0,-8.0,PHL,IAH,1324,9,23,0,,0,0,0,0,0,0


In [0]:
spark.conf.set("spark.databricks.io.cache.enabled", True)

In [0]:
%sql

CACHE SELECT * FROM zone_marketing_dev.flight_data 

In [0]:
%sql

SELECT * FROM zone_marketing_dev.flight_data WHERE origin = "PHL"

-- Spark UI > SQL tab > click the query > expand the details to see the cache metrics:
--cache hits size total (min, med, max)	1420.6 KiB (0.0 B, 641.4 KiB, 779.2 KiB)
--cache hits size (uncompressed) total (min, med, max)	1684.2 KiB (0.0 B, 764.3 KiB, 919.9 KiB)
--cache writes size total (min, med, max)	594.1 KiB (0.0 B, 0.0 B, 594.1 KiB)
----------

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2005,6,1,3,1602.0,1440,1826.0,1705,DL,301,N950DL,144.0,145,108.0,81.0,82.0,PHL,ATL,665,8,28,0,,0,0,0,81,0,0
2005,6,1,3,634.0,640,842.0,900,DL,383,N905DE,128.0,140,104.0,-18.0,-6.0,PHL,ATL,665,8,16,0,,0,0,0,0,0,0
2005,6,1,3,748.0,740,1014.0,956,DL,425,N957DL,146.0,136,108.0,18.0,8.0,PHL,ATL,665,11,27,0,,0,0,0,18,0,0
2005,6,1,3,1622.0,1540,1844.0,1805,DL,439,N977DL,142.0,145,107.0,39.0,42.0,PHL,ATL,665,8,27,0,,0,0,0,39,0,0
2005,6,1,3,1154.0,1140,1415.0,1352,DL,529,N301DL,141.0,132,111.0,23.0,14.0,PHL,ATL,665,9,21,0,,0,14,0,9,0,0
2005,6,1,3,941.0,940,1151.0,1145,DL,551,N904DL,130.0,125,109.0,6.0,1.0,PHL,ATL,665,9,12,0,,0,0,0,0,0,0
2005,6,1,3,1750.0,1750,1934.0,2000,DL,658,N318DL,104.0,130,78.0,-26.0,0.0,PHL,CVG,507,12,14,0,,0,0,0,0,0,0
2005,6,1,3,1653.0,1657,1957.0,1955,DL,685,N3765,304.0,298,276.0,2.0,-4.0,PHL,SLC,1926,4,24,0,,0,0,0,0,0,0
2005,6,1,3,1830.0,1640,2102.0,1909,DL,743,N986DL,152.0,149,107.0,113.0,110.0,PHL,ATL,665,10,35,0,,0,110,0,3,0,0
2005,6,1,3,717.0,725,1018.0,1006,DL,760,N372DA,301.0,281,275.0,12.0,-8.0,PHL,SLC,1926,7,19,0,,0,0,0,0,0,0



#### New Features and Updates in Apache Spark 3.0

###### Adaptive Query Execution Framework for better performance 
This framework improves performance and simplifies tuning by generating a better execution plan at runtime, even if the initial plan is suboptimal because some data statistics were missing. Spark 3.0 has three major adaptive optimizations:  
* **Dynamic coalescing of shuffle partitions** --> simplifies or even avoids tuning the number of shuffle partitions when running queries. You can start with a large number of shuffle partitions, and Adaptive Query Execution automatically combines adjacent small partitions into larger ones at runtime.
* **Dynamic switching of join strategies** --> helps partially avoid executing suboptimal plans that may have been generated because of missing statistics or size misestimation. The adaptive optimization can automatically convert sort-merge joins to broadcast hash joins at runtime, making joins faster.
* **Dynamic optimization of skew joins** --> skew introduced during transformations can cause an extreme imbalance of work and severely degrade join performance. Adaptive Query Execution can detect skew at runtime and apply optimizations specifically to mitigate it.
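The three adaptive optimizations can be sketched as small decisions over runtime partition sizes. The functions below are simplified toy models, not Spark's code: `coalesce_partitions` greedily merges adjacent partitions up to a target size (the role of `spark.sql.adaptive.advisoryPartitionSizeInBytes`), `choose_join` switches to a broadcast hash join when one side is at or below a broadcast threshold (`spark.sql.autoBroadcastJoinThreshold`), and `find_skewed` flags partitions that exceed both a multiple of the median size and an absolute size (`spark.sql.adaptive.skewJoin.skewedPartitionFactor` and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`). AQE itself is switched on with `spark.sql.adaptive.enabled`. All sizes and thresholds below are illustrative; check your Spark version's documentation for the real defaults.

```python
import math
from statistics import median


def coalesce_partitions(sizes_mb, target_mb):
    """Greedily merge adjacent shuffle partitions while a group stays <= target_mb.

    Returns (start, end) index ranges (end exclusive), one per coalesced partition.
    """
    groups, start, current = [], 0, 0
    for i, size in enumerate(sizes_mb):
        if i > start and current + size > target_mb:
            groups.append((start, i))   # close the current group
            start, current = i, 0
        current += size
    if sizes_mb:
        groups.append((start, len(sizes_mb)))
    return groups


def choose_join(left_mb, right_mb, broadcast_threshold_mb):
    """Pick a broadcast hash join if either side turned out small enough at runtime."""
    if min(left_mb, right_mb) <= broadcast_threshold_mb:
        return "broadcast_hash_join"
    return "sort_merge_join"


def find_skewed(sizes_mb, factor, threshold_mb):
    """Indices of partitions larger than factor * median AND larger than threshold_mb."""
    med = median(sizes_mb)
    return [i for i, s in enumerate(sizes_mb) if s > factor * med and s > threshold_mb]


def split_count(size_mb, target_mb):
    """Number of roughly target-sized pieces a skewed partition is split into."""
    return math.ceil(size_mb / target_mb)


# Illustrative runtime statistics in MB; not taken from a real job.
print(coalesce_partitions([10, 20, 5, 70, 15, 30, 40], target_mb=64))
# [(0, 3), (3, 4), (4, 6), (6, 7)]
print(choose_join(left_mb=5000, right_mb=8, broadcast_threshold_mb=10))
# broadcast_hash_join
sizes = [50, 60, 55, 1200, 58, 62]
skewed = find_skewed(sizes, factor=5, threshold_mb=256)
print(skewed, [split_count(sizes[i], 64) for i in skewed])
# [3] [19]
```

Requiring both conditions in `find_skewed` keeps a partition that is large relative to the median, but small in absolute terms, from being split needlessly.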

###### Dynamic partition pruning to determine partitions to skip
If the optimizer cannot identify at compile time which partitions can be skipped, Spark can work out at runtime (for example, from the filter applied to the other side of a join) which partitions can be pruned and skipped.
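Dynamic partition pruning can be illustrated with a star-schema style join: the filter on the small dimension side is evaluated first, and only the fact-table partitions whose partition value can still match are scanned. The sketch below is a conceptual model; the `scan_with_dpp` helper, the date-partitioned `flights` data and the `is_promo` flag are hypothetical.

```python
def scan_with_dpp(fact_partitions, dim_rows, dim_filter, join_key):
    """Return the fact partitions that still need to be read after runtime pruning.

    fact_partitions: dict mapping partition value -> rows in that partition
    dim_rows: rows of the (small) dimension table
    dim_filter: predicate applied to the dimension table
    join_key: dimension column joined to the fact table's partition column
    """
    # Runtime step 1: evaluate the dimension-side filter.
    surviving_keys = {row[join_key] for row in dim_rows if dim_filter(row)}
    # Runtime step 2: skip every fact partition that cannot produce a join match.
    return sorted(p for p in fact_partitions if p in surviving_keys)


# Hypothetical fact table partitioned by flight date.
flights = {
    "2005-02-01": ["row"] * 3,
    "2005-02-14": ["row"] * 5,
    "2005-02-21": ["row"] * 4,
}
# Hypothetical dimension table with a filterable flag.
dates = [
    {"date": "2005-02-01", "is_promo": False},
    {"date": "2005-02-14", "is_promo": True},
    {"date": "2005-02-21", "is_promo": False},
]

to_scan = scan_with_dpp(flights, dates, lambda r: r["is_promo"], "date")
print(to_scan)     # ['2005-02-14']
rows_read = sum(len(flights[p]) for p in to_scan)
print(rows_read)   # 5 (instead of 12 without pruning)
```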
###### ANSI SQL Compliant
###### JOIN Hints
###### Pandas API improvements and type hints
###### Better error handling