# Technique 1. Reduce data shuffle

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook. Executors get 3G of memory,
# and event logging is enabled and pointed at the shared HDFS log directory.
spark = (
    SparkSession.builder
    .appName("optimize-spark")
    .master("spark://host.docker.internal:7077")
    .config("spark.executor.memory", "3G")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs://namenode/shared/spark-logs")
    .config("spark.history.fs.logDirectory", "hdfs://namenode/shared/spark-logs")
    .getOrCreate()
)

In [4]:
# Read the CSV data under the HDFS input directory, treating the first line as the header.
parkViolations = spark.read.csv("hdfs://namenode/input/", header=True)

In [5]:
# Number of rows recorded for each plate type.
plateTypeCountDF = (
    parkViolations
    .groupBy("Plate Type")
    .count()
)

In [6]:
# Print the physical plan before anything is executed; the Spark UI only lists
# commands that have already run.
plateTypeCountDF.explain(extended=False)

== Physical Plan ==
*(2) HashAggregate(keys=[Plate Type#19], functions=[count(1)])
+- Exchange hashpartitioning(Plate Type#19, 200), ENSURE_REQUIREMENTS, [id=#34]
   +- *(1) HashAggregate(keys=[Plate Type#19], functions=[partial_count(1)])
      +- FileScan csv [Plate Type#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://namenode/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Plate Type:string>




In [7]:
# Save the per-plate-type counts to HDFS as CSV with a header row,
# replacing whatever a previous run left at the destination.
(
    plateTypeCountDF.write
    .format("com.databricks.spark.csv")
    .option("header", True)
    .mode("overwrite")
    .save("hdfs://namenode/output/plate_type_count")
)

In [8]:
# Preview the first 20 rows of the counts.
plateTypeCountDF.show(n=20)

+----------+-------+
|Plate Type|  count|
+----------+-------+
|       CCK|     47|
|       OMO|     63|
|       LMB|     89|
|       CLG|     16|
|       SOS|    246|
|       SPC|    688|
|       SUP|     71|
|       NYA|    198|
|       APP|  40347|
|       FAR|     49|
|       RGL|  11173|
|       CHC|    792|
|       STA|   1605|
|       BOT|     34|
|       RGC|    401|
|       TRC|  57575|
|       AMB|    196|
|       COM|4350603|
|       BOB|    150|
|       HAM|    328|
+----------+-------+
only showing top 20 rows



In [9]:
#Solve
parkViolations = spark.read.option("header", True).csv("hdfs://namenode/input/")
parkViolationsPlateTypeDF = parkViolations.repartition(87, "Plate Type")
parkViolationsPlateTypeDF.explain() # you will see a filescan to read data and exchange hashpartition to shuffle and partition based on Plate Type

== Physical Plan ==
Exchange hashpartitioning(Plate Type#208, 87), REPARTITION_WITH_NUM, [id=#112]
+- FileScan csv [Summons Number#205,Plate ID#206,Registration State#207,Plate Type#208,Issue Date#209,Violation Code#210,Vehicle Body Type#211,Vehicle Make#212,Issuing Agency#213,Street Code1#214,Street Code2#215,Street Code3#216,Vehicle Expiration Date#217,Violation Location#218,Violation Precinct#219,Issuer Precinct#220,Issuer Code#221,Issuer Command#222,Issuer Squad#223,Violation Time#224,Time First Observed#225,Violation County#226,Violation In Front Of Or Opposite#227,House Number#228,... 27 more fields] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://namenode/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Summons Number:string,Plate ID:string,Registration State:string,Plate Type:string,Issue Da...




In [10]:
# Aggregate on top of the dataframe that was already partitioned by plate type.
plateTypeCountDF = (
    parkViolationsPlateTypeDF
    .groupBy("Plate Type")
    .count()
)

# In the printed plan, the bottom two steps are the ones that build parkViolationsPlateTypeDF.
plateTypeCountDF.explain(extended=False)

== Physical Plan ==
*(1) HashAggregate(keys=[Plate Type#208], functions=[count(1)])
+- *(1) HashAggregate(keys=[Plate Type#208], functions=[partial_count(1)])
   +- Exchange hashpartitioning(Plate Type#208, 87), REPARTITION_WITH_NUM, [id=#124]
      +- FileScan csv [Plate Type#208] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://namenode/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Plate Type:string>




In [11]:
# Save the counts as CSV with a header row, replacing any earlier output.
# The destination is a fully qualified HDFS URI, like the input reads and the first
# write in this notebook, so where the files land does not depend on the cluster's
# default filesystem. Spark writes a directory of part files at this path.
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/plate_type_count.csv")

# Technique 2. Use caching, when necessary

In [12]:
# Baseline for the caching technique: two aggregations built on the same
# repartitioned dataframe, with nothing cached between them.
parkViolations = spark.read.option("header", True).csv("hdfs://namenode/input/")
parkViolationsPlateTypeDF = parkViolations.repartition(87, "Plate Type")

# First aggregation: row count per plate type.
# Output paths are fully qualified HDFS URIs, like the input reads in this notebook,
# so the destination does not depend on the cluster's default filesystem.
plateTypeCountDF = parkViolationsPlateTypeDF.groupBy("Plate Type").count()
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/plate_type_count.csv")

# Second aggregation: an average. It is not meaningful for this data and is only
# used as an example of another aggregation over the same dataframe.
plateTypeAvgDF = parkViolationsPlateTypeDF.groupBy("Plate Type").avg()
plateTypeAvgDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/plate_type_avg.csv")

In [None]:
# We are redoing the repartition step each time, once for plateTypeCountDF and once for plateTypeAvgDF.
# We can avoid the second repartition by caching the result of the first one in the cluster's cache memory, as shown below.

In [13]:
# Same two aggregations as before, but both are built on a cached dataframe.
parkViolations = spark.read.option("header", True).csv("hdfs://namenode/input/")
parkViolationsPlateTypeDF = parkViolations.repartition(87, "Plate Type")

# Cache only the field the aggregations need, to keep the cache small.
cachedDF = parkViolationsPlateTypeDF.select('Plate Type').cache()

# First aggregation: row count per plate type.
# Output paths are fully qualified HDFS URIs, like the input reads in this notebook,
# so the destination does not depend on the cluster's default filesystem.
plateTypeCountDF = cachedDF.groupBy("Plate Type").count()
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/plate_type_count.csv")

# Second aggregation: an average. It is not meaningful for this data and is only
# used as an example of another aggregation over the same cached dataframe.
plateTypeAvgDF = cachedDF.groupBy("Plate Type").avg()
plateTypeAvgDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/plate_type_avg.csv")

In [None]:
# For very large dataframes, we can use the persist() method to store the dataframe using a combination of memory and disk, if necessary.

# Technique 3. Join strategies - broadcast join and bucketed joins

## 3.1. Broadcast Join

In [None]:
# Here we broadcast the dimension table:
# the mapping table is copied to each node that holds the fact table's data, which avoids shuffling the large dataset.
# By default, the maximum size for a table to be considered for broadcasting is 10MB.
# This is set using the spark.sql.autoBroadcastJoinThreshold configuration property.

In [None]:
# First, let's consider a join without broadcast.

In [2]:
# Load the two yearly files, treating the first line of each as the header.
parkViolations_2015 = spark.read.csv("hdfs://namenode/input/2015.csv", header=True)
parkViolations_2016 = spark.read.csv("hdfs://namenode/input/2016.csv", header=True)

# Give the join column a space-free name so the join is easier to write.
parkViolations_2015 = parkViolations_2015.withColumnRenamed("Plate Type", "plateType")
parkViolations_2016 = parkViolations_2016.withColumnRenamed("Plate Type", "plateType")

# Keep only the rows with plate type COM on each side.
parkViolations_2015_COM = parkViolations_2015.where(parkViolations_2015["plateType"] == "COM")
parkViolations_2016_COM = parkViolations_2016.where(parkViolations_2016["plateType"] == "COM")

# Inner join on plateType, keeping the summons number from the 2015 side
# and the issue date from the 2016 side.
joinDF = (
    parkViolations_2015_COM
    .join(
        parkViolations_2016_COM,
        on=parkViolations_2015_COM["plateType"] == parkViolations_2016_COM["plateType"],
        how="inner",
    )
    .select(parkViolations_2015_COM["Summons Number"], parkViolations_2016_COM["Issue Date"])
)

# The plan shows a SortMergeJoin with an Exchange on both inputs,
# which means both dataframes are shuffled.
joinDF.explain(extended=False)


== Physical Plan ==
*(5) Project [Summons Number#16, Issue Date#138]
+- *(5) SortMergeJoin [plateType#236], [plateType#288], Inner
   :- *(2) Sort [plateType#236 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(plateType#236, 200), ENSURE_REQUIREMENTS, [id=#72]
   :     +- *(1) Project [Summons Number#16, Plate Type#19 AS plateType#236]
   :        +- *(1) Filter (isnotnull(Plate Type#19) AND (Plate Type#19 = COM))
   :           +- FileScan csv [Summons Number#16,Plate Type#19] Batched: false, DataFilters: [isnotnull(Plate Type#19), (Plate Type#19 = COM)], Format: CSV, Location: InMemoryFileIndex[hdfs://namenode/input/2015.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Plate Type), EqualTo(Plate Type,COM)], ReadSchema: struct<Summons Number:string,Plate Type:string>
   +- *(4) Sort [plateType#288 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(plateType#288, 200), ENSURE_REQUIREMENTS, [id=#81]
         +- *(3) Project [Plate Type#137 AS plateType#288,

In [3]:
# Writing the result triggers the join, which takes a very long time on the given
# infrastructure. Do not run this cell unless you need to.
(
    joinDF.write
    .format("com.databricks.spark.csv")
    .option("header", True)
    .mode("overwrite")
    .save("hdfs://namenode/output/joined_df_without_broadcast")
)

In [None]:
# To avoid shuffling two large datasets, you can restructure your code so that Spark uses a broadcast join, as shown below.

In [None]:
# Load the two yearly files, treating the first line of each as the header.
parkViolations_2015 = spark.read.csv("hdfs://namenode/input/2015.csv", header=True)
parkViolations_2016 = spark.read.csv("hdfs://namenode/input/2016.csv", header=True)

# Give the join column a space-free name so the join is easier to write.
parkViolations_2015 = parkViolations_2015.withColumnRenamed("Plate Type", "plateType")
parkViolations_2016 = parkViolations_2016.withColumnRenamed("Plate Type", "plateType")

# Reduce each side to the COM rows, only the columns the join needs, de-duplicated,
# and mark the result for caching (cache() is lazy and returns the same dataframe).
parkViolations_2015_COM = (
    parkViolations_2015
    .where(parkViolations_2015["plateType"] == "COM")
    .select("plateType", "Summons Number")
    .distinct()
    .cache()
)
parkViolations_2016_COM = (
    parkViolations_2016
    .where(parkViolations_2016["plateType"] == "COM")
    .select("plateType", "Issue Date")
    .distinct()
    .cache()
)

# An action is required for the caches to actually be populated.
parkViolations_2015_COM.count()
parkViolations_2016_COM.count()

# Inner join on plateType with a broadcast hint on the 2016 side.
joinDF = (
    parkViolations_2015_COM
    .join(
        parkViolations_2016_COM.hint("broadcast"),
        on=parkViolations_2015_COM["plateType"] == parkViolations_2016_COM["plateType"],
        how="inner",
    )
    .select(parkViolations_2015_COM["Summons Number"], parkViolations_2016_COM["Issue Date"])
)

# The plan now shows a BroadcastHashJoin.
joinDF.explain(extended=False)

== Physical Plan ==
*(2) Project [Summons Number#1338, Issue Date#1460]
+- *(2) BroadcastHashJoin [plateType#1558], [plateType#1610], Inner, BuildRight, false
   :- *(2) Filter isnotnull(plateType#1558)
   :  +- InMemoryTableScan [plateType#1558, Summons Number#1338], [isnotnull(plateType#1558)]
   :        +- InMemoryRelation [plateType#1558, Summons Number#1338], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(2) HashAggregate(keys=[plateType#1558, Summons Number#1338], functions=[])
   :                 +- Exchange hashpartitioning(plateType#1558, Summons Number#1338, 200), ENSURE_REQUIREMENTS, [id=#431]
   :                    +- *(1) HashAggregate(keys=[plateType#1558, Summons Number#1338], functions=[])
   :                       +- *(1) Project [Plate Type#1341 AS plateType#1558, Summons Number#1338]
   :                          +- *(1) Filter (isnotnull(Plate Type#1341) AND (Plate Type#1341 = COM))
   :                             +- FileScan csv [S

In [None]:
# Save the broadcast-join result to HDFS as CSV with a header row,
# replacing any output from an earlier run.
(
    joinDF.write
    .format("com.databricks.spark.csv")
    .option("header", True)
    .mode("overwrite")
    .save("hdfs://namenode/output/joined_df_with_broadcast")
)

## 3.2. Bucketed Join

In [None]:
# A basic approach would be to repartition one dataframe by the field on which the join is performed and then join it with the second dataframe;
# this would still involve a data shuffle of the second dataframe at transformation time.

In [None]:
# Another approach would be to use bucketed joins. Bucketing is a technique which you can use to repartition a dataframe based on a field.
# If you bucket both dataframes on the field that they are supposed to be joined on,
# the matching data chunks of both dataframes are made available on the same nodes for the join,
# because the location of each chunk is chosen using the hash of the partition field.

In [18]:
def _with_underscored_columns(df):
    """Return ``df`` with every space in its column names replaced by an underscore."""
    return df.toDF(*[name.replace(" ", "_") for name in df.columns])


# Load each yearly file and normalise its column names from its OWN header.
# Deriving the names per dataframe avoids relabelling the 2016 columns positionally
# with the 2015 header, which would mislabel data or fail if the schemas ever differ.
parkViolations_2015 = _with_underscored_columns(
    spark.read.option("header", True).csv("hdfs://namenode/input/2015.csv")
)
parkViolations_2016 = _with_underscored_columns(
    spark.read.option("header", True).csv("hdfs://namenode/input/2016.csv")
)

# We filter for COM and 2001 to limit the time taken by the join.
parkViolations_2015 = parkViolations_2015.filter(parkViolations_2015.Plate_Type == "COM").filter(parkViolations_2015.Vehicle_Year == "2001")
parkViolations_2016 = parkViolations_2016.filter(parkViolations_2016.Plate_Type == "COM").filter(parkViolations_2016.Vehicle_Year == "2001")

# Disable automatic broadcast joins (-1 means disabled) so the bucketed join is what
# gets demonstrated. This is a session-level setting and stays in effect for later cells.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

# Save both sides as tables bucketed on the join keys, using the same bucket count.
# The bucket columns are spelled exactly as in the schema ("Plate_Type").
parkViolations_2015.write.mode("overwrite").bucketBy(400, "Vehicle_Year", "Plate_Type").saveAsTable("parkViolations_bkt_2015")
parkViolations_2016.write.mode("overwrite").bucketBy(400, "Vehicle_Year", "Plate_Type").saveAsTable("parkViolations_bkt_2016")

In [20]:
# Load the two bucketed tables back as dataframes.
parkViolations_2015_tbl = spark.table("parkViolations_bkt_2015")
parkViolations_2016_tbl = spark.table("parkViolations_bkt_2016")

In [21]:
# Inner join of the two bucketed tables on both bucketing columns, keeping the
# summons number from the 2015 side and the issue date from the 2016 side.
joinDF = (
    parkViolations_2015_tbl
    .join(
        parkViolations_2016_tbl,
        on=(
            (parkViolations_2015_tbl["Plate_Type"] == parkViolations_2016_tbl["Plate_Type"])
            & (parkViolations_2015_tbl["Vehicle_Year"] == parkViolations_2016_tbl["Vehicle_Year"])
        ),
        how="inner",
    )
    .select(parkViolations_2015_tbl["Summons_Number"], parkViolations_2016_tbl["Issue_Date"])
)

# The plan shows a SortMergeJoin but no Exchange, which means no data shuffle.
joinDF.explain(extended=False)

== Physical Plan ==
*(3) Project [Summons_Number#2508, Issue_Date#2614]
+- *(3) SortMergeJoin [Vehicle_Year#2543, Plate_Type#2511], [Vehicle_Year#2645, Plate_Type#2613], Inner
   :- *(1) Sort [Vehicle_Year#2543 ASC NULLS FIRST, Plate_Type#2511 ASC NULLS FIRST], false, 0
   :  +- *(1) Filter (isnotnull(Plate_Type#2511) AND isnotnull(Vehicle_Year#2543))
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.parkviolations_bkt_2015[Summons_Number#2508,Plate_Type#2511,Vehicle_Year#2543] Batched: true, DataFilters: [isnotnull(Plate_Type#2511), isnotnull(Vehicle_Year#2543)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/workspace/test/spark-warehouse/parkviolations_bkt_2015], PartitionFilters: [], PushedFilters: [IsNotNull(Plate_Type), IsNotNull(Vehicle_Year)], ReadSchema: struct<Summons_Number:string,Plate_Type:string,Vehicle_Year:string>, SelectedBucketsCount: 400 out of 400
   +- *(2) Sort [Vehicle_Year#2645 ASC NULLS FIRST, Plate_Type#2613 ASC NULLS FIRST], false

In [None]:
# Writing the result triggers the join, which takes a while (approx. 30 min).
# The destination is a fully qualified HDFS URI, like the other join outputs in this
# notebook, so where the files land does not depend on the cluster's default filesystem.
joinDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://namenode/output/bkt_joined_df.csv")