In [1]:
import findspark
findspark.init('/home/hduser/spark')
import pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
            .builder \
            .appName("FireDeptCallsAnalysis") \
            .getOrCreate()

In [3]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, BooleanType

In [4]:
mySchema = StructType([StructField('CallNumber', IntegerType(), True), #Removed the space between Call Number
                       StructField('UnitID', StringType(), True),
                       StructField('IncidentNumber', IntegerType(), True),
                       StructField('CallType', StringType(), True),
                       StructField('CallDate', StringType(), True),
                       StructField('WatchDate', StringType(), True),
                       StructField('ReceivedDtTm', StringType(), True),
                       StructField('EntryDtTm', StringType(), True),
                       StructField('DispatchDtTm', StringType(), True),
                       StructField('ResponseDtTm', StringType(), True),
                       StructField('OnSceneDtTm', StringType(), True),
                       StructField('TransportDtTm', StringType(), True),
                       StructField('HospitalDtTm', StringType(), True),
                       StructField('CallFinalDisposition', StringType(), True),
                       StructField('AvailableDtTm', StringType(), True),
                       StructField('Address', StringType(), True),
                       StructField('City', StringType(), True),
                       StructField('ZipcodeofIncident', IntegerType(), True),
                       StructField('Battalion', StringType(), True),
                       StructField('StationArea', StringType(), True),
                       StructField('Box', StringType(), True),
                       StructField('OriginalPriority', StringType(), True),
                       StructField('Priority', StringType(), True),
                       StructField('FinalPriority', IntegerType(), True),
                       StructField('ALSUnit', BooleanType(), True),
                       StructField('CallTypeGroup', StringType(), True),
                       StructField('NumberofAlarms', IntegerType(), True),
                       StructField('UnitType', StringType(), True),
                       StructField('Unitsequenceincalldispatch', IntegerType(), True),
                       StructField('FirePreventionDistrict', StringType(), True),
                       StructField('SupervisorDistrict', StringType(), True),
                       StructField('Neighborhooods_AnalysisBoundaries', StringType(), True), 
                       StructField('Location', StringType(), True),
                       StructField('RowID', StringType(), True),
                       StructField('SupervisorDistricts', StringType(), True),
                       StructField('FirePreventionDistricts', StringType(), True),
                       StructField('CurrentPoliceDistricts', StringType(), True),
                       StructField('Neighborhoods_AnalysisBoundaries', StringType(), True),
                       StructField('ZipCodes', IntegerType(), True),
                       StructField('Neighborhoods(old)', StringType(), True),                       
                       StructField('PoliceDistricts', StringType(), True),
                       StructField('CivicCenterHarmReductionProjectBoundary', StringType(), True),
                       StructField('HSOCZones', StringType(), True),
                       StructField('CentralMarket/TenderloinBoundaryPolygon', StringType(), True)
                      ])

In [5]:
df = spark.read.csv("/home/hduser/Fire_Department_Calls_for_Service.csv", header = True,schema = mySchema)

## Print the first five records in the DataFrame

In [6]:
df.show(5)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+---------------------------------+--------------------+--------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+---------------------------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|        ReceivedDtTm|           EntryDtTm|        DispatchDtTm|        ResponseDtTm|         OnSceneDtTm|       Transp

In [7]:
df.head()

