## Exploring the City of San Francisco public data with Apache Spark 2.0

The SF OpenData project was launched in 2009 and contains hundreds of datasets from the city and county of San Francisco. Open government data has the potential to increase the quality of life for residents, make government services more efficient, improve public decisions, and even spur new local businesses and services.

It was the 4th of July a couple of days ago, so SF residents enjoyed a fireworks show:

How did the 4th of July holiday affect demand for firefighters?

## Introduction to Spark

Spark is a unified processing engine that can analyze big data using SQL, machine learning, graph processing or real-time stream analysis:

We will mostly focus on Spark SQL and DataFrames this evening.

Although Spark supports four languages (Scala, Java, Python, R), tonight we will use Python.

Broadly speaking, there are **2 APIs** for interacting with Spark:

- **DataFrames/SQL/Datasets:** general, higher level API for users of Spark
- **RDD:** a lower level API for Spark internals and advanced programming

A Spark cluster is made of one Driver and many Executor JVMs (Java virtual machines):

The Driver sends Tasks to the empty slots on the Executors when work has to be done:

In Databricks Community Edition, everyone gets a local mode cluster, where the Driver and Executor code run in the same JVM. Local mode clusters are typically used for prototyping and learning Spark:

## Introduction to Fire Department Calls for Service

The latest July 6th, 2016 copy of the "Fire Department Calls for Service" data set has been uploaded to S3. You can see the data with the `%fs ls` command:

In [11]:
# File location and type
file_location = "/FileStore/tables/Fire_Department_Calls_for_Service.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Call Number,Unit ID,Incident Number,Call Type,Call Date,Watch Date,Received DtTm,Entry DtTm,Dispatch DtTm,Response DtTm,On Scene DtTm,Transport DtTm,Hospital DtTm,Call Final Disposition,Available DtTm,Address,City,Zipcode of Incident,Battalion,Station Area,Box,Original Priority,Priority,Final Priority,ALS Unit,Call Type Group,Number of Alarms,Unit Type,Unit sequence in call dispatch,Fire Prevention District,Supervisor District,Neighborhooods - Analysis Boundaries,Location,RowID
200310051,54,20013094,Medical Incident,01/31/2020,01/30/2020,01/31/2020 12:23:17 AM,01/31/2020 12:24:10 AM,01/31/2020 12:24:19 AM,01/31/2020 12:24:26 AM,01/31/2020 12:32:48 AM,01/31/2020 12:48:14 AM,01/31/2020 12:51:51 AM,Code 2 Transport,01/31/2020 01:08:08 AM,TURK ST/HYDE ST,San Francisco,94102,B02,3,1554,2,2,2,True,Non Life-threatening,1,MEDIC,1,2.0,6,Tenderloin,"(37.78258503328092, -122.41569387210927)",200310051-54
200230321,E01,20009609,Medical Incident,01/23/2020,01/22/2020,01/23/2020 04:22:35 AM,01/23/2020 04:23:07 AM,01/23/2020 04:23:23 AM,01/23/2020 04:25:46 AM,01/23/2020 04:28:36 AM,,,Code 2 Transport,01/23/2020 04:35:54 AM,900 Block of MARKET ST,San Francisco,94103,B02,1,2248,3,3,3,True,Potentially Life-Threatening,1,ENGINE,1,3.0,6,South of Market,"(37.78269112134719, -122.40953666269209)",200230321-E01
200390425,KM07,20016682,Medical Incident,02/08/2020,02/07/2020,02/08/2020 04:33:30 AM,02/08/2020 04:35:01 AM,02/08/2020 04:35:41 AM,02/08/2020 04:36:31 AM,,,,Fire,02/08/2020 04:45:02 AM,1100 Block of GREENWICH ST,San Francisco,94109,B01,28,1612,2,3,3,False,Non Life-threatening,1,PRIVATE,3,1.0,2,Russian Hill,"(37.80126641196036, -122.41824712781222)",200390425-KM07
200331972,62,20014211,Traffic Collision,02/02/2020,02/02/2020,02/02/2020 02:36:32 PM,02/02/2020 02:44:10 PM,02/02/2020 02:44:46 PM,02/02/2020 02:45:23 PM,,,,Code 2 Transport,02/02/2020 02:46:01 PM,101 NB Z BAYSHR 3RD/PAUL AV,San Francisco,94134,B10,44,6542,2,2,2,True,Non Life-threatening,1,MEDIC,2,10.0,9,Portola,"(37.72382363385559, -122.40141524801041)",200331972-62
200360712,RS2,20015378,Structure Fire,02/05/2020,02/05/2020,02/05/2020 08:04:35 AM,02/05/2020 08:07:31 AM,02/05/2020 08:07:57 AM,02/05/2020 08:09:19 AM,02/05/2020 08:10:55 AM,,,Fire,02/05/2020 08:15:30 AM,500 Block of CAPP ST,San Francisco,94110,B06,7,5446,3,3,3,False,Alarm,1,RESCUE SQUAD,2,6.0,9,Mission,"(37.7578954432039, -122.41795728711763)",200360712-RS2
200201881,71,20008444,Medical Incident,01/20/2020,01/20/2020,01/20/2020 01:32:12 PM,01/20/2020 01:36:36 PM,01/20/2020 01:36:49 PM,01/20/2020 01:37:36 PM,01/20/2020 01:43:46 PM,,,Gone on Arrival,01/20/2020 01:45:14 PM,500 Block of SUTTER ST,San Francisco,94108,B01,41,1412,2,2,2,True,Potentially Life-Threatening,1,MEDIC,1,1.0,3,Nob Hill,"(37.78917521379033, -122.40942968306595)",200201881-71
200253168,E28,20010794,Medical Incident,01/25/2020,01/25/2020,01/25/2020 07:44:57 PM,01/25/2020 07:47:59 PM,01/25/2020 07:48:07 PM,01/25/2020 07:48:58 PM,01/25/2020 07:51:45 PM,,,Code 2 Transport,01/25/2020 08:05:20 PM,1000 Block of CHESTNUT ST,San Francisco,94109,B01,28,1613,3,3,3,True,Potentially Life-Threatening,1,ENGINE,1,1.0,2,Russian Hill,"(37.802864995991825, -122.42065066827944)",200253168-E28
200371642,B04,20015934,Structure Fire,02/06/2020,02/06/2020,02/06/2020 12:02:46 PM,02/06/2020 12:03:43 PM,02/06/2020 12:03:56 PM,02/06/2020 12:04:50 PM,02/06/2020 12:15:03 PM,,,Fire,02/06/2020 01:12:53 PM,700 Block of 7TH AVE,San Francisco,94118,B07,31,7132,3,3,3,False,Fire,1,CHIEF,12,7.0,1,Inner Richmond,"(37.77440794262095, -122.46480778984457)",200371642-B04
200330127,AM244,20013987,Medical Incident,02/02/2020,02/01/2020,02/02/2020 12:56:48 AM,02/02/2020 12:56:48 AM,02/02/2020 01:16:13 AM,02/02/2020 01:17:46 AM,,,,No Merit,02/02/2020 01:23:40 AM,1100 Block of FOLSOM ST,San Francisco,94103,B03,1,2313,A,2,2,False,Non Life-threatening,1,PRIVATE,4,2.0,6,South of Market,"(37.77562329380876, -122.40916530235312)",200330127-AM244
200372622,E32,20016032,Medical Incident,02/06/2020,02/06/2020,02/06/2020 03:59:52 PM,02/06/2020 04:02:09 PM,02/06/2020 04:02:38 PM,02/06/2020 04:02:59 PM,02/06/2020 04:09:06 PM,,,Code 2 Transport,02/06/2020 04:19:01 PM,200 Block of ADDISON ST,San Francisco,94131,B06,26,8122,2,2,2,True,Non Life-threatening,1,ENGINE,1,6.0,8,Glen Park,"(37.737745740922264, -122.43249810888456)",200372622-E32


Note, you can also access the 1.6 GB of data directly from sfgov.org via this link: https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3

The entry point into all functionality in Spark 2.0 is the new SparkSession class:

In [14]:
spark

Using the SparkSession, create a DataFrame from the CSV file by inferring the schema:

In [16]:
fireServiceCallsDF = spark.read.csv('/mnt/sf_open_data/fire_dept_calls_for_service/Fire_Department_Calls_for_Service.csv', header=True, inferSchema=True)

Notice that the above cell takes ~15 seconds to run because it infers the schema by sampling the file and reading through it.

Inferring the schema works for ad hoc analysis against smaller datasets. But when working on multi-TB+ data, it's better to provide an **explicit pre-defined schema manually**, so there's no inferring cost:
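
To see why inference has a cost, here is a minimal pure-Python sketch of the idea. It is a toy, not Spark's actual inference logic: the type rules and the `infer_type` / `infer_schema` helpers are assumptions made for illustration, and the sample rows are a few columns taken from the records displayed earlier. The point is that a column's type can only be decided after looking at its values, so the data has to be read before the real work starts.

```python
import csv
import io

# A few columns from three of the records displayed earlier in this notebook
SAMPLE_CSV = """Call Number,Unit ID,Call Type,ALS Unit
200310051,54,Medical Incident,True
200230321,E01,Medical Incident,True
200360712,RS2,Structure Fire,False
"""


def infer_type(values):
    """Return the narrowest toy type name that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    if all(v in ("True", "False") for v in non_empty):
        return "boolean"
    try:
        for v in non_empty:
            int(v)
        return "int"
    except ValueError:
        return "string"


def infer_schema(csv_text):
    """Read every row, then decide a type per column."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))  # a full pass over the data
    return {name: infer_type([row[name] for row in rows]) for name in rows[0]}


schema = infer_schema(SAMPLE_CSV)
first_row_guess = infer_type(["54"])  # what a first-row-only guess says about Unit ID

print(schema)
print("Unit ID guessed from the first row only:", first_row_guess)
```

Looking only at the first row would classify `Unit ID` as an integer, and later values such as `E01` show that guess is wrong. An explicit schema avoids both the extra pass and that kind of surprise.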

In [18]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

In [19]:
# Note that we are removing all space characters from the col names to prevent errors when writing to Parquet later

fireSchema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),       
                     StructField('WatchDate', StringType(), True),       
                     StructField('ReceivedDtTm', StringType(), True),       
                     StructField('EntryDtTm', StringType(), True),       
                     StructField('DispatchDtTm', StringType(), True),       
                     StructField('ResponseDtTm', StringType(), True),       
                     StructField('OnSceneDtTm', StringType(), True),       
                     StructField('TransportDtTm', StringType(), True),                  
                     StructField('HospitalDtTm', StringType(), True),       
                     StructField('CallFinalDisposition', StringType(), True),       
                     StructField('AvailableDtTm', StringType(), True),       
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('ZipcodeofIncident', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumberofAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('Unitsequenceincalldispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('NeighborhoodDistrict', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True)])
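
The renaming mentioned in the comment above can be sketched in plain Python. This is a minimal illustration, assuming the set of characters that Spark's Parquet writer rejects in column names is `" ,;{}()\n\t="`; the helper names `strip_spaces` and `invalid_chars` are hypothetical, and the header is a small subset of the CSV header shown earlier.

```python
# Characters assumed to be rejected in Parquet column names (see the lead-in)
INVALID_PARQUET_CHARS = set(" ,;{}()\n\t=")

# A subset of the original CSV header shown earlier
raw_header = [
    "Call Number",
    "Unit ID",
    "Zipcode of Incident",
    "Unit sequence in call dispatch",
]


def strip_spaces(name):
    """Remove space characters, as the schema above does by hand."""
    return name.replace(" ", "")


def invalid_chars(name):
    """Return the set of problematic characters found in a column name."""
    return set(name) & INVALID_PARQUET_CHARS


clean_header = [strip_spaces(name) for name in raw_header]

for raw, clean in zip(raw_header, clean_header):
    print(f"{raw!r} -> {clean!r} (invalid characters left: {sorted(invalid_chars(clean))})")
```

A column such as `Neighborhooods - Analysis Boundaries` was given a new name entirely (`NeighborhoodDistrict`) in the schema above rather than just having its spaces removed.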

In [20]:
#Notice that no job is run this time
fireServiceCallsDF = spark.read.csv('/FileStore/tables/Fire_Department_Calls_for_Service.csv', header=True, schema=fireSchema)

Look at the first 5 records in the DataFrame:

In [22]:
display(fireServiceCallsDF.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,ReceivedDtTm,EntryDtTm,DispatchDtTm,ResponseDtTm,OnSceneDtTm,TransportDtTm,HospitalDtTm,CallFinalDisposition,AvailableDtTm,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID
200310051,54,20013094,Medical Incident,01/31/2020,01/30/2020,01/31/2020 12:23:17 AM,01/31/2020 12:24:10 AM,01/31/2020 12:24:19 AM,01/31/2020 12:24:26 AM,01/31/2020 12:32:48 AM,01/31/2020 12:48:14 AM,01/31/2020 12:51:51 AM,Code 2 Transport,01/31/2020 01:08:08 AM,TURK ST/HYDE ST,San Francisco,94102,B02,3,1554,2,2,2,True,Non Life-threatening,1,MEDIC,1,2,6,Tenderloin,"(37.78258503328092, -122.41569387210927)",200310051-54
200230321,E01,20009609,Medical Incident,01/23/2020,01/22/2020,01/23/2020 04:22:35 AM,01/23/2020 04:23:07 AM,01/23/2020 04:23:23 AM,01/23/2020 04:25:46 AM,01/23/2020 04:28:36 AM,,,Code 2 Transport,01/23/2020 04:35:54 AM,900 Block of MARKET ST,San Francisco,94103,B02,1,2248,3,3,3,True,Potentially Life-Threatening,1,ENGINE,1,3,6,South of Market,"(37.78269112134719, -122.40953666269209)",200230321-E01
200390425,KM07,20016682,Medical Incident,02/08/2020,02/07/2020,02/08/2020 04:33:30 AM,02/08/2020 04:35:01 AM,02/08/2020 04:35:41 AM,02/08/2020 04:36:31 AM,,,,Fire,02/08/2020 04:45:02 AM,1100 Block of GREENWICH ST,San Francisco,94109,B01,28,1612,2,3,3,False,Non Life-threatening,1,PRIVATE,3,1,2,Russian Hill,"(37.80126641196036, -122.41824712781222)",200390425-KM07
200331972,62,20014211,Traffic Collision,02/02/2020,02/02/2020,02/02/2020 02:36:32 PM,02/02/2020 02:44:10 PM,02/02/2020 02:44:46 PM,02/02/2020 02:45:23 PM,,,,Code 2 Transport,02/02/2020 02:46:01 PM,101 NB Z BAYSHR 3RD/PAUL AV,San Francisco,94134,B10,44,6542,2,2,2,True,Non Life-threatening,1,MEDIC,2,10,9,Portola,"(37.72382363385559, -122.40141524801041)",200331972-62
200360712,RS2,20015378,Structure Fire,02/05/2020,02/05/2020,02/05/2020 08:04:35 AM,02/05/2020 08:07:31 AM,02/05/2020 08:07:57 AM,02/05/2020 08:09:19 AM,02/05/2020 08:10:55 AM,,,Fire,02/05/2020 08:15:30 AM,500 Block of CAPP ST,San Francisco,94110,B06,7,5446,3,3,3,False,Alarm,1,RESCUE SQUAD,2,6,9,Mission,"(37.7578954432039, -122.41795728711763)",200360712-RS2


Print just the column names in the DataFrame:

In [24]:
fireServiceCallsDF.columns

Count how many rows there are in total in the DataFrame (and see how long it takes to do a full scan from remote disk/S3):

In [26]:
fireServiceCallsDF.count()

There are over 4 million rows in the DataFrame and it takes ~14 seconds to do a full read of it.

**Analysis with PySpark DataFrames API**

#### Spark Operations

DataFrames support two types of operations: *transformations* and *actions*.

Transformations, like `select()` or `filter()` create a new DataFrame from an existing one.

Actions, like `show()` or `count()`, return a value with results to the user. Other actions like `save()` write the DataFrame to distributed storage (like S3 or HDFS).

Transformations contribute to a query plan, but nothing is executed until an action is called.
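
The split between transformations and actions can be modeled in a few lines of plain Python. This is a toy sketch, not how Spark is implemented: `LazyTable` is a hypothetical class that only records steps when `filter()` or `select()` is called, and runs them when an action such as `count()` is called. The counter on the predicate shows that no row is touched until the action runs.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: records transformations, runs them on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (name, function) steps, not yet executed

    # Transformations: return a new LazyTable and do no work
    def filter(self, predicate):
        step = ("filter", lambda rows: (r for r in rows if predicate(r)))
        return LazyTable(self._rows, self._plan + (step,))

    def select(self, *cols):
        step = ("select", lambda rows: ({c: r[c] for c in cols} for r in rows))
        return LazyTable(self._rows, self._plan + (step,))

    def plan(self):
        return [name for name, _ in self._plan]

    # Action: execute the recorded plan and return a value
    def count(self):
        rows = iter(self._rows)
        for _name, fn in self._plan:
            rows = fn(rows)
        return sum(1 for _ in rows)


predicate_calls = {"n": 0}


def is_medical(row):
    predicate_calls["n"] += 1  # count how often a row is actually inspected
    return row["CallType"] == "Medical Incident"


rows = [
    {"CallType": "Medical Incident", "UnitID": "54"},
    {"CallType": "Medical Incident", "UnitID": "E01"},
    {"CallType": "Traffic Collision", "UnitID": "62"},
    {"CallType": "Structure Fire", "UnitID": "RS2"},
]

medical = LazyTable(rows).filter(is_medical).select("CallType")
calls_before_action = predicate_calls["n"]  # only a plan exists so far
medical_count = medical.count()             # the action runs the plan
calls_after_action = predicate_calls["n"]   # one predicate call per input row

print(medical.plan(), calls_before_action, medical_count, calls_after_action)
```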

**Q-1) How many different types of calls were made to the Fire Department?**

In [33]:
# Use the .select() transformation to yank out just the 'CallType' column, then call the show action
fireServiceCallsDF.select('CallType').show(5)

In [34]:
# Add the .distinct() transformation to keep only distinct rows
# The False below expands the ASCII column width to fit the full text in the output

fireServiceCallsDF.select('CallType').distinct().show(35, False)

**Q-2) How many incidents of each call type were there?**

In [36]:
#Note that .count() is actually a transformation here

display(fireServiceCallsDF.select('CallType').groupBy('CallType').count().orderBy("count", ascending=False))

CallType,count
Medical Incident,69912
Alarms,11365
Structure Fire,6939
Traffic Collision,3165
Outside Fire,1673
Citizen Assist / Service Call,1419
Other,1392
Gas Leak (Natural and LP Gases),763
Water Rescue,605
Electrical Hazard,382


Seems like the SF Fire department is called for medical incidents far more than any other type. Note that the above command took about 14 seconds to execute. In an upcoming section, we'll cache the data into memory for up to 100x speed increases.
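
Conceptually, the `groupBy('CallType').count().orderBy(...)` chain above is a frequency count sorted in descending order. As a rough single-machine analogy (not what Spark does internally), the same result can be produced with `collections.Counter` on the ten `Call Type` values from the first sample displayed earlier:

```python
from collections import Counter

# Call Type values of the ten sample records displayed at the top of the notebook
call_types = [
    "Medical Incident",
    "Medical Incident",
    "Medical Incident",
    "Traffic Collision",
    "Structure Fire",
    "Medical Incident",
    "Medical Incident",
    "Structure Fire",
    "Medical Incident",
    "Medical Incident",
]

# most_common() returns (value, count) pairs sorted by count, highest first
counts_desc = Counter(call_types).most_common()

for call_type, count in counts_desc:
    print(f"{call_type},{count}")
```

Even in this tiny sample, medical incidents dominate, which is in line with the full result above.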

### **Doing Date/Time Analysis**

**Q-3) How many years of Fire Service Calls are in the data file?**

Notice that the date or time columns are currently being interpreted as strings, rather than date or time objects:

In [40]:
fireServiceCallsDF.printSchema()

Let's use the unix_timestamp() function to convert the string into a timestamp:

https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/python/pyspark.sql.html?highlight=spark#pyspark.sql.functions.from_unixtime
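
Before converting whole columns in Spark, it can help to check on a single value what the two string formats in this dataset mean. The sketch below uses Python's standard `datetime.strptime`, whose `%`-style directives are a different notation from the Java date patterns used in the Spark cell that follows; the two sample strings are taken from the first record displayed earlier.

```python
from datetime import datetime

# Sample values from the first record shown earlier
call_date = "01/31/2020"                  # format of CallDate / WatchDate
received_dttm = "01/31/2020 12:23:17 AM"  # format of the *DtTm columns

# %I is the 12-hour clock hour and %p is the AM/PM marker
parsed_date = datetime.strptime(call_date, "%m/%d/%Y")
parsed_dttm = datetime.strptime(received_dttm, "%m/%d/%Y %I:%M:%S %p")

print(parsed_date.isoformat())  # 2020-01-31T00:00:00
print(parsed_dttm.isoformat())  # 2020-01-31T00:23:17
```

Note that `12:23:17 AM` is 23 minutes past midnight, so the parsed hour is `00`.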

In [42]:
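# Import only the functions used in this notebook; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col, dayofyear, desc, unix_timestamp, year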

In [43]:
# Note that PySpark uses the Java Simple Date Format patterns

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'


fireServiceCallsTsDF = fireServiceCallsDF \
  .withColumn('CallDateTS', unix_timestamp(fireServiceCallsDF['CallDate'], from_pattern1).cast("timestamp")) \
  .drop('CallDate') \
  .withColumn('WatchDateTS', unix_timestamp(fireServiceCallsDF['WatchDate'], from_pattern1).cast("timestamp")) \
  .drop('WatchDate') \
  .withColumn('ReceivedDtTmTS', unix_timestamp(fireServiceCallsDF['ReceivedDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ReceivedDtTm') \
  .withColumn('EntryDtTmTS', unix_timestamp(fireServiceCallsDF['EntryDtTm'], from_pattern2).cast("timestamp")) \
  .drop('EntryDtTm') \
  .withColumn('DispatchDtTmTS', unix_timestamp(fireServiceCallsDF['DispatchDtTm'], from_pattern2).cast("timestamp")) \
  .drop('DispatchDtTm') \
  .withColumn('ResponseDtTmTS', unix_timestamp(fireServiceCallsDF['ResponseDtTm'], from_pattern2).cast("timestamp")) \
  .drop('ResponseDtTm') \
  .withColumn('OnSceneDtTmTS', unix_timestamp(fireServiceCallsDF['OnSceneDtTm'], from_pattern2).cast("timestamp")) \
  .drop('OnSceneDtTm') \
  .withColumn('TransportDtTmTS', unix_timestamp(fireServiceCallsDF['TransportDtTm'], from_pattern2).cast("timestamp")) \
  .drop('TransportDtTm') \
  .withColumn('HospitalDtTmTS', unix_timestamp(fireServiceCallsDF['HospitalDtTm'], from_pattern2).cast("timestamp")) \
  .drop('HospitalDtTm') \
  .withColumn('AvailableDtTmTS', unix_timestamp(fireServiceCallsDF['AvailableDtTm'], from_pattern2).cast("timestamp")) \
  .drop('AvailableDtTm')  

In [44]:
fireServiceCallsTsDF.printSchema()

Notice that the formatting of the timestamps is now different:

In [46]:
display(fireServiceCallsTsDF.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
200310051,54,20013094,Medical Incident,Code 2 Transport,TURK ST/HYDE ST,San Francisco,94102,B02,3,1554,2,2,2,True,Non Life-threatening,1,MEDIC,1,2,6,Tenderloin,"(37.78258503328092, -122.41569387210927)",200310051-54,2020-01-31T00:00:00.000+0000,2020-01-30T00:00:00.000+0000,2020-01-31T00:23:17.000+0000,2020-01-31T00:24:10.000+0000,2020-01-31T00:24:19.000+0000,2020-01-31T00:24:26.000+0000,2020-01-31T00:32:48.000+0000,2020-01-31T00:48:14.000+0000,2020-01-31T00:51:51.000+0000,2020-01-31T01:08:08.000+0000
200230321,E01,20009609,Medical Incident,Code 2 Transport,900 Block of MARKET ST,San Francisco,94103,B02,1,2248,3,3,3,True,Potentially Life-Threatening,1,ENGINE,1,3,6,South of Market,"(37.78269112134719, -122.40953666269209)",200230321-E01,2020-01-23T00:00:00.000+0000,2020-01-22T00:00:00.000+0000,2020-01-23T04:22:35.000+0000,2020-01-23T04:23:07.000+0000,2020-01-23T04:23:23.000+0000,2020-01-23T04:25:46.000+0000,2020-01-23T04:28:36.000+0000,,,2020-01-23T04:35:54.000+0000
200390425,KM07,20016682,Medical Incident,Fire,1100 Block of GREENWICH ST,San Francisco,94109,B01,28,1612,2,3,3,False,Non Life-threatening,1,PRIVATE,3,1,2,Russian Hill,"(37.80126641196036, -122.41824712781222)",200390425-KM07,2020-02-08T00:00:00.000+0000,2020-02-07T00:00:00.000+0000,2020-02-08T04:33:30.000+0000,2020-02-08T04:35:01.000+0000,2020-02-08T04:35:41.000+0000,2020-02-08T04:36:31.000+0000,,,,2020-02-08T04:45:02.000+0000
200331972,62,20014211,Traffic Collision,Code 2 Transport,101 NB Z BAYSHR 3RD/PAUL AV,San Francisco,94134,B10,44,6542,2,2,2,True,Non Life-threatening,1,MEDIC,2,10,9,Portola,"(37.72382363385559, -122.40141524801041)",200331972-62,2020-02-02T00:00:00.000+0000,2020-02-02T00:00:00.000+0000,2020-02-02T14:36:32.000+0000,2020-02-02T14:44:10.000+0000,2020-02-02T14:44:46.000+0000,2020-02-02T14:45:23.000+0000,,,,2020-02-02T14:46:01.000+0000
200360712,RS2,20015378,Structure Fire,Fire,500 Block of CAPP ST,San Francisco,94110,B06,7,5446,3,3,3,False,Alarm,1,RESCUE SQUAD,2,6,9,Mission,"(37.7578954432039, -122.41795728711763)",200360712-RS2,2020-02-05T00:00:00.000+0000,2020-02-05T00:00:00.000+0000,2020-02-05T08:04:35.000+0000,2020-02-05T08:07:31.000+0000,2020-02-05T08:07:57.000+0000,2020-02-05T08:09:19.000+0000,2020-02-05T08:10:55.000+0000,,,2020-02-05T08:15:30.000+0000


Finally, calculate how many distinct years of data are in the CSV file:

In [48]:
fireServiceCallsTsDF.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

**Q-4) How many service calls were logged in the past 7 days?**

Note that today, July 6th, is the 188th day of the year (2016 is a leap year).

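Filter the DataFrame down to a single year and a day-of-year threshold. The text was written for 2016 and days of year greater than 180; the cells below filter on 2020 and day 50 or later, which matches the 2020 sample rows shown earlier: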

In [51]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2020').filter(dayofyear('CallDateTS') >= 50).select(dayofyear('CallDateTS')).distinct().orderBy('dayofyear(CallDateTS)').show()

In [52]:
fireServiceCallsTsDF.filter(year('CallDateTS') == '2020').filter(dayofyear('CallDateTS') >= 50).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)').show()

Note above that July 4th, 2016 was the 186th day of the year.
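
Day-of-year numbers are hard to read at a glance, so here is a small standard-library sketch for mapping them back to calendar dates. The helper name `day_of_year_to_date` is just an illustration; it is applied to the year 2020 and the day-50 threshold used in the filter cells above.

```python
from datetime import date, timedelta


def day_of_year_to_date(year, day_of_year):
    """Day 1 is January 1st, so add (day_of_year - 1) days to it."""
    return date(year, 1, 1) + timedelta(days=day_of_year - 1)


threshold_date = day_of_year_to_date(2020, 50)
later_date = day_of_year_to_date(2020, 59)

print(threshold_date.isoformat())  # 2020-02-19
print(later_date.isoformat())      # 2020-02-28

# The reverse direction: calendar date to day of year
round_trip = threshold_date.timetuple().tm_yday
print(round_trip)                  # 50
```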

Visualize the results in a bar graph:

In [55]:
display(fireServiceCallsTsDF.filter(year('CallDateTS') == '2020').filter(dayofyear('CallDateTS') >= 50).groupBy(dayofyear('CallDateTS')).count().orderBy('dayofyear(CallDateTS)'))

dayofyear(CallDateTS),count
50,813
51,961
52,920
53,954
54,883
55,925
56,894
57,932
58,953
59,939


### **Memory, Caching and Writing to Parquet**

The DataFrame currently consists of 13 partitions:

In [58]:
fireServiceCallsTsDF.rdd.getNumPartitions()

In [59]:
fireServiceCallsTsDF.repartition(6).createOrReplaceTempView("fireServiceVIEW")

In [60]:
spark.catalog.cacheTable("fireServiceVIEW")

In [61]:
# Call .count() to materialize the cache
spark.table("fireServiceVIEW").count()

In [62]:
fireServiceDF = spark.table("fireServiceVIEW")

In [63]:
# Note that the full scan + count in memory takes < 1 second!

fireServiceDF.count()

In [64]:
spark.catalog.isCached("fireServiceVIEW")

The 6 partitions are now cached in memory:

Use the Spark UI to see the 6 partitions in memory:

Now that our data has the correct date types for each column and is correctly partitioned, let's write it out as Parquet files for future loading:

In [68]:
%fs ls /tmp/

path,name,size
dbfs:/tmp/co-est2019-alldata.csv,co-est2019-alldata.csv,3644730
dbfs:/tmp/engr.alifiaz@gmail.com/,engr.alifiaz@gmail.com/,0
dbfs:/tmp/hive/,hive/,0


In [69]:
fireServiceDF.write.format('parquet').save('/tmp/fireServiceParquet/')

Now the directory should contain 6 Snappy-compressed Parquet files (one for each partition):

In [71]:
%fs ls /tmp/fireServiceParquet/

path,name,size
dbfs:/tmp/fireServiceParquet/_SUCCESS,_SUCCESS,0
dbfs:/tmp/fireServiceParquet/_committed_1063084419995914434,_committed_1063084419995914434,630
dbfs:/tmp/fireServiceParquet/_started_1063084419995914434,_started_1063084419995914434,0
dbfs:/tmp/fireServiceParquet/part-00000-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1502-1-c000.snappy.parquet,part-00000-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1502-1-c000.snappy.parquet,1757876
dbfs:/tmp/fireServiceParquet/part-00001-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1503-1-c000.snappy.parquet,part-00001-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1503-1-c000.snappy.parquet,1752436
dbfs:/tmp/fireServiceParquet/part-00002-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1504-1-c000.snappy.parquet,part-00002-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1504-1-c000.snappy.parquet,1755739
dbfs:/tmp/fireServiceParquet/part-00003-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1505-1-c000.snappy.parquet,part-00003-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1505-1-c000.snappy.parquet,1752158
dbfs:/tmp/fireServiceParquet/part-00004-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1506-1-c000.snappy.parquet,part-00004-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1506-1-c000.snappy.parquet,1753229
dbfs:/tmp/fireServiceParquet/part-00005-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1507-1-c000.snappy.parquet,part-00005-tid-1063084419995914434-ddfe9859-feb1-4420-b094-69f9777ee4e4-1507-1-c000.snappy.parquet,1751775


Here's how you can easily read the parquet file from S3 in the future:

In [73]:
tempDF = spark.read.parquet('/tmp/fireServiceParquet/')

In [74]:
display(tempDF.limit(2))

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhoodDistrict,Location,RowID,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
200243456,T01,20010366,Alarms,Fire,800 Block of HOWARD ST,San Francisco,94103,B03,1,2245,3,3,3,False,Alarm,1,TRUCK,3,3,6,South of Market,"(37.78241828644194, -122.40386961153472)",200243456-T01,2020-01-24T00:00:00.000+0000,2020-01-24T00:00:00.000+0000,2020-01-24T20:14:36.000+0000,2020-01-24T20:15:53.000+0000,2020-01-24T20:16:03.000+0000,,,,,2020-01-24T20:49:51.000+0000
200203483,AM122,20008601,Medical Incident,Code 2 Transport,600 Block of BRYANT ST,San Francisco,94107,B03,8,2242,2,2,2,False,Potentially Life-Threatening,1,PRIVATE,1,3,6,South of Market,"(37.77872189500361, -122.39922768262916)",200203483-AM122,2020-01-20T00:00:00.000+0000,2020-01-20T00:00:00.000+0000,2020-01-20T20:23:46.000+0000,2020-01-20T20:26:27.000+0000,2020-01-20T20:26:46.000+0000,2020-01-20T20:28:04.000+0000,2020-01-20T20:50:11.000+0000,2020-01-20T21:04:48.000+0000,2020-01-20T21:16:03.000+0000,2020-01-20T21:41:07.000+0000


Did you know that the new vectorized Parquet decoder in Spark 2.0 has improved Parquet scan throughput by 3x?

### **SQL Queries**

In [77]:
%sql SELECT count(*) FROM fireServiceVIEW;

count(1)
98733


Expand the 'Spark Jobs' in the cell above to see that 7 tasks were launched to run the count: 6 tasks to read the data from each of the 6 partitions and do a pre-aggregation on each partition, then a final task to aggregate the count from all 6 tasks:
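
The 6 + 1 task pattern described above is a two-stage aggregation, and its shape can be imitated in plain Python. This is only a sketch of the idea: the round-robin split below is an assumption made for illustration and not how `repartition(6)` places rows, and everything runs sequentially on one machine. The row total is the count returned by the query above.

```python
NUM_PARTITIONS = 6
TOTAL_ROWS = 98733  # the count(*) result shown above

rows = range(TOTAL_ROWS)

# Split the rows into 6 partitions (round-robin, for illustration only)
partitions = [rows[i::NUM_PARTITIONS] for i in range(NUM_PARTITIONS)]

# Stage 1: one "task" per partition computes a partial count
partial_counts = [sum(1 for _ in partition) for partition in partitions]

# Stage 2: a single final "task" adds up the 6 partial counts
total = sum(partial_counts)

num_tasks = len(partial_counts) + 1

print(partial_counts)
print(total, "rows counted by", num_tasks, "tasks")
```

Only six small numbers have to move between the two stages, which is why pre-aggregating on each partition is cheap.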

You can use the Spark Stages UI to see the 6 tasks launched in the middle stage:

**Q-5) Which neighborhood in SF generated the most calls last year?**

In [81]:
%sql
SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count
FROM fireServiceVIEW
WHERE year(`CallDateTS`) == '2020'
GROUP BY `NeighborhoodDistrict`
ORDER BY Neighborhood_Count DESC
LIMIT 15;

NeighborhoodDistrict,Neighborhood_Count
Tenderloin,14821
South of Market,11161
Mission,8751
Financial District/South Beach,5969
Bayview Hunters Point,5596
Sunset/Parkside,3720
Western Addition,3521
Nob Hill,3077
Hayes Valley,2503
Outer Richmond,2443


Expand the Spark Job details in the cell above and notice that the last stage uses 200 partitions!

This default is non-optimal, given that we only have ~1.6 GB of data and 3 slots.
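
To get a feel for why 200 shuffle partitions is too many here, consider how grouped keys are spread over partitions: each key is hashed and assigned to one partition, so there can never be more non-empty partitions than distinct keys. The sketch below uses `zlib.crc32` as a stand-in hash (an assumption for illustration; Spark uses its own hash function internally) on the ten neighborhoods returned by the query above.

```python
import zlib

# The ten neighborhoods from the query result above
neighborhoods = [
    "Tenderloin",
    "South of Market",
    "Mission",
    "Financial District/South Beach",
    "Bayview Hunters Point",
    "Sunset/Parkside",
    "Western Addition",
    "Nob Hill",
    "Hayes Valley",
    "Outer Richmond",
]


def partition_for(key, num_partitions):
    """Assign a key to a partition with a deterministic stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def non_empty_partitions(keys, num_partitions):
    """Count how many partitions receive at least one key."""
    return len({partition_for(key, num_partitions) for key in keys})


used_of_200 = non_empty_partitions(neighborhoods, 200)
used_of_6 = non_empty_partitions(neighborhoods, 6)

print(f"200 partitions: {used_of_200} hold data, {200 - used_of_200} are empty")
print(f"  6 partitions: {used_of_6} hold data, {6 - used_of_6} are empty")
```

With 200 partitions, at least 190 tasks in that stage have nothing to do, yet each still has to be scheduled.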

Change the shuffle.partitions option to 6:

In [83]:
spark.conf.get("spark.sql.shuffle.partitions")

In [84]:
spark.conf.set("spark.sql.shuffle.partitions", 6)

In [85]:
spark.conf.get("spark.sql.shuffle.partitions")

Re-run the same SQL query and notice the speed increase:

In [87]:
%sql SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2020' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15;

NeighborhoodDistrict,Neighborhood_Count
Tenderloin,14821
South of Market,11161
Mission,8751
Financial District/South Beach,5969
Bayview Hunters Point,5596
Sunset/Parkside,3720
Western Addition,3521
Nob Hill,3077
Hayes Valley,2503
Outer Richmond,2443


SQL also has some handy commands like `DESC` (describe) to see the schema + data types for the table:

In [89]:
%sql DESC fireServiceVIEW;

col_name,data_type,comment
CallNumber,int,
UnitID,string,
IncidentNumber,int,
CallType,string,
CallFinalDisposition,string,
Address,string,
City,string,
ZipcodeofIncident,int,
Battalion,string,
StationArea,string,


### **Spark Internals and SQL UI**

In [91]:
# Note that a SQL Query just returns back a DataFrame
spark.sql("SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2020' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15")

The `explain()` method can be called on a DataFrame to understand its logical + physical plans:

In [93]:
spark.sql("SELECT `NeighborhoodDistrict`, count(`NeighborhoodDistrict`) AS Neighborhood_Count FROM fireServiceVIEW WHERE year(`CallDateTS`) == '2020' GROUP BY `NeighborhoodDistrict` ORDER BY Neighborhood_Count DESC LIMIT 15").explain(True)

You can view the visual representation of the SQL Query plan from the Spark UI:

![SQL Plan](http://curriculum-release.s3-website-us-west-2.amazonaws.com/sf_open_data_meetup/sql_query_plan.png)

### **DataFrame Joins**

**Q-6) What was the primary non-medical reason most people called the fire department from the Tenderloin last year?**

The "Fire Incidents" data includes a summary of each (non-medical) incident to which the SF Fire Department responded.

Let's do a join to the Fire Incidents data on the "Incident Number" column:

https://data.sfgov.org/Public-Safety/Fire-Incidents/wr8u-xric

Read the Fire Incidents CSV file into a DataFrame:

In [101]:
incidentsDF = spark.read.csv('/mnt/sf_open_data/fire_incidents/Fire_Incidents.csv', header=True, inferSchema=True).withColumnRenamed('Incident Number', 'IncidentNumber').cache()

In [102]:
incidentsDF.printSchema()

In [103]:
# Materialize the cache
incidentsDF.count()

In [104]:
display(incidentsDF.limit(3))

In [105]:
joinedDF = fireServiceDF.join(incidentsDF, fireServiceDF.IncidentNumber == incidentsDF.IncidentNumber)

In [106]:
display(joinedDF.limit(3))

In [107]:
# Note that the joined DF is only 1.1 million rows because we did an inner join (the original Fire Service Calls data had 4+ million rows)
joinedDF.count()
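
The drop in row count follows directly from inner-join semantics: a call survives the join only if its `IncidentNumber` also appears in the incidents data. Here is a minimal pure-Python sketch of that behavior. The call rows reuse incident numbers from the sample records shown earlier, while the incident summaries (including their `Primary Situation` texts) are hypothetical values made up for illustration.

```python
# Calls: incident numbers and call types from the sample records shown earlier
calls = [
    {"IncidentNumber": 20013094, "CallType": "Medical Incident"},
    {"IncidentNumber": 20009609, "CallType": "Medical Incident"},
    {"IncidentNumber": 20015378, "CallType": "Structure Fire"},
    {"IncidentNumber": 20015934, "CallType": "Structure Fire"},
]

# Incidents: hypothetical summaries, present only for the non-medical calls
incidents = [
    {"IncidentNumber": 20015378, "Primary Situation": "(hypothetical) building fire"},
    {"IncidentNumber": 20015934, "Primary Situation": "(hypothetical) false alarm"},
]

# Index the incidents by join key, allowing several rows per key
incidents_by_number = {}
for incident in incidents:
    incidents_by_number.setdefault(incident["IncidentNumber"], []).append(incident)

# Inner join: keep a call only when a matching incident exists
joined = [
    {**call, **incident}
    for call in calls
    for incident in incidents_by_number.get(call["IncidentNumber"], [])
]

print(len(calls), "calls ->", len(joined), "joined rows")
for row in joined:
    print(row)
```

The two medical calls have no matching incident, so they disappear from the result, just as most rows of the calls data do in the join above.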

In [108]:
joinedDF.filter(year('CallDateTS') == '2015').filter(col('NeighborhoodDistrict') == 'Tenderloin').count()

In [109]:
display(joinedDF.filter(year('CallDateTS') == '2015').filter(col('NeighborhoodDistrict') == 'Tenderloin').groupBy('Primary Situation').count().orderBy(desc("count")).limit(10))

Most of the calls were False Alarms!

What do residents of Russian Hill call the fire department for?

In [112]:
display(joinedDF.filter(year('CallDateTS') == '2015').filter(col('NeighborhoodDistrict') == 'Russian Hill').groupBy('Primary Situation').count().orderBy(desc("count")).limit(10))

### **Convert a Spark DataFrame to a Pandas DataFrame**

In [114]:
import pandas as pd

In [115]:
pandas2016DF = joinedDF.filter(year('CallDateTS') == '2016').toPandas()

In [116]:
pandas2016DF.dtypes

In [117]:
pandas2016DF.head()

In [118]:
pandas2016DF.describe()

### **Keep Hacking!**