This notebook goes through an Apache Spark tutorial to load and wrangle data from San Francisco's open datasets
This example will particularly utilize the "fire department calls for service" dataset containing over 4.5 million rows starting from the year 2000 all the way upto 2020. the dataset can be downloaded from the following link:

https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3

In [1]:
sc

In [2]:
# Imports
import spark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from pyspark.sql.functions import *

In [3]:
# Setting schema
firecalls_schema = StructType([StructField('CallNumber', IntegerType(), True),
                               StructField('UnitID', StringType(), True),
                               StructField('IncidentNumber', IntegerType(), True),
                               StructField('CallType', StringType(), True),
                               StructField('CallDate', StringType(), True),
                               StructField('WatchDate', StringType(), True),
                               StructField('ReceivedDtTm', StringType(), True),
                               StructField('EntryDtTm', StringType(), True),
                               StructField('DispatchDtTm', StringType(), True),
                               StructField('ResponseDtTm', StringType(), True),
                               StructField('OnSceneDtTm', StringType(), True),
                               StructField('TransportDtTm', StringType(), True),
                               StructField('HospitalDtTm', StringType(), True),
                               StructField('CallFinalDisposition', StringType(), True),
                               StructField('AvailableDtTm', StringType(), True),
                               StructField('Address', StringType(), True),
                               StructField('City', StringType(), True),
                               StructField('ZipcodeofIncident', IntegerType(), True),
                               StructField('Battalion', StringType(), True),
                               StructField('StationArea', StringType(), True),
                               StructField('Box', StringType(), True),
                               StructField('OriginalPriority', StringType(), True),
                               StructField('Priority', StringType(), True),
                               StructField('FinalPriority', IntegerType(), True),
                               StructField('ALSUnit', BooleanType(), True),
                               StructField('CallTypeGroup', StringType(), True),
                               StructField('NumberofAlarms', IntegerType(), True),
                               StructField('UnitType', StringType(), True),
                               StructField('Unitsequenceincalldispatch', IntegerType(), True),
                               StructField('FirePreventionDistrict', StringType(), True),
                               StructField('SupervisorDistrict', StringType(), True),
                               StructField('NeighborhoodDistrict', StringType(), True),
                               StructField('Location', StringType(), True),
                               StructField('RowID', StringType(), True),
    
]
)

In [4]:
# Creating Spark Session and reading CSV file
spark = SparkSession \
    .builder \
    .appName("Spark SFFireDepartmentCalls") \
    .getOrCreate()

df = spark.read.csv("Fire_Department_Calls_for_Service.csv", header=True, schema=firecalls_schema);

In [5]:
# View dataframe
df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+--------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|        ReceivedDtTm|           EntryDtTm|        DispatchDtTm|        ResponseDtTm|         OnSceneDtTm|       TransportDtTm|        HospitalDtTm|CallFinalDisposition|       AvailableDtTm|             Address|         City|ZipcodeofIncident|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|       CallTypeGroup|NumberofAlarms|UnitType|Uni

In [6]:
# Printing all columns in the dataframe
df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'ReceivedDtTm',
 'EntryDtTm',
 'DispatchDtTm',
 'ResponseDtTm',
 'OnSceneDtTm',
 'TransportDtTm',
 'HospitalDtTm',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'ZipcodeofIncident',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumberofAlarms',
 'UnitType',
 'Unitsequenceincalldispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'NeighborhoodDistrict',
 'Location',
 'RowID']