Row(CallNumber=201560006, UnitID='86', IncidentNumber=20064818, CallType='Medical Incident', CallDate='06/04/2020', WatchDate='06/03/2020', ReceivedDtTm='06/04/2020 12:00:17 AM', EntryDtTm='06/04/2020 12:02:00 AM', DispatchDtTm='06/04/2020 12:02:09 AM', ResponseDtTm='06/04/2020 12:02:13 AM', OnSceneDtTm='06/04/2020 12:15:50 AM', TransportDtTm='06/04/2020 12:37:17 AM', HospitalDtTm='06/04/2020 12:48:15 AM', CallFinalDisposition='Code 2 Transport', AvailableDtTm='06/04/2020 01:22:59 AM', Address='1400 Block of HALIBUT CT', City='Treasure Isla', ZipcodeofIncident=94130, Battalion='B03', StationArea='48', Box='2931', OriginalPriority='3', Priority='2', FinalPriority=2, ALSUnit=True, CallTypeGroup='Non Life-threatening', NumberofAlarms=1, UnitType='MEDIC', Unitsequenceincalldispatch=3, FirePreventionDistrict='None', SupervisorDistrict='6', Neighborhooods_AnalysisBoundaries='Treasure Island', Location='(37.824721643268795, -122.37496639605757)', RowID='201560006-86', SupervisorDistricts='9',

In [8]:
df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPr

## Print the column names in the dataframe. 

In [9]:
df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'ReceivedDtTm',
 'EntryDtTm',
 'DispatchDtTm',
 'ResponseDtTm',
 'OnSceneDtTm',
 'TransportDtTm',
 'HospitalDtTm',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'ZipcodeofIncident',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumberofAlarms',
 'UnitType',
 'Unitsequenceincalldispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhooods_AnalysisBoundaries',
 'Location',
 'RowID',
 'SupervisorDistricts',
 'FirePreventionDistricts',
 'CurrentPoliceDistricts',
 'Neighborhoods_AnalysisBoundaries',
 'ZipCodes',
 'Neighborhoods(old)',
 'PoliceDistricts',
 'CivicCenterHarmReductionProjectBoundary',
 'HSOCZones',
 'CentralMarket/TenderloinBoundaryPolygon']

## Count the number of rows in the DataFrame 

In [10]:
df.count()

5324768

## How many different types of calls were made to the Fire Department? 

In [11]:
df.select('CallType').distinct().show(25, False)    # "False" is used to avoid truncation of the "CallType"

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Lightning Strike (Investigation)            |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Train / Rail Fire                           |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire

## How many incidents of each call type were there?

In [12]:
df.select('CallType').groupBy('CallType').count().orderBy("count", ascending = False).show(20, False)

# Here count() is working as a transformation, so show() is used as the action.

+-------------------------------+-------+
|CallType                       |count  |
+-------------------------------+-------+
|Medical Incident               |3479202|
|Structure Fire                 |667918 |
|Alarms                         |576825 |
|Traffic Collision              |217931 |
|Other                          |84226  |
|Citizen Assist / Service Call  |79592  |
|Outside Fire                   |64943  |
|Water Rescue                   |26390  |
|Vehicle Fire                   |24811  |
|Gas Leak (Natural and LP Gases)|21678  |
|Electrical Hazard              |15954  |
|Elevator / Escalator Rescue    |14352  |
|Odor (Strange / Unknown)       |12867  |
|Smoke Investigation (Outside)  |11986  |
|Fuel Spill                     |6067   |
|HazMat                         |4142   |
|Industrial Accidents           |2995   |
|Explosion                      |2769   |
|Aircraft Emergency             |1511   |
|Train / Rail Incident          |1418   |
+-------------------------------+-

In [13]:
# Alternative way
ab = df.select('CallType').groupBy('CallType').count()

In [14]:
ab.orderBy("count", ascending = False).show()

+--------------------+-------+
|            CallType|  count|
+--------------------+-------+
|    Medical Incident|3479202|
|      Structure Fire| 667918|
|              Alarms| 576825|
|   Traffic Collision| 217931|
|               Other|  84226|
|Citizen Assist / ...|  79592|
|        Outside Fire|  64943|
|        Water Rescue|  26390|
|        Vehicle Fire|  24811|
|Gas Leak (Natural...|  21678|
|   Electrical Hazard|  15954|
|Elevator / Escala...|  14352|
|Odor (Strange / U...|  12867|
|Smoke Investigati...|  11986|
|          Fuel Spill|   6067|
|              HazMat|   4142|
|Industrial Accidents|   2995|
|           Explosion|   2769|
|  Aircraft Emergency|   1511|
|Train / Rail Inci...|   1418|
+--------------------+-------+
only showing top 20 rows



## How many distinct years of fire service calls data are in the data file?

In [15]:
from pyspark.sql.functions import unix_timestamp

In [16]:
pattern1 = "MM/dd/yyyy"

pattern2 = "MM/dd/yyyy hh:mm:ss aa"

In [17]:
# Making a new dataframe with timestamps - dfTs
# Creating a new Time Stamp column with the date/time fields, changing them to the proper pattern, typecasting
# them to Unix_timestamp and finally dropping the original column in the DF.



dfTs = df\
    .withColumn("CallDateTS", unix_timestamp(df['CallDate'], pattern1).cast('timestamp')).drop('CallDate')\
    .withColumn("WatchDateTS", unix_timestamp(df['WatchDate'], pattern1).cast('timestamp')).drop('WatchDate')\
    .withColumn("ReceivedDtTmTS", unix_timestamp(df['ReceivedDtTm'], pattern2).cast('timestamp')).drop('ReceivedDtTm')\
    .withColumn("EntryDtTmTS", unix_timestamp(df['EntryDtTm'], pattern2).cast('timestamp')).drop('EntryDtTm')\
    .withColumn("DispatchDtTmTS", unix_timestamp(df['DispatchDtTm'], pattern2).cast('timestamp')).drop('DispatchDtTm')\
    .withColumn("ResponseDtTmTS", unix_timestamp(df['ResponseDtTm'], pattern2).cast('timestamp')).drop('ResponseDtTm')\
    .withColumn("OnSceneDtTmTS", unix_timestamp(df['OnSceneDtTm'], pattern2).cast('timestamp')).drop('OnSceneDtTm')\
    .withColumn("TransportDtTmTS", unix_timestamp(df['TransportDtTm'], pattern2).cast('timestamp')).drop('TransportDtTm')\
    .withColumn("HospitalDtTmTS", unix_timestamp(df['HospitalDtTm'], pattern2).cast('timestamp')).drop('HospitalDtTm')\
    .withColumn("AvailableDtTmTS", unix_timestamp(df['AvailableDtTm'], pattern2).cast('timestamp')).drop('AvailableDtTm')
    

In [18]:
dfTs.printSchema()

# The new timestamp columns appear below in the list of columns

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberofAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- Unitsequenceincalldispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhooods_AnalysisBoundaries: string (nullable = tru

In [19]:
dfTs.show(2)

+----------+------+--------------+----------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+---------------------------------+--------------------+-------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+---------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|        CallType|CallFinalDisposition|             Address|         City|ZipcodeofIncident|Battalion|StationArea| Box|OriginalPriority|Priority|Fina

In [20]:
from pyspark.sql.functions import year

dfTs.select(year('CallDateTS')).distinct().orderBy('year(CallDateTS)').show()

+----------------+
|year(CallDateTS)|
+----------------+
|            2000|
|            2001|
|            2002|
|            2003|
|            2004|
|            2005|
|            2006|
|            2007|
|            2008|
|            2009|
|            2010|
|            2011|
|            2012|
|            2013|
|            2014|
|            2015|
|            2016|
|            2017|
|            2018|
|            2019|
+----------------+
only showing top 20 rows



## Partitioning

In [21]:
dfTs.rdd.getNumPartitions()

16

In [22]:
dfTs.repartition(5).createOrReplaceTempView('dfTs_View')

In [23]:
spark.table('dfTs_View').count()

5324768

In [24]:
spark.sql("Select * from dfTs_View limit 2").show()

+----------+------+--------------+--------------------+--------------------+--------------------+-------------+-----------------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+--------------+--------+--------------------------+----------------------+------------------+---------------------------------+--------------------+-------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+---------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------+--------------+-------------------+
|CallNumber|UnitID|IncidentNumber|            CallType|CallFinalDisposition|             Address|         City|ZipcodeofIncident|Battalion|StationArea| Box|OriginalPriority|Priority|Final

In [25]:
spark.sql('Select CallNumber, CallType, CallFinalDisposition, CallTypeGroup, CallDateTs from dfTs_View')\
     .show(20, False)

+----------+----------------+--------------------+----------------------------+-------------------+
|CallNumber|CallType        |CallFinalDisposition|CallTypeGroup               |CallDateTs         |
+----------+----------------+--------------------+----------------------------+-------------------+
|193343144 |Structure Fire  |Fire                |Alarm                       |2019-11-30 00:00:00|
|193343144 |Structure Fire  |Fire                |Alarm                       |2019-11-30 00:00:00|
|200990046 |Alarms          |Fire                |Alarm                       |2020-04-08 00:00:00|
|201850029 |Alarms          |Fire                |Alarm                       |2020-07-03 00:00:00|
|200852473 |Medical Incident|Code 2 Transport    |Potentially Life-Threatening|2020-03-25 00:00:00|
|42970095  |Alarms          |Other               |null                        |2004-10-23 00:00:00|
|201611550 |Medical Incident|No Merit            |Non Life-threatening        |2020-06-09 00:00:00|


In [26]:
from pyspark.sql.functions import desc, count

dfTs.select('CallNumber', 'CallType', 'CallFinalDisposition', 'CallTypeGroup', 'CallDateTs')\
     .groupBy('CallType') \
     .count() \
     .orderBy(count('CallType').desc()) \
     .show(20, False)

+-------------------------------+-------+
|CallType                       |count  |
+-------------------------------+-------+
|Medical Incident               |3479202|
|Structure Fire                 |667918 |
|Alarms                         |576825 |
|Traffic Collision              |217931 |
|Other                          |84226  |
|Citizen Assist / Service Call  |79592  |
|Outside Fire                   |64943  |
|Water Rescue                   |26390  |
|Vehicle Fire                   |24811  |
|Gas Leak (Natural and LP Gases)|21678  |
|Electrical Hazard              |15954  |
|Elevator / Escalator Rescue    |14352  |
|Odor (Strange / Unknown)       |12867  |
|Smoke Investigation (Outside)  |11986  |
|Fuel Spill                     |6067   |
|HazMat                         |4142   |
|Industrial Accidents           |2995   |
|Explosion                      |2769   |
|Aircraft Emergency             |1511   |
|Train / Rail Incident          |1418   |
+-------------------------------+-

In [27]:
from pyspark.sql.functions import asc

dfTs.select('CallNumber', 'CallType', 'CallFinalDisposition', 'CallTypeGroup', 'CallDateTs')\
     .groupBy('CallFinalDisposition') \
     .count() \
     .orderBy(asc('count')) \
     .show(20, False)

+--------------------------+-------+
|CallFinalDisposition      |count  |
+--------------------------+-------+
|Multi-casualty Incident   |219    |
|CHP                       |857    |
|Duplicate                 |1280   |
|Gone on Arrival           |15429  |
|SFPD                      |25894  |
|Medical Examiner          |34780  |
|Unable to Locate          |46904  |
|Against Medical Advice    |48499  |
|Cancelled                 |48527  |
|Code 3 Transport          |121386 |
|No Merit                  |157409 |
|Patient Declined Transport|175623 |
|Fire                      |642219 |
|Code 2 Transport          |1477685|
|Other                     |2528057|
+--------------------------+-------+



In [28]:
dfTs.select('CallNumber', 'CallType', 'CallFinalDisposition', 'CallTypeGroup', 'CallDateTs')\
     .groupBy('CallTypeGroup') \
     .count() \
     .orderBy(desc('count')) \
     .show(20, False)

+----------------------------+-------+
|CallTypeGroup               |count  |
+----------------------------+-------+
|null                        |2808692|
|Potentially Life-Threatening|1217385|
|Non Life-threatening        |598651 |
|Alarm                       |598512 |
|Fire                        |101528 |
+----------------------------+-------+



In [29]:
spark.sql('Select CallNumber, CallType, CallFinalDisposition, CallTypeGroup, CallDateTs \
from dfTs_View where CallFinalDisposition = "Fire"')\
     .show(20, False)

+----------+---------------------------+--------------------+-------------+-------------------+
|CallNumber|CallType                   |CallFinalDisposition|CallTypeGroup|CallDateTs         |
+----------+---------------------------+--------------------+-------------+-------------------+
|193343144 |Structure Fire             |Fire                |Alarm        |2019-11-30 00:00:00|
|193343144 |Structure Fire             |Fire                |Alarm        |2019-11-30 00:00:00|
|200990046 |Alarms                     |Fire                |Alarm        |2020-04-08 00:00:00|
|201850029 |Alarms                     |Fire                |Alarm        |2020-07-03 00:00:00|
|200810084 |Structure Fire             |Fire                |Alarm        |2020-03-21 00:00:00|
|200810084 |Structure Fire             |Fire                |Alarm        |2020-03-21 00:00:00|
|192930502 |Alarms                     |Fire                |Alarm        |2019-10-20 00:00:00|
|200263149 |Alarms                     |

In [30]:
spark.sql("SELECT count(*) from dfTs_View").show()

+--------+
|count(1)|
+--------+
| 5324768|
+--------+



## Which neighbourhood in San Francisco generated the most calls in 2016?

In [31]:
spark.sql("Select Neighborhooods_AnalysisBoundaries, count(Neighborhooods_AnalysisBoundaries) AS No_Of_Neighborhoods from dfTs_View \
where year(CallDateTS) == 2016 \
group by Neighborhooods_AnalysisBoundaries \
order by No_Of_Neighborhoods desc")\
    .show()

+---------------------------------+-------------------+
|Neighborhooods_AnalysisBoundaries|No_Of_Neighborhoods|
+---------------------------------+-------------------+
|                       Tenderloin|              41815|
|                  South of Market|              31293|
|                          Mission|              26897|
|             Financial Distric...|              22538|
|             Bayview Hunters P...|              15383|
|                  Sunset/Parkside|              10789|
|                 Western Addition|              10691|
|                         Nob Hill|               9987|
|                   Outer Richmond|               7994|
|              Castro/Upper Market|               7647|
|                     Hayes Valley|               7172|
|                      North Beach|               7094|
|                        Chinatown|               6256|
|               West of Twin Peaks|               6156|
|                  Pacific Heights|             

## Derive the number of current partitions and set it to 5.

In [32]:
spark.conf.get('spark.sql.shuffle.partitions')

'200'

In [33]:
spark.conf.set('spark.sql.shuffle.partitions',5)

In [34]:
spark.conf.get('spark.sql.shuffle.partitions')

'5'

In [35]:
spark.sql("desc dfTs_View").show(40, False)

+---------------------------------------+---------+-------+
|col_name                               |data_type|comment|
+---------------------------------------+---------+-------+
|CallNumber                             |int      |null   |
|UnitID                                 |string   |null   |
|IncidentNumber                         |int      |null   |
|CallType                               |string   |null   |
|CallFinalDisposition                   |string   |null   |
|Address                                |string   |null   |
|City                                   |string   |null   |
|ZipcodeofIncident                      |int      |null   |
|Battalion                              |string   |null   |
|StationArea                            |string   |null   |
|Box                                    |string   |null   |
|OriginalPriority                       |string   |null   |
|Priority                               |string   |null   |
|FinalPriority                          

#### DataFrame Join - The "Fire_Incidents.csv" has to be joined with "Fire_Department_Calls_for_Service.csv"

## What was the primary non medical reason most people called the Fire department from Tenderloin in 2015?

In [36]:
FI_df = spark.read.csv("/home/hduser/Fire_Incidents.csv", header = True, inferSchema = True)

In [37]:
FI_df.printSchema()

root
 |-- Incident Number: integer (nullable = true)
 |-- Exposure Number: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- Incident Date: timestamp (nullable = true)
 |-- Call Number: integer (nullable = true)
 |-- Alarm DtTm: timestamp (nullable = true)
 |-- Arrival DtTm: timestamp (nullable = true)
 |-- Close DtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Suppression Units: integer (nullable = true)
 |-- Suppression Personnel: integer (nullable = true)
 |-- EMS Units: integer (nullable = true)
 |-- EMS Personnel: integer (nullable = true)
 |-- Other Units: integer (nullable = true)
 |-- Other Personnel: integer (nullable = true)
 |-- First Unit On Scene: string (nullable = true)
 |-- Estimated Property Loss: integer (nullable = true)
 |-- Estim

In [38]:
FI_df.count()

550492

In [39]:
# Here the space in the Incident Number field is removed as it will be used to join the table.

FireIncidentDf = FI_df.withColumnRenamed('Incident Number', 'IncidentNumber')

In [40]:
FireIncidentDf.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- Exposure Number: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- Incident Date: timestamp (nullable = true)
 |-- Call Number: integer (nullable = true)
 |-- Alarm DtTm: timestamp (nullable = true)
 |-- Arrival DtTm: timestamp (nullable = true)
 |-- Close DtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Suppression Units: integer (nullable = true)
 |-- Suppression Personnel: integer (nullable = true)
 |-- EMS Units: integer (nullable = true)
 |-- EMS Personnel: integer (nullable = true)
 |-- Other Units: integer (nullable = true)
 |-- Other Personnel: integer (nullable = true)
 |-- First Unit On Scene: string (nullable = true)
 |-- Estimated Property Loss: integer (nullable = true)
 |-- Estima

In [41]:
joinedDf = dfTs.join(FireIncidentDf, dfTs.IncidentNumber == FireIncidentDf.IncidentNumber)

In [42]:
joinedDf.show(3)

+----------+------+--------------+----------------+--------------------+------------------+----+-----------------+---------+-----------+----+----------------+--------+-------------+-------+-------------+--------------+--------+--------------------------+----------------------+------------------+---------------------------------+--------------------+-------------+-------------------+-----------------------+----------------------+--------------------------------+--------+------------------+---------------+---------------------------------------+---------+---------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+-------------------+---------------+--------------+-------------------+--------------+---------------+--------+--------------------+-------------------+-----------+-------------------+-------------------+-------------------+----+-------+---------+------------+----+-----------------+---

In [43]:
joinedDf.count()

1550065

In [44]:
from pyspark.sql.functions import col

joinedDf.filter(year('CallDateTS') == '2015')\
        .filter(col('Neighborhooods_AnalysisBoundaries') == 'Tenderloin') \
        .count()

8673

In [45]:
## The primary non medical reason most people called the Fire department from Tenderloin in 2015.

from pyspark.sql.functions import desc


joinedDf.filter(year('CallDateTS') == '2015')\
        .filter(col('Neighborhooods_AnalysisBoundaries') == 'Tenderloin') \
        .groupBy('Primary Situation') \
        .count() \
        .orderBy(desc('count')) \
        .show(15, False)

+------------------------------------------------------+-----+
|Primary Situation                                     |count|
+------------------------------------------------------+-----+
|700 False alarm or false call, other                  |1395 |
|711 Municipal alarm system, malicious false alarm     |1011 |
|735 Alarm system sounded due to malfunction           |572  |
|743 Smoke detector activation, no fire - unintentional|572  |
|500 Service Call, other                               |490  |
|113 Cooking fire, confined to container               |489  |
|745 Alarm system activation, no fire - unintentional  |437  |
|733 Smoke detector activation due to malfunction      |405  |
|100 Fire, other                                       |202  |
|600 Good intent call, other                           |179  |
|323 Motor vehicle/pedestrian accident (MV Ped)        |167  |
|111 Building fire                                     |164  |
|322 Motor vehicle accident with injuries              

#### Most of the calls were "False alarm or false call" from Tenderloin in 2015.

## What do residents of Russian Hill call the fire department for?

In [46]:
joinedDf.filter(year('CallDateTS') == '2015')\
        .filter(col('Neighborhooods_AnalysisBoundaries') == 'Russian Hill') \
        .groupBy('Primary Situation') \
        .count() \
        .orderBy(desc('count')) \
        .show(20, False)

+------------------------------------------------------+-----+
|Primary Situation                                     |count|
+------------------------------------------------------+-----+
|500 Service Call, other                               |332  |
|700 False alarm or false call, other                  |318  |
|711 Municipal alarm system, malicious false alarm     |143  |
|111 Building fire                                     |104  |
|322 Motor vehicle accident with injuries              |93   |
|113 Cooking fire, confined to container               |79   |
|100 Fire, other                                       |61   |
|745 Alarm system activation, no fire - unintentional  |60   |
|600 Good intent call, other                           |57   |
|323 Motor vehicle/pedestrian accident (MV Ped)        |54   |
|743 Smoke detector activation, no fire - unintentional|48   |
|324 Motor vehicle accident with no injuries.          |48   |
|531 Smoke or odor removal                             

#### Most of the calls were "Service Call, other" from Russian Hill.

## What do residents of Pacific Heights call the fire department for in 2016?

In [47]:
joinedDf.filter(year('CallDateTS') == '2016')\
        .filter(col('Neighborhooods_AnalysisBoundaries') == 'Pacific Heights') \
        .groupBy('Primary Situation') \
        .count() \
        .orderBy(desc('count')) \
        .show(20, False)

+------------------------------------------------------+-----+
|Primary Situation                                     |count|
+------------------------------------------------------+-----+
|700 False alarm or false call, other                  |464  |
|743 Smoke detector activation, no fire - unintentional|238  |
|711 Municipal alarm system, malicious false alarm     |124  |
|745 Alarm system activation, no fire - unintentional  |122  |
|730 System malfunction, other                         |120  |
|322 Motor vehicle accident with injuries              |94   |
|113 Cooking fire, confined to container               |92   |
|100 Fire, other                                       |79   |
|733 Smoke detector activation due to malfunction      |76   |
|111 Building fire                                     |74   |
|735 Alarm system sounded due to malfunction           |73   |
|740 Unintentional transmission of alarm, other        |70   |
|500 Service Call, other                               

#### Most of the calls were "False alarm" from Pacific Heights in 2016.