In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Fire_Department_Data")
    .getOrCreate()
)

In [4]:
# Load the raw CSV with a header row and let Spark infer the column types.
# The path uses plain spaces: the former '\ ' sequences are invalid Python string
# escapes (they raise a DeprecationWarning/SyntaxWarning) and are not required.
df = spark.read.csv(
    path='/content/drive/My Drive/Colab Datasets/Fire_Department_Calls_for_Service.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Total number of rows in the inferred-schema DataFrame.
df.count()

5391169

In [6]:
# Inspect the column names and types produced by inferSchema=True.
df.printSchema()

root
 |-- Call Number: integer (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

In [8]:
# Explicit schema for the Fire Department calls CSV.
# Each entry is (column name, Spark type class); every field is declared nullable.
# The first columns mirror the inferred schema's names with the spaces removed.
_fire_columns = [
    ('CallNumber', IntegerType),
    ('UnitID', StringType),
    ('IncidentNumber', IntegerType),
    ('CallType', StringType),
    ('CallDate', StringType),
    ('WatchDate', StringType),
    ('ReceivedDtTm', StringType),
    ('EntryDtTm', StringType),
    ('DispatchDtTm', StringType),
    ('ResponseDtTm', StringType),
    ('OnSceneDtTm', StringType),
    ('TransportDtTm', StringType),
    ('HospitalDtTm', StringType),
    ('CallFinalDisposition', StringType),
    ('AvailableDtTm', StringType),
    ('Address', StringType),
    ('City', StringType),
    ('ZipcodeofIncident', IntegerType),
    ('Battalion', StringType),
    ('StationArea', StringType),
    ('Box', StringType),
    ('OriginalPriority', StringType),
    ('Priority', StringType),
    ('FinalPriority', IntegerType),
    ('ALSUnit', BooleanType),
    ('CallTypeGroup', StringType),
    ('NumberofAlarms', IntegerType),
    ('UnitType', StringType),
    ('Unitsequenceincalldispatch', IntegerType),
    ('FirePreventionDistrict', StringType),
    ('SupervisorDistrict', StringType),
    # NOTE: the triple-"o" spelling is intentional here; later SQL cells query this exact name.
    ('Neighborhooods-AnalysisBoundaries', StringType),
    ('Location', StringType),
    ('RowID', StringType),
    ('shape', StringType),
    ('SupervisorDistricts', IntegerType),
    ('FirePreventionDistricts', IntegerType),
    ('CurrentPoliceDistricts', IntegerType),
    ('Neighborhoods-AnalysisBoundaries', IntegerType),
    ('ZipCodes', IntegerType),
    ('Neighborhoods(old)', IntegerType),
    ('PoliceDistricts', IntegerType),
    ('CivicCenterHarmReductionProjectBoundary', IntegerType),
    ('HSOCZones', IntegerType),
    ('CentralMarket/TenderloinBoundaryPolygon-Updated', IntegerType),
    ('Neighborhoods', IntegerType),
    ('SFFindNeighborhoods', IntegerType),
    ('CurrentPoliceDistricts2', IntegerType),
    ('CurrentSupervisorDistricts', IntegerType),
]

fireSchema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in _fire_columns
])

In [9]:
# Re-read the CSV, this time applying the explicit fireSchema instead of inferring types.
# The path uses plain spaces: the former '\ ' sequences are invalid Python string
# escapes (they raise a DeprecationWarning/SyntaxWarning) and are not required.
fireServiceCallsDF = spark.read.csv(
    path='/content/drive/My Drive/Colab Datasets/Fire_Department_Calls_for_Service.csv',
    header=True,
    schema=fireSchema,
)

In [10]:
# Confirm the DataFrame carries the column names and types declared in fireSchema.
fireServiceCallsDF.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPr

In [11]:
# Preview the first five rows without truncating long cell values.
fireServiceCallsDF.show(n=5, truncate=False)

+----------+------+--------------+----------------+----------+----------+----------------------+----------------------+----------------------+----------------------+----------------------+-------------+------------+--------------------------+----------------------+-------------------------+----+-----------------+---------+-----------+----+----------------+--------+-------------+-------+-------------+--------------+--------------+--------------------------+----------------------+------------------+---------------------------------+------------------------------------+-------------+-----------------------------------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+-----------------------------------------------+-------------+-------------------+-----------------------+--------------------------+
|CallNumber|UnitID|IncidentNumber|CallType 

In [12]:
# Which different types of calls were made to the Fire Department?
# Shows up to 10 of the distinct CallType values (not their total number).
distinct_call_types = fireServiceCallsDF.select('CallType').distinct()
distinct_call_types.show(n=10, truncate=False)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Lightning Strike (Investigation)   |
|Citizen Assist / Service Call      |
|HazMat                             |
+-----------------------------------+
only showing top 10 rows



In [13]:
# How many incidents of each call type were made to the Fire Department?
# Counts rows per CallType and shows the 10 most frequent types.
calls_per_type = (
    fireServiceCallsDF
    .select('CallType')
    .groupBy('CallType')
    .count()
    .orderBy('count', ascending=False)
)
calls_per_type.show(n=10, truncate=False)

+-------------------------------+-------+
|CallType                       |count  |
+-------------------------------+-------+
|Medical Incident               |3523134|
|Structure Fire                 |672942 |
|Alarms                         |585670 |
|Traffic Collision              |220617 |
|Other                          |85338  |
|Citizen Assist / Service Call  |80537  |
|Outside Fire                   |66251  |
|Water Rescue                   |27154  |
|Vehicle Fire                   |25042  |
|Gas Leak (Natural and LP Gases)|22129  |
+-------------------------------+-------+
only showing top 10 rows



In [14]:
from pyspark.sql.functions import *

In [15]:
# Convert the string date / date-time columns to proper timestamp columns.
#
# Date-only columns are parsed with 'MM/dd/yyyy'. The date-time pattern carries an
# AM/PM marker ('a'), so the hour has to be parsed with the 12-hour field 'hh'.
# The previous pattern used 'HH' (24-hour field), which does not combine with the
# AM/PM marker and therefore mis-parses afternoon times.
# NOTE(review): assumes raw values look like '01/31/2020 09:15:00 PM' — confirm against the CSV.

from_pattern1 = 'MM/dd/yyyy'
from_pattern2 = 'MM/dd/yyyy hh:mm:ss a'

# (source column, parse pattern). Each source column is replaced by '<source>TS':
# the new timestamp column is appended and the original string column is dropped.
_timestamp_sources = [
    ('CallDate', from_pattern1),
    ('WatchDate', from_pattern1),
    ('ReceivedDtTm', from_pattern2),
    ('EntryDtTm', from_pattern2),
    ('DispatchDtTm', from_pattern2),
    ('ResponseDtTm', from_pattern2),
    ('OnSceneDtTm', from_pattern2),
    ('TransportDtTm', from_pattern2),
    ('HospitalDtTm', from_pattern2),
    ('AvailableDtTm', from_pattern2),
]

# Always start from the untouched source frame so re-running this cell is idempotent.
fireServiceCallsTsDf = fireServiceCallsDF
for _source_name, _pattern in _timestamp_sources:
    fireServiceCallsTsDf = (
        fireServiceCallsTsDf
        .withColumn(
            _source_name + 'TS',
            unix_timestamp(fireServiceCallsTsDf[_source_name], _pattern).cast("timestamp"),
        )
        .drop(_source_name)
    )

In [16]:
# Check the schema after the string date columns were replaced by the *TS timestamp columns.
fireServiceCallsTsDf.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberofAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- Unitsequenceincalldispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhooods-AnalysisBoundaries: string (nullable = tru

In [17]:
# Preview the converted DataFrame using show()'s default settings.
fireServiceCallsTsDf.show()

+----------+------+--------------+--------------------+--------------------+--------------------+----+-----------------+---------+-----------+----+----------------+--------+-------------+-------+-------------+--------------+--------------+--------------------------+----------------------+------------------+---------------------------------+--------------------+-------------+--------------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+-----------------------------------------------+-------------+-------------------+-----------------------+--------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|            CallType|CallFinalDispositio

In [18]:
# Distinct years in the dataset, in ascending order.
# The alias is applied to the year column itself; calling .alias() on the DataFrame
# (as before) only names the DataFrame and left the header as 'year(CallDateTS)'.
(
    fireServiceCallsTsDf
    .select(year('CallDateTS').alias('Year'))
    .distinct()
    .orderBy('Year')
    .show(25)
)

+----------------+
|year(CallDateTS)|
+----------------+
|            2000|
|            2001|
|            2002|
|            2003|
|            2004|
|            2005|
|            2006|
|            2007|
|            2008|
|            2009|
|            2010|
|            2011|
|            2012|
|            2013|
|            2014|
|            2015|
|            2016|
|            2017|
|            2018|
|            2019|
|            2020|
+----------------+



In [19]:
# No. of Service Calls per call type that were logged in the past 7 days.
# The window is hard-coded: calls dated in 2020 on or after day-of-year 282
# (8 October in the leap year 2020).
in_2020 = year('CallDateTS') == '2020'
from_day_282 = dayofyear('CallDateTS') >= 282

(
    fireServiceCallsTsDf
    .filter(in_2020)
    .filter(from_day_282)
    .groupBy('CallType')
    .count()
    .orderBy('count', ascending=False)
    .show(n=20, truncate=False)
)

+-------------------------------+-----+
|CallType                       |count|
+-------------------------------+-----+
|Medical Incident               |2127 |
|Alarms                         |428  |
|Structure Fire                 |189  |
|Traffic Collision              |158  |
|Outside Fire                   |59   |
|Gas Leak (Natural and LP Gases)|42   |
|Citizen Assist / Service Call  |42   |
|Other                          |41   |
|Water Rescue                   |21   |
|Electrical Hazard              |16   |
|Smoke Investigation (Outside)  |13   |
|Elevator / Escalator Rescue    |7    |
|Fuel Spill                     |6    |
|Suspicious Package             |5    |
|Odor (Strange / Unknown)       |2    |
|Vehicle Fire                   |2    |
+-------------------------------+-----+



In [20]:
# Get the number of partitions of the DataFrame's underlying RDD
fireServiceCallsTsDf.rdd.getNumPartitions()

18

In [21]:
# Register the DataFrame as a temporary SQL view named "FireServiceView"
# (replacing any existing view of that name) so it can be queried via spark.sql.
fireServiceCallsTsDf.createOrReplaceTempView("FireServiceView")

In [22]:
# spark.catalog.cacheTable("FireServiceView")

In [23]:
# Get the total number of records by counting all rows of the FireServiceView view.
# The result DataFrame is displayed in the next cell.
fire_sql = spark.sql("SELECT count(*) FROM FireServiceView")

In [24]:
# Display the single-row result of the record-count query.
fire_sql.show()

+--------+
|count(1)|
+--------+
| 5391169|
+--------+



In [25]:
# Which neighborhoods in SF generated the most calls in 2019 (top 20).
# count(<column>) skips NULLs, so rows without a neighborhood value are not counted.
# The column name keeps the triple-"o" spelling declared in fireSchema.
sf_calls_sql = spark.sql("""
    SELECT `Neighborhooods-AnalysisBoundaries`,
           count(`Neighborhooods-AnalysisBoundaries`) AS neighborhood_call_count
    FROM FireServiceView
    WHERE year(`CallDateTS`) = 2019
    GROUP BY `Neighborhooods-AnalysisBoundaries`
    ORDER BY neighborhood_call_count DESC
    LIMIT 20
""")

In [26]:
# Show up to 20 result rows without truncating long neighborhood names.
sf_calls_sql.show(n=20, truncate=False)

+---------------------------------+-----------------------+
|Neighborhooods-AnalysisBoundaries|neighborhood_call_count|
+---------------------------------+-----------------------+
|Tenderloin                       |45852                  |
|South of Market                  |35007                  |
|Mission                          |28763                  |
|Financial District/South Beach   |22652                  |
|Bayview Hunters Point            |17008                  |
|Sunset/Parkside                  |11294                  |
|Western Addition                 |10627                  |
|Nob Hill                         |10542                  |
|Hayes Valley                     |7940                   |
|Castro/Upper Market              |7902                   |
|Outer Richmond                   |7661                   |
|West of Twin Peaks               |7052                   |
|Pacific Heights                  |6709                   |
|Marina                           |6417 

In [27]:
# Which neighborhoods in SF generated the most calls in 2020 (top 20).
# count(<column>) skips NULLs, so rows without a neighborhood value are not counted.
# The column name keeps the triple-"o" spelling declared in fireSchema.
sf_calls_sql = spark.sql("""
    SELECT `Neighborhooods-AnalysisBoundaries`,
           count(`Neighborhooods-AnalysisBoundaries`) AS neighborhood_call_count
    FROM FireServiceView
    WHERE year(`CallDateTS`) = 2020
    GROUP BY `Neighborhooods-AnalysisBoundaries`
    ORDER BY neighborhood_call_count DESC
    LIMIT 20
""")

sf_calls_sql.show(n=20, truncate=False)

+---------------------------------+-----------------------+
|Neighborhooods-AnalysisBoundaries|neighborhood_call_count|
+---------------------------------+-----------------------+
|Tenderloin                       |33276                  |
|South of Market                  |23887                  |
|Mission                          |19745                  |
|Bayview Hunters Point            |13112                  |
|Financial District/South Beach   |12600                  |
|Sunset/Parkside                  |8408                   |
|Western Addition                 |8328                   |
|Nob Hill                         |7104                   |
|Hayes Valley                     |5767                   |
|Castro/Upper Market              |5736                   |
|Outer Richmond                   |5681                   |
|Pacific Heights                  |4893                   |
|Marina                           |4822                   |
|West of Twin Peaks               |4753 

In [28]:
# Types of calls made from the Tenderloin area during 2020 (top 20 by count).
# count(`CallType`) skips NULLs, so rows without a call type are not counted.
sf_calls_sql = spark.sql("""
    SELECT `CallType`,
           count(`CallType`) AS call_type_count
    FROM FireServiceView
    WHERE year(`CallDateTS`) = 2020
      AND `Neighborhooods-AnalysisBoundaries` = 'Tenderloin'
    GROUP BY `CallType`
    ORDER BY call_type_count DESC
    LIMIT 20
""")

sf_calls_sql.show(n=20, truncate=False)

+-------------------------------+---------------+
|CallType                       |call_type_count|
+-------------------------------+---------------+
|Medical Incident               |26715          |
|Alarms                         |3122           |
|Structure Fire                 |1700           |
|Citizen Assist / Service Call  |469            |
|Traffic Collision              |347            |
|Other                          |302            |
|Outside Fire                   |250            |
|Elevator / Escalator Rescue    |148            |
|Gas Leak (Natural and LP Gases)|81             |
|Smoke Investigation (Outside)  |36             |
|Vehicle Fire                   |33             |
|Train / Rail Incident          |16             |
|Fuel Spill                     |15             |
|Electrical Hazard              |12             |
|Odor (Strange / Unknown)       |10             |
|HazMat                         |10             |
|Explosion                      |9              |


In [29]:
# write results to a file.
# sf_calls_sql.write.parquet("/content/drive/My Drive/Colab Datasets/output/")

# mode="overwrite" replaces output left by a previous run; with the default save mode
# the write fails if the target path already exists, which breaks re-running the notebook.
sf_calls_sql.write.csv(
    "/content/drive/My Drive/Colab Datasets/output/sf_fire_dept_output.csv",
    mode="overwrite",
)