In [7]:
# Getting tope 35 distinct call types in the dataframe
# 'False' argument expands column width to include the full text
df.select('CallType').distinct().show(35, False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Lightning Strike (Investigation)            |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Train / Rail Fire                           |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire

In [8]:
# Applying Spark transformations to get calls by highest count 
df.select('CallType').groupBy('CallType').count().orderBy('count', ascending=False).show()

+--------------------+-------+
|            CallType|  count|
+--------------------+-------+
|    Medical Incident|3384315|
|      Structure Fire| 657136|
|              Alarms| 560765|
|   Traffic Collision| 213380|
|               Other|  82028|
|Citizen Assist / ...|  77537|
|        Outside Fire|  62111|
|        Water Rescue|  25295|
|        Vehicle Fire|  24384|
|Gas Leak (Natural...|  20655|
|   Electrical Hazard|  15335|
|Elevator / Escala...|  13993|
|Odor (Strange / U...|  12777|
|Smoke Investigati...|  11553|
|          Fuel Spill|   5960|
|              HazMat|   4093|
|Industrial Accidents|   2967|
|           Explosion|   2736|
|  Aircraft Emergency|   1511|
|       Assist Police|   1382|
+--------------------+-------+
only showing top 20 rows



In [9]:
df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPr

In [10]:
# Spark uses Java simple date format
# Converting 'Date' and 'DateTime' columns from String to DateTime format

from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'

df = df\
    .withColumn('CallDateTS', unix_timestamp(df['CallDate'], from_pattern1).cast('timestamp'))\
    .drop('CallDate')\
    .withColumn('WatchDateTS', unix_timestamp(df['WatchDate'], from_pattern1).cast('timestamp'))\
    .drop('WatchDate')\
    .withColumn('ReceivedDtTmTS', unix_timestamp(df['ReceivedDtTm'], from_pattern2).cast('timestamp'))\
    .drop('ReceivedDtTm')\
    .withColumn('EntryDtTmTS', unix_timestamp(df['EntryDtTm'], from_pattern2).cast('timestamp'))\
    .drop('EntryDtTm')\
    .withColumn('DispatchDtTmTS', unix_timestamp(df['DispatchDtTm'], from_pattern2).cast('timestamp'))\
    .drop('DispatchDtTm')\
    .withColumn('ResponseDtTmTS', unix_timestamp(df['ResponseDtTm'], from_pattern2).cast('timestamp'))\
    .drop('ResponseDtTm')\
    .withColumn('OnSceneDtTmTS', unix_timestamp(df['OnSceneDtTm'], from_pattern2).cast('timestamp'))\
    .drop('OnSceneDtTm')\
    .withColumn('TransportDtTmTS', unix_timestamp(df['TransportDtTm'], from_pattern2).cast('timestamp'))\
    .drop('TransportDtTm')\
    .withColumn('HospitalDtTmTS', unix_timestamp(df['HospitalDtTm'], from_pattern2).cast('timestamp'))\
    .drop('HospitalDtTm')\
    .withColumn('AvailableDtTmTS', unix_timestamp(df['AvailableDtTm'], from_pattern2).cast('timestamp'))\
    .drop('AvailableDtTm')\


In [11]:
df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberofAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- Unitsequenceincalldispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- NeighborhoodDistrict: string (nullable = true)
 |-- Locat

In [12]:
# Getting all the distinct years in the dataframe
df.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

+----------------+
|year(CallDateTS)|
+----------------+
|            2000|
|            2001|
|            2002|
|            2003|
|            2004|
|            2005|
|            2006|
|            2007|
|            2008|
|            2009|
|            2010|
|            2011|
|            2012|
|            2013|
|            2014|
|            2015|
|            2016|
|            2017|
|            2018|
|            2019|
+----------------+
only showing top 20 rows



In [13]:
# Filtering down days to the first ten days in the July of 2019
df.filter(year('CallDateTS') == '2019')\
.filter(dayofyear('CallDateTS') >= 180)\
.filter(dayofyear('CallDateTS') <= 190)\
.select(dayofyear('CallDateTS'))\
.distinct()\
.orderBy('dayofyear(CallDateTS)')\
.show()

+---------------------+
|dayofyear(CallDateTS)|
+---------------------+
|                  180|
|                  181|
|                  182|
|                  183|
|                  184|
|                  185|
|                  186|
|                  187|
|                  188|
|                  189|
|                  190|
+---------------------+



In [14]:
# Coounting number of calls per day in July 2019
df.filter(year('CallDateTS') == '2019').filter(dayofyear('CallDateTS') >= 180)\
.filter(dayofyear('CallDateTS') <= 190)\
.groupBy(dayofyear('CallDateTS'))\
.count()\
.orderBy('dayofyear(CallDateTS)')\
.show()

+---------------------+-----+
|dayofyear(CallDateTS)|count|
+---------------------+-----+
|                  180|  899|
|                  181|  917|
|                  182|  816|
|                  183|  763|
|                  184|  770|
|                  185|  905|
|                  186|  850|
|                  187|  740|
|                  188|  714|
|                  189|  898|
|                  190|  822|
+---------------------+-----+



In [15]:
display(df.filter(year('CallDateTS') == '2019').filter(dayofyear('CallDateTS') >= 180)\
.filter(dayofyear('CallDateTS') <= 190)\
.groupBy(dayofyear('CallDateTS'))\
.count()\
.orderBy('dayofyear(CallDateTS)'))

DataFrame[dayofyear(CallDateTS): int, count: bigint]

In [16]:
df.rdd.getNumPartitions()

15

In [17]:
df.repartition(6).createOrReplaceTempView("fireCallsView")