In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, to_timestamp, weekofyear, month, year,  sum,avg,max, min
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, FloatType

In [2]:
spark = SparkSession.builder.appName("sf-calls-analysis-app").getOrCreate()

23/10/21 13:01:54 WARN Utils: Your hostname, codespaces-babcfd resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
23/10/21 13:01:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/21 13:01:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', FloatType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [11]:

fire_df = spark.read.option('delimiter', ';').csv('sf_fire_calls_dataset.csv', header=True, schema=fire_schema,)

In [12]:
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: float (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-

In [13]:
fire_df.take(1)

[Row(CallNumber=20110016, UnitID='T13', IncidentNumber=2003235, CallType='Structure Fire', CallDate='01/11/2002', WatchDate='01/10/2002', CallFinalDisposition='Other', AvailableDtTm='01/11/2002 01:51', Address='2000 Block of CALIFORNIA ST', City='SF', Zipcode=94109.0, Battalion='B04', StationArea='38', Box='3362', OriginalPriority='3', Priority='3', FinalPriority=3, ALSUnit=False, CallTypeGroup=None, NumAlarms=1, UnitType='TRUCK', UnitSequenceInCallDispatch=2, FirePreventionDistrict='4', SupervisorDistrict='5', Neighborhood='Pacific Heights', Location='(37.7895840679362, -122.428071912459)', RowID='020110016-T13', Delay=2.950000047683716)]

In [14]:
fire_df = (fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate") 
              .withColumn("OnWatchDate",   to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")
              .withColumn('Zipcode', fire_df['Zipcode'].cast(IntegerType()))
              
              ).cache()

In [15]:

fire_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

                                                                                

+-------------------+-------------------+-------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS|
+-------------------+-------------------+-------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|NULL         |
|2002-01-11 00:00:00|2002-01-10 00:00:00|NULL         |
|2002-01-11 00:00:00|2002-01-10 00:00:00|NULL         |
|2002-01-11 00:00:00|2002-01-10 00:00:00|NULL         |
|2002-01-11 00:00:00|2002-01-10 00:00:00|NULL         |
+-------------------+-------------------+-------------+
only showing top 5 rows



In [8]:
fire_df.cache()

23/10/21 10:29:51 WARN CacheManager: Asked to cache already cached data.


DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallFinalDisposition: string, Address: string, City: string, Zipcode: float, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float, IncidentDate: timestamp, OnWatchDate: timestamp, AvailableDtTS: timestamp]

# What were all the different types of fire calls in 2018?

In [9]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(5, False)


+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
+-----------------------------------+
only showing top 5 rows



In [10]:
fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().count()

30

# What months within the year 2018 saw the highest number of fire calls?

In [11]:
fire_df.select("IncidentDate", weekofyear('IncidentDate') , month('IncidentDate') ).show(5)

+-------------------+------------------------+-------------------+
|       IncidentDate|weekofyear(IncidentDate)|month(IncidentDate)|
+-------------------+------------------------+-------------------+
|2002-01-11 00:00:00|                       2|                  1|
|2002-01-11 00:00:00|                       2|                  1|
|2002-01-11 00:00:00|                       2|                  1|
|2002-01-11 00:00:00|                       2|                  1|
|2002-01-11 00:00:00|                       2|                  1|
+-------------------+------------------------+-------------------+
only showing top 5 rows



In [12]:
fire_df.filter(year('IncidentDate') == 2018).groupBy(month('IncidentDate')).count().orderBy('count', ascending=False).show()

+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|                  2|  919|
|                  3|  431|
|                  1|  402|
|                 11|  197|
|                  9|  131|
|                 10|   32|
+-------------------+-----+



In [13]:
fire_df.filter(year('IncidentDate') == 2018).groupBy(weekofyear('IncidentDate')).count().orderBy('count', ascending=False).show()

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                       5|  236|
|                       8|  232|
|                      10|  232|
|                       9|  228|
|                       7|  228|
|                       6|  225|
|                       4|  201|
|                      44|  165|
|                       3|  100|
|                      11|   70|
|                      45|   64|
|                      38|   51|
|                      36|   49|
|                      39|   31|
+------------------------+-----+



# Which neighborhood in San Francisco generated the most fire calls in 2018?

In [14]:
fire_df.select('city').distinct().show(5)

+----+
|city|
+----+
| OAK|
|  DC|
|  TI|
|  HP|
|  YB|
+----+
only showing top 5 rows



In [15]:
fire_df.filter(year('IncidentDate') == 2018 ).groupby('Neighborhood').count().orderBy('count', ascending=False).show(5, False)

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |318  |
|South of Market               |211  |
|Mission                       |184  |
|Financial District/South Beach|151  |
|Bayview Hunters Point         |93   |
+------------------------------+-----+
only showing top 5 rows



# Which neighborhoods had the worst response times to fire calls in 2018?

In [16]:
fire_df.filter(year('IncidentDate') == 2018).select('Neighborhood', 'Delay').orderBy('Delay', ascending=False).show(5, False)

+------------------------------+---------+
|Neighborhood                  |Delay    |
+------------------------------+---------+
|Financial District/South Beach|406.63333|
|Potrero Hill                  |109.8    |
|South of Market               |64.683334|
|Financial District/South Beach|59.35    |
|Mission                       |54.666668|
+------------------------------+---------+
only showing top 5 rows



# Find out all response or delayed times greater than 5 mins?

In [17]:
fire_df.filter(col('Delay') > 5).select('Delay').show(5)

+-----+
|Delay|
+-----+
| 5.35|
| 6.25|
|  5.2|
|  5.6|
| 7.25|
+-----+
only showing top 5 rows



In [18]:
fire_df.select("Delay").where(col("Delay") > 5).show(5, False)

+-----+
|Delay|
+-----+
|5.35 |
|6.25 |
|5.2  |
|5.6  |
|7.25 |
+-----+
only showing top 5 rows



# What were the most common call types?

In [19]:
fire_df.select('CallType').groupBy('CallType').count().orderBy('count', ascending=False).show(5, False)

+-----------------------------+-----+
|CallType                     |count|
+-----------------------------+-----+
|Medical Incident             |64946|
|Structure Fire               |13177|
|Alarms                       |11039|
|Traffic Collision            |4200 |
|Citizen Assist / Service Call|1372 |
+-----------------------------+-----+
only showing top 5 rows



# What zip codes accounted for most common calls?

In [20]:
fire_df.select('Zipcode').where(col('Zipcode').isNotNull()).show()

+-------+
|Zipcode|
+-------+
|94109.0|
|94124.0|
|94102.0|
|94110.0|
|94109.0|
|94105.0|
|94112.0|
|94102.0|
|94115.0|
|94114.0|
|94110.0|
|94112.0|
|94109.0|
|94121.0|
|94110.0|
|94110.0|
|94110.0|
|94116.0|
|94118.0|
|94118.0|
+-------+
only showing top 20 rows



In [21]:
fire_df.select('Zipcode').distinct().show()

+-------+
|Zipcode|
+-------+
|94114.0|
|94110.0|
|94129.0|
|94108.0|
|94134.0|
|94112.0|
|94158.0|
|94124.0|
|94103.0|
|94105.0|
|94104.0|
|94107.0|
|94132.0|
|94117.0|
|94115.0|
|94131.0|
|94118.0|
|94111.0|
|94116.0|
|94122.0|
+-------+
only showing top 20 rows



In [22]:
fire_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallFinalDisposition',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay',
 'IncidentDate',
 'OnWatchDate',
 'AvailableDtTS']