In [None]:

from pyspark.sql.types import *
from pyspark.sql.functions import *


In [35]:
# Explicit schema for the SF Fire Department calls CSV, declared as
# (column name, Spark type) pairs. Every column is nullable.
#
# NOTE(review): the grouped output further down in this notebook shows Zipcode
# as null for every row, which suggests this schema may not line up with the
# file's actual column order -- confirm against the CSV header.
fire_columns = [
    ('CallNumber', IntegerType()),
    ('UnitID', StringType()),
    ('IncidentNumber', IntegerType()),
    ('CallType', StringType()),
    ('CallDate', StringType()),
    ('WatchDate', StringType()),
    ('CallFinalDisposition', StringType()),
    ('AvailableDtTm', StringType()),
    ('Address', StringType()),
    ('City', StringType()),
    ('Zipcode', IntegerType()),
    ('Battalion', StringType()),
    ('StationArea', StringType()),
    ('Box', StringType()),
    ('OriginalPriority', StringType()),
    ('Priority', StringType()),
    ('FinalPriority', IntegerType()),
    ('ALSUnit', BooleanType()),
    ('CallTypeGroup', StringType()),
    ('NumAlarms', IntegerType()),
    ('UnitType', StringType()),
    ('UnitSequenceInCallDispatch', IntegerType()),
    ('FirePreventionDistrict', StringType()),
    ('SupervisorDistrict', StringType()),
    ('Neighborhood', StringType()),
    ('Location', StringType()),
    ('RowID', StringType()),
    ('Delay', FloatType()),
]

fire_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in fire_columns]
)

# Read the CSV with the schema above; the first line of the file is a header.
# (A 2016-only extract, Fire_Department_Calls_For_Service__2016_.csv, sits in
# the same bucket and was previously read here without an explicit schema.)
fire_df = spark.read.csv(
    "s3://workshop-datasets-2022/Fire_Department_Calls_for_Service.csv",
    header=True,
    schema=fire_schema,
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Count the rows loaded into fire_df.
fire_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

523928

In [37]:
# Print each column's name, type and nullability to confirm the schema was applied.
fire_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [None]:
# Restrict fire_df to at most 50 rows.
# NOTE(review): this cell has no execution count and no action (.show()/.collect())
# follows the limit, so nothing is displayed here -- confirm whether a preview was intended.
fire_df.limit(50)

In [38]:
# Keep three columns and only the calls whose type is something other than
# "Medical Incident", then preview the first five rows untruncated.
is_non_medical = col("CallType") != "Medical Incident"

few_fire_df = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(is_non_medical)
)

few_fire_df.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------------------+-----------------+
|IncidentNumber|AvailableDtTm         |CallType         |
+--------------+----------------------+-----------------+
|21000746      |01/02/2021 07:49:55 PM|Alarms           |
|21000454      |01/02/2021 03:03:06 AM|Other            |
|21000466      |01/02/2021 04:53:25 AM|Outside Fire     |
|21000443      |01/02/2021 02:02:18 AM|Alarms           |
|21000631      |01/02/2021 02:39:15 PM|Traffic Collision|
+--------------+----------------------+-----------------+
only showing top 5 rows

**Q-1) How many distinct types of calls were made to the Fire Department?**

To be sure, let's exclude null values in that column from the count.


In [39]:
# Number of distinct, non-null CallType values.
(
    fire_df
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .distinct()
    .count()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

30

**Q-2) What distinct types of calls were made to the Fire Department?**

These are the first 10 distinct types of calls made to the SF Fire Department.


In [40]:
# Show ten of the distinct, non-null CallType values without truncating them.
distinct_call_types = (
    fire_df
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .distinct()
)
distinct_call_types.show(n=10, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Odor (Strange / Unknown)                    |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Traffic Collision                           |
|Water Rescue                                |
|Structure Fire                              |
|Aircraft Emergency                          |
+--------------------------------------------+
only showing top 10 rows

## Let's do some ETL:

    Transform the string date columns to the Spark Timestamp data type so we can run time-based queries later
    Return the transformed DataFrame
    Cache the new DataFrame



In [43]:
# `new_fire_df` is not defined in any earlier cell of this notebook, so on a
# fresh kernel this cell would raise a NameError. The schema names the last
# field "Delay" while the column list printed after this step shows
# "ResponseDelayedinMins", so the missing step is that rename; recreate it here.
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")

# Replace the three string date columns with proper timestamp columns:
#   CallDate      -> IncidentDate   (date only, parsed as MM/dd/yyyy)
#   WatchDate     -> OnWatchDate    (date only, parsed as MM/dd/yyyy)
#   AvailableDtTm -> AvailableDtTS  (12-hour clock with AM/PM marker)
# Each source column is dropped as soon as its parsed replacement is added.
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
              .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
# Mark the transformed DataFrame for caching, as announced in the ETL steps above.
fire_ts_df.cache()
# List the column names to confirm the string date columns were replaced by the new timestamp columns.
fire_ts_df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['CallNumber', 'UnitID', 'IncidentNumber', 'CallType', 'CallFinalDisposition', 'Address', 'City', 'Zipcode', 'Battalion', 'StationArea', 'Box', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit', 'CallTypeGroup', 'NumAlarms', 'UnitType', 'UnitSequenceInCallDispatch', 'FirePreventionDistrict', 'SupervisorDistrict', 'Neighborhood', 'Location', 'RowID', 'ResponseDelayedinMins', 'IncidentDate', 'OnWatchDate', 'AvailableDtTS']

Check the transformed columns with Spark Timestamp type

In [45]:
# Preview the three converted timestamp columns (first five rows, untruncated).
timestamp_columns = ["IncidentDate", "OnWatchDate", "AvailableDtTS"]
fire_ts_df.select(*timestamp_columns).show(n=5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2021-01-02 00:00:00|2021-01-02 00:00:00|2021-01-02 19:49:55|
|2021-01-02 00:00:00|2021-01-02 00:00:00|2021-01-02 18:03:53|
|2021-01-02 00:00:00|2021-01-02 00:00:00|2021-01-02 16:29:46|
|2021-01-02 00:00:00|2021-01-01 00:00:00|2021-01-02 03:03:06|
|2021-01-02 00:00:00|2021-01-02 00:00:00|2021-01-02 14:15:14|
+-------------------+-------------------+-------------------+
only showing top 5 rows

**Q-4) What were the most common call types?**

List them in descending order


In [46]:
# Count calls per non-null CallType and show the ten most frequent, untruncated.
has_call_type = col("CallType").isNotNull()

top_call_types = (
    fire_ts_df
    .select("CallType")
    .filter(has_call_type)
    .groupBy("CallType")
    .count()
    .sort(col("count").desc())
)
top_call_types.show(10, False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |359574|
|Alarms                         |65952 |
|Structure Fire                 |32666 |
|Traffic Collision              |20108 |
|Other                          |12045 |
|Outside Fire                   |9309  |
|Citizen Assist / Service Call  |7455  |
|Water Rescue                   |3819  |
|Gas Leak (Natural and LP Gases)|3792  |
|Electrical Hazard              |2752  |
+-------------------------------+------+
only showing top 10 rows

In [47]:
# Same ranking of call types by frequency, this time with show()'s default
# row limit and column truncation.
call_type_counts = (
    fire_ts_df
    .select("CallType")
    .filter(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
)
call_type_counts.orderBy(col("count").desc()).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|359574|
|              Alarms| 65952|
|      Structure Fire| 32666|
|   Traffic Collision| 20108|
|               Other| 12045|
|        Outside Fire|  9309|
|Citizen Assist / ...|  7455|
|        Water Rescue|  3819|
|Gas Leak (Natural...|  3792|
|   Electrical Hazard|  2752|
|        Vehicle Fire|  1720|
|Elevator / Escala...|  1680|
|Smoke Investigati...|  1184|
|          Fuel Spill|   464|
|Odor (Strange / U...|   402|
|Industrial Accidents|   148|
|           Explosion|   140|
|Train / Rail Inci...|   123|
|   High Angle Rescue|   112|
|              HazMat|   106|
+--------------------+------+
only showing top 20 rows

**Q-4a) What zip codes accounted for most common calls?**
Let's investigate which zip codes in San Francisco accounted for the most fire calls and what types of calls they were.

    Filter out by CallType
    Group them by CallType and Zip code
    Count them and display them in descending order

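The expectation is that the most common calls are all related to Medical Incident, and that the two zip codes are 94102 and 94103. Note, however, that in the output below `Zipcode` is null in every row shown, so this run cannot confirm which zip codes are involved until the Zipcode column is loaded correctly.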


In [53]:
# Count calls per (CallType, Zipcode) pair and list the busiest pairs first.
# The column is spelled "Zipcode" in the schema; use that exact spelling in
# both select() and groupBy() so the query does not depend on Spark's
# case-insensitive column resolution.
#
# NOTE(review): in the recorded output every Zipcode is null and the counts
# equal the per-CallType totals, so Zipcode appears to be null for all rows.
# The likely place to look is the schema/CSV column mapping at load time --
# confirm before drawing conclusions from this query.
(fire_ts_df
 .select("CallType", "Zipcode")
 .where(col("CallType").isNotNull())
 .groupBy("CallType", "Zipcode")
 .count()
 .orderBy("count", ascending=False)
 .show())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+------+
|            CallType|Zipcode| count|
+--------------------+-------+------+
|    Medical Incident|   null|359574|
|              Alarms|   null| 65952|
|      Structure Fire|   null| 32666|
|   Traffic Collision|   null| 20108|
|               Other|   null| 12045|
|        Outside Fire|   null|  9309|
|Citizen Assist / ...|   null|  7455|
|        Water Rescue|   null|  3819|
|Gas Leak (Natural...|   null|  3792|
|   Electrical Hazard|   null|  2752|
|        Vehicle Fire|   null|  1720|
|Elevator / Escala...|   null|  1680|
|Smoke Investigati...|   null|  1184|
|          Fuel Spill|   null|   464|
|Odor (Strange / U...|   null|   402|
|Industrial Accidents|   null|   148|
|           Explosion|   null|   140|
|Train / Rail Inci...|   null|   123|
|   High Angle Rescue|   null|   112|
|              HazMat|   null|   106|
+--------------------+-------+------+
only showing top 20 rows

**Q-4b) What San Francisco neighborhoods are in the zip codes 94102 and 94103?**

Let's find out which neighborhoods are associated with these two zip codes. In all likelihood, these are some of the contested neighborhoods with high numbers of reported crimes.


In [54]:
# List the distinct (Neighborhood, Zipcode) pairs for the two zip codes of interest.
target_zipcodes = [94102, 94103]

neighborhoods_in_target_zips = (
    fire_ts_df
    .select("Neighborhood", "Zipcode")
    .filter(col("Zipcode").isin(target_zipcodes))
    .distinct()
)
neighborhoods_in_target_zips.show()



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-------+
|Neighborhood|Zipcode|
+------------+-------+
+------------+-------+