In [1]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [2]:
def get_spark_context(app_name: str) -> SparkSession:
    """
    Helper to manage the `SparkContext` and keep all of our
    configuration params in one place. See below comments for details:
        |_ https://github.com/bitnami/bitnami-docker-spark/issues/18#issuecomment-700628676
        |_ https://github.com/leriel/pyspark-easy-start/blob/master/read_file.py
    """

    conf = SparkConf()

    conf.setAll(
        [
            ("spark.master", "spark://spark-master:7077"),
            # ("spark.driver.host", "172.23.0.2"),
            # ("spark.submit.deployMode", "client"),
            # ("spark.driver.bindAddress", "0.0.0.0"),
            ("spark.app.name", app_name),
            ("spark.dynamicAllocation.enabled", "false"),
            ("spark.executor.memory", "4G")
        ]
    )

    return SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
spark = get_spark_context("Playgroud")

22/11/26 07:48:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Technique 1: reduce data shuffle

In [4]:
parkViolations = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/")

                                                                                

In [6]:
plateTypeCountDF = parkViolations.groupBy("Plate Type").count()

In [7]:
plateTypeCountDF.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Plate Type#19], functions=[count(1)])
+- Exchange hashpartitioning(Plate Type#19, 200), true, [id=#34]
   +- *(1) HashAggregate(keys=[Plate Type#19], functions=[partial_count(1)])
      +- FileScan csv [Plate Type#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://hadoop-namenode:9000/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Plate Type:string>




In [9]:
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://hadoop-namenode:9000/output/plate_type_count")

                                                                                

Repartition

In [11]:
parkViolations = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/")
parkViolationsPlateTypeDF = parkViolations.repartition(87, "Plate Type")

In [13]:
parkViolationsPlateTypeDF.explain()

== Physical Plan ==
Exchange hashpartitioning(Plate Type#200, 87), false, [id=#113]
+- FileScan csv [Summons Number#197,Plate ID#198,Registration State#199,Plate Type#200,Issue Date#201,Violation Code#202,Vehicle Body Type#203,Vehicle Make#204,Issuing Agency#205,Street Code1#206,Street Code2#207,Street Code3#208,Vehicle Expiration Date#209,Violation Location#210,Violation Precinct#211,Issuer Precinct#212,Issuer Code#213,Issuer Command#214,Issuer Squad#215,Violation Time#216,Time First Observed#217,Violation County#218,Violation In Front Of Or Opposite#219,House Number#220,... 27 more fields] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://hadoop-namenode:9000/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Summons Number:string,Plate ID:string,Registration State:string,Plate Type:string,Issue Da...




In [14]:
plateTypeCountDF = parkViolationsPlateTypeDF.groupBy("Plate Type").count()
plateTypeCountDF.explain() # check the execution plan, you will see the bottom 2 steps are for creating parkViolationsPlateTypeDF
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("/output/plate_type_count.csv")

== Physical Plan ==
*(1) HashAggregate(keys=[Plate Type#200], functions=[count(1)])
+- *(1) HashAggregate(keys=[Plate Type#200], functions=[partial_count(1)])
   +- Exchange hashpartitioning(Plate Type#200, 87), false, [id=#125]
      +- FileScan csv [Plate Type#200] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[hdfs://hadoop-namenode:9000/input], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Plate Type:string>




                                                                                

Technique 2. Use caching, when necessary


In [5]:
parkViolationsPlateTypeDF = parkViolations.repartition(87, "Plate Type")

In [6]:

cachedDF = parkViolationsPlateTypeDF.select('Plate Type').cache() # we are caching only the required field of the  dataframe in memory to keep cache size small

In [7]:
plateTypeCountDF = cachedDF.groupBy("Plate Type").count()

In [8]:
plateTypeCountDF.show()

22/11/26 07:50:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+----------+------+
|Plate Type| count|
+----------+------+
|       AGR|   731|
|       SOS|   246|
|       WUG|     8|
|       AYG|   333|
|       SRN|  7327|
|       VPL|    75|
|       HOU|     1|
|       TRL|  5696|
|       JWV|     2|
|       SNO|     2|
|       MCD|   140|
|       AMB|   196|
|       MOT| 55722|
|       OMV|   216|
|       FPW|    60|
|       LMC|    40|
|       ATV|    25|
|       HSM|    13|
|       IRP|119641|
|       PHS|  1952|
+----------+------+
only showing top 20 rows



                                                                                

In [17]:
plateTypeCountDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://hadoop-namenode:9000/output/plate_type_count.csv")

                                                                                

In [11]:
plateTypeAvgDF = cachedDF.groupBy("Plate Type").avg()
plateTypeAvgDF.show()

+----------+
|Plate Type|
+----------+
|       AGR|
|       SOS|
|       WUG|
|       AYG|
|       SRN|
|       VPL|
|       HOU|
|       TRL|
|       JWV|
|       SNO|
|       MCD|
|       AMB|
|       MOT|
|       OMV|
|       FPW|
|       LMC|
|       ATV|
|       HSM|
|       IRP|
|       PHS|
+----------+
only showing top 20 rows



In [16]:
plateTypeAvgDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://hadoop-namenode:9000/output/plate_type_avg.csv")

                                                                                

Technique 3. Join strategies - broadcast join and bucketed joins

3.1. Broadcast Join

In [18]:
parkViolations_2015 = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/2015.csv")
parkViolations_2016 = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/2016.csv")

In [21]:
parkViolations_2015 = parkViolations_2015.withColumnRenamed("Plate Type", "plateType") # simple column rename for easier joins
parkViolations_2016 = parkViolations_2016.withColumnRenamed("Plate Type", "plateType")

In [22]:
parkViolations_2016_COM = parkViolations_2016.filter(parkViolations_2016.plateType == "COM")
parkViolations_2015_COM = parkViolations_2015.filter(parkViolations_2015.plateType == "COM")

In [23]:
joinDF = parkViolations_2015_COM.join(parkViolations_2016_COM, parkViolations_2015_COM.plateType ==  parkViolations_2016_COM.plateType, "inner").select(parkViolations_2015_COM["Summons Number"], parkViolations_2016_COM["Issue Date"])
joinDF.explain() # you will see SortMergeJoin, with exchange for both dataframes, which means involves data shuffle of both dataframe
# The below join will take a very long time with the given infrastructure, do not run, unless needed
# joinDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("/output/joined_df")

== Physical Plan ==
*(5) Project [Summons Number#317, Issue Date#439]
+- *(5) SortMergeJoin [plateType#793], [plateType#845], Inner
   :- *(2) Sort [plateType#793 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(plateType#793, 200), true, [id=#272]
   :     +- *(1) Project [Summons Number#317, Plate Type#320 AS plateType#793]
   :        +- *(1) Filter (isnotnull(Plate Type#320) AND (Plate Type#320 = COM))
   :           +- FileScan csv [Summons Number#317,Plate Type#320] Batched: false, DataFilters: [isnotnull(Plate Type#320), (Plate Type#320 = COM)], Format: CSV, Location: InMemoryFileIndex[hdfs://hadoop-namenode:9000/input/2015.csv], PartitionFilters: [], PushedFilters: [IsNotNull(Plate Type), EqualTo(Plate Type,COM)], ReadSchema: struct<Summons Number:string,Plate Type:string>
   +- *(4) Sort [plateType#845 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(plateType#845, 200), true, [id=#281]
         +- *(3) Project [Plate Type#438 AS plateType#845, Issue 

In [27]:
parkViolations_2015_COM = parkViolations_2015.filter(parkViolations_2015.plateType == "COM").select("plateType", "Summons Number").distinct()

In [28]:
parkViolations_2016_COM = parkViolations_2016.filter(parkViolations_2016.plateType == "COM").select("plateType", "Issue Date").distinct()

In [29]:
parkViolations_2015_COM.cache()
parkViolations_2016_COM.cache()

DataFrame[plateType: string, Issue Date: string]

In [30]:
parkViolations_2015_COM.count() # will cause parkViolations_2015_COM to be cached
parkViolations_2016_COM.count() # will cause parkViolations_2016_COM to be cached

                                                                                

1183

In [31]:
joinDF = parkViolations_2015_COM.join(parkViolations_2016_COM.hint("broadcast"), parkViolations_2015_COM.plateType ==  parkViolations_2016_COM.plateType, "inner").select(parkViolations_2015_COM["Summons Number"], parkViolations_2016_COM["Issue Date"])
joinDF.explain() # you will see BroadcastHashJoin

== Physical Plan ==
*(2) Project [Summons Number#317, Issue Date#439]
+- *(2) BroadcastHashJoin [plateType#793], [plateType#845], Inner, BuildRight
   :- *(2) Filter isnotnull(plateType#793)
   :  +- InMemoryTableScan [plateType#793, Summons Number#317], [isnotnull(plateType#793)]
   :        +- InMemoryRelation [plateType#793, Summons Number#317], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(2) HashAggregate(keys=[plateType#793, Summons Number#317], functions=[])
   :                 +- Exchange hashpartitioning(plateType#793, Summons Number#317, 200), true, [id=#370]
   :                    +- *(1) HashAggregate(keys=[plateType#793, Summons Number#317], functions=[])
   :                       +- *(1) Project [Plate Type#320 AS plateType#793, Summons Number#317]
   :                          +- *(1) Filter (isnotnull(Plate Type#320) AND (Plate Type#320 = COM))
   :                             +- FileScan csv [Summons Number#317,Plate Type#320] Batched: 

In [33]:
joinDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://hadoop-namenode:9000/output/joined_df")

                                                                                

In [34]:
df_load = spark.read.csv("hdfs://hadoop-namenode:9000/output/joined_df")
print("Sales Dataframe read from Hadoop : ")
df_load.show()

                                                                                

Sales Dataframe read from Hadoop : 
+--------------+----------+
|           _c0|       _c1|
+--------------+----------+
|Summons Number|Issue Date|
|    7779478117|01/08/2019|
|    7779478117|09/02/2014|
|    7779478117|04/28/2018|
|    7779478117|10/16/2010|
|    7779478117|10/31/2016|
|    7779478117|03/17/2017|
|    7779478117|04/06/2015|
|    7779478117|01/30/2015|
|    7779478117|05/24/2015|
|    7779478117|02/24/2010|
|    7779478117|02/27/2016|
|    7779478117|03/01/2016|
|    7779478117|07/08/2017|
|    7779478117|07/22/2016|
|    7779478117|10/29/2015|
|    7779478117|04/26/2018|
|    7779478117|10/27/2017|
|    7779478117|10/30/2019|
|    7779478117|06/08/2017|
+--------------+----------+
only showing top 20 rows



In [35]:
df_load.count()

                                                                                

2593756092

3.2. Bucketed Join

In [54]:
parkViolations_2015 = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/2015.csv")
parkViolations_2016 = spark.read.option("header", True).csv("hdfs://hadoop-namenode:9000/input/2016.csv")

In [55]:
new_column_name_list= list(map(lambda x: x.replace(" ", "_"), parkViolations_2015.columns))
new_column_name_list

['Summons_Number',
 'Plate_ID',
 'Registration_State',
 'Plate_Type',
 'Issue_Date',
 'Violation_Code',
 'Vehicle_Body_Type',
 'Vehicle_Make',
 'Issuing_Agency',
 'Street_Code1',
 'Street_Code2',
 'Street_Code3',
 'Vehicle_Expiration_Date',
 'Violation_Location',
 'Violation_Precinct',
 'Issuer_Precinct',
 'Issuer_Code',
 'Issuer_Command',
 'Issuer_Squad',
 'Violation_Time',
 'Time_First_Observed',
 'Violation_County',
 'Violation_In_Front_Of_Or_Opposite',
 'House_Number',
 'Street_Name',
 'Intersecting_Street',
 'Date_First_Observed',
 'Law_Section',
 'Sub_Division',
 'Violation_Legal_Code',
 'Days_Parking_In_Effect____',
 'From_Hours_In_Effect',
 'To_Hours_In_Effect',
 'Vehicle_Color',
 'Unregistered_Vehicle?',
 'Vehicle_Year',
 'Meter_Number',
 'Feet_From_Curb',
 'Violation_Post_Code',
 'Violation_Description',
 'No_Standing_or_Stopping_Violation',
 'Hydrant_Violation',
 'Double_Parking_Violation',
 'Latitude',
 'Longitude',
 'Community_Board',
 'Community_Council_',
 'Census_Tract'

In [56]:
parkViolations_2015 = parkViolations_2015.toDF(*new_column_name_list)
parkViolations_2016 = parkViolations_2016.toDF(*new_column_name_list)

In [60]:
parkViolations_2016.groupBy('Plate_Type','Vehicle_Year').avg().show()



+----------+------------+
|Plate_Type|Vehicle_Year|
+----------+------------+
|       COM|        1997|
|       PAS|        1988|
|       THC|        2011|
|       ITP|        2006|
|       TRC|        2007|
|       OMR|        1995|
|       OMR|        1997|
|       PAS|        2036|
|       AMB|        2012|
|       TRA|        1991|
|       NYC|        2005|
|       COM|        2027|
|       LMB|        2012|
|       SOS|        2004|
|       FAR|        2015|
|       COM|        1992|
|       OMR|        2001|
|       SPO|        2004|
|       CMH|        2005|
|       OMS|        2056|
+----------+------------+
only showing top 20 rows



                                                                                

In [61]:

parkViolations_2015 = parkViolations_2015.filter(parkViolations_2015.Plate_Type == "COM").filter(parkViolations_2015.Vehicle_Year == "1997")
parkViolations_2016 = parkViolations_2016.filter(parkViolations_2016.Plate_Type == "COM").filter(parkViolations_2016.Vehicle_Year == "1997")

In [46]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
 # we do this so that Spark does not auto optimize for broadcast join, setting to -1 means disable

In [62]:
parkViolations_2015.write.mode("overwrite").bucketBy(400, "Vehicle_Year", "plate_type").saveAsTable("parkViolations_bkt_2015")
parkViolations_2016.write.mode("overwrite").bucketBy(400, "Vehicle_Year", "plate_type").saveAsTable("parkViolations_bkt_2016")

                                                                                

In [63]:
parkViolations_2015_tbl = spark.read.table("parkViolations_bkt_2015")
parkViolations_2016_tbl = spark.read.table("parkViolations_bkt_2016")

In [64]:
joinDF = parkViolations_2015_tbl.join(parkViolations_2016_tbl, (parkViolations_2015_tbl.Plate_Type ==  parkViolations_2016_tbl.Plate_Type) & (parkViolations_2015_tbl.Vehicle_Year ==  parkViolations_2016_tbl.Vehicle_Year) , "inner").select(parkViolations_2015_tbl["Summons_Number"], parkViolations_2016_tbl["Issue_Date"])

In [65]:
joinDF.explain() # you will see SortMergeJoin, but no exchange, which means no data shuffle


== Physical Plan ==
*(3) Project [Summons_Number#4110, Issue_Date#4216]
+- *(3) SortMergeJoin [Vehicle_Year#4145, Plate_Type#4113], [Vehicle_Year#4247, Plate_Type#4215], Inner
   :- *(1) Sort [Vehicle_Year#4145 ASC NULLS FIRST, Plate_Type#4113 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [Summons_Number#4110, Plate_Type#4113, Vehicle_Year#4145]
   :     +- *(1) Filter (isnotnull(Plate_Type#4113) AND isnotnull(Vehicle_Year#4145))
   :        +- *(1) ColumnarToRow
   :           +- FileScan parquet default.parkviolations_bkt_2015[Summons_Number#4110,Plate_Type#4113,Vehicle_Year#4145] Batched: true, DataFilters: [isnotnull(Plate_Type#4113), isnotnull(Vehicle_Year#4145)], Format: Parquet, Location: InMemoryFileIndex[file:/workspace/spark-warehouse/parkviolations_bkt_2015], PartitionFilters: [], PushedFilters: [IsNotNull(Plate_Type), IsNotNull(Vehicle_Year)], ReadSchema: struct<Summons_Number:string,Plate_Type:string,Vehicle_Year:string>, SelectedBucketsCount: 400 out of 400
   +- *(2) 

In [66]:
# The below join will take a while, approx 30min
joinDF.write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save("hdfs://hadoop-namenode:9000/output/bkt_joined_df.csv")

                                                                                

In [67]:
joinDF_bk_load = spark.read.format("com.databricks.spark.csv").option("header", "true").load("hdfs://hadoop-namenode:9000/output/bkt_joined_df.csv")
print("Sales Dataframe read from Hadoop : ")
joinDF_bk_load.show()

Sales Dataframe read from Hadoop : 
+--------------+----------+
|Summons_Number|Issue_Date|
+--------------+----------+
+--------------+----------+



22/11/26 09:12:05 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
22/11/26 09:12:05 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:716)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:152)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:258)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:168)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$Mess