# San Francisco Fire Calls Analysis


- How many distinct types of calls were made to the Fire Department?

- What distinct types of calls were made to the Fire Department?

- Which response (delay) times were greater than 5 minutes?

- What were the most common call types?

- What zip codes accounted for the most common calls?

- What San Francisco neighborhoods are in the zip codes 94102 and 94103?

- What was the total number of alarms, and the average, minimum, and maximum response times for calls?

- How many distinct years of data are in the CSV file?

- What week of the year in 2018 had the most fire calls?

- What neighborhoods in San Francisco had the worst response time in 2018?

- How can we use Parquet files to store data and read it back?


In [2]:
%%bash

hadoop fs -ls /user/ranga_rao/learningSparkBook/sf-fire-calls.csv

-rw-r--r--   2 ranga_rao hdfs   44530123 2020-09-11 16:46 /user/ranga_rao/learningSparkBook/sf-fire-calls.csv


In [3]:
%%bash

hadoop fs -cat /user/ranga_rao/learningSparkBook/sf-fire-calls.csv | head

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,false,"",1,TRUCK,2,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-T13,2.95
20110022,M17,2003241,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 03:01:18 AM,0 Block of SILVERVIEW DR,SF,94124,B10,42,6495,3,3,3,true,"",1,MEDIC,1,10,10,Bayview Hunters Point,"(37.7337623673897, -122.396113802632)",020110022-M17,4.7
20110023,M41,2003242,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:39:50 AM,MARKET ST/MCALLISTER ST,SF,94102,B03,01,1455,3,3,3,true,"",1,MEDIC,2,3,6,Tenderloin,"(37.7811772186

cat: Unable to write to output stream.
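The `Location` field in the sample rows above is wrapped in double quotes because it contains a comma, so a naive `split(',')` produces one field too many. Spark's CSV reader uses `"` as its default quote character and handles this. The following plain-Python sketch (standard-library `csv`, not Spark) shows the same effect on the header and first data row copied from the output above.

```python
import csv
import io

# Header and first data row copied from the `hadoop fs -cat ... | head` output above.
HEADER = (
    "CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,"
    "CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,"
    "StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,"
    "CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,"
    "FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay"
)
ROW = (
    '20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,'
    '01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,'
    '3,3,3,false,"",1,TRUCK,2,4,5,Pacific Heights,'
    '"(37.7895840679362, -122.428071912459)",020110016-T13,2.95'
)

# Naive splitting cuts the quoted "(lat, lon)" Location value in two.
naive_fields = ROW.split(",")

# csv.DictReader honours the double quotes, keeping 28 fields.
reader = csv.DictReader(io.StringIO(HEADER + "\n" + ROW + "\n"))
first = next(reader)

print(len(HEADER.split(",")), len(naive_fields))
print(first["Location"])
```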


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
#spark.stop()

spark = (SparkSession
         .builder
         .config('spark.ui.port', 0)
         .appName('SF_FireCallsAnalysis')
         .master('yarn')
         .getOrCreate()
)

In [6]:
spark

In [7]:
#Cols: 
#CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,
#FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay

#20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,false,"",1,
#TRUCK,2,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-T13,2.95

schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])
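The explicit schema tells Spark how to convert each CSV string into a typed value. As a rough plain-Python analogue (not how Spark implements it), the sketch below applies converters for a hypothetical subset of the columns to values from the first sample row. Empty or unparseable values become `None` here, which only approximates Spark's null handling. Note also that `FloatType` is single precision, which is why `Delay` values such as `5.1833334` appear later; Python's `float` is double precision.

```python
from typing import Any, Callable, Dict, Optional


def parse_bool(text: str) -> bool:
    # bool("false") would be True, so compare the text explicitly.
    lowered = text.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    raise ValueError(f"not a boolean: {text!r}")


# Hypothetical subset of the schema above, expressed as column -> converter.
CONVERTERS: Dict[str, Callable[[str], Any]] = {
    "CallNumber": int,
    "Zipcode": int,
    "ALSUnit": parse_bool,
    "NumAlarms": int,
    "Delay": float,
    "Neighborhood": str,
}


def apply_schema(raw: Dict[str, str]) -> Dict[str, Optional[Any]]:
    """Convert the selected string fields; empty or unparseable values become None."""
    typed: Dict[str, Optional[Any]] = {}
    for name, convert in CONVERTERS.items():
        value = raw.get(name, "")
        try:
            typed[name] = convert(value) if value != "" else None
        except ValueError:
            typed[name] = None
    return typed


# Values taken from the first data row (CallNumber 20110016).
sample = {"CallNumber": "20110016", "Zipcode": "94109", "ALSUnit": "false",
          "NumAlarms": "1", "Delay": "2.95", "Neighborhood": "Pacific Heights"}
print(apply_schema(sample))
```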

In [8]:
fireCalls_df = (spark
                .read
                .option('header', 'true')
                .schema(schema)
                .csv('/user/ranga_rao/learningSparkBook/sf-fire-calls.csv')
)

fireCalls_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

In [9]:
fireCalls_df.count()

175296

In [10]:
fireCalls_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

## How many distinct types of calls were made to the Fire Department?

In [11]:
(fireCalls_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .distinct()
 .count()
)

30

## What distinct types of calls were made to the Fire Department?

In [28]:
(fireCalls_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .distinct()
).show(30, False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

## Which response (delay) times were greater than 5 minutes?

In [12]:
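(fireCalls_df
 .select("Delay")
 .filter(col("Delay") > 5)   # Delay is the response delay in minutes
).show()
# To rename the column, use .withColumnRenamed("Delay", "ResponseDelayedinMins");
# withColumnRenamed is a no-op when the source column (e.g. a misspelled "Delayed") does not exist.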

+---------+
|    Delay|
+---------+
|     5.35|
|     6.25|
|      5.2|
|      5.6|
|     7.25|
|11.916667|
| 5.116667|
| 8.633333|
| 95.28333|
|     5.45|
|      7.6|
| 6.133333|
|5.1833334|
|6.9166665|
|      5.2|
|     6.35|
| 7.983333|
|    13.55|
|     5.15|
|13.583333|
+---------+
only showing top 20 rows
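In Spark, comparing a null `Delay` with 5 yields null, and `filter` keeps only rows where the condition is true, so calls with a missing delay silently drop out. A plain-Python sketch of the same rule has to make the null check explicit, because `None > 5` raises a `TypeError`. The values 2.95 and 4.7 come from the sample rows shown earlier; the `None` and `5.0` are hypothetical.

```python
from typing import List, Optional


def delays_over(delays: List[Optional[float]], threshold: float = 5.0) -> List[float]:
    """Keep delays strictly greater than threshold; None (null) values are dropped."""
    return [d for d in delays if d is not None and d > threshold]


sample = [2.95, 4.7, 5.35, None, 6.25, 5.0, 95.28333]
print(delays_over(sample))
```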



- ### Transform the string date columns into Spark `TimestampType` columns so we can run time-based queries later
- ### Drop the original string columns, keeping the transformed DataFrame
- ### Cache the new DataFrame

In [13]:
# Parse CallDate, WatchDate and AvailableDtTm into timestamps, then drop the original string columns

fireCalls_df = (fireCalls_df
                .withColumn("CallDate_ts", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
                .withColumn("WatchDate_ts", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
                .withColumn("AvailableDtTm_ts", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
                .drop("CallDate", "WatchDate", "AvailableDtTm")
               )

In [14]:
(fireCalls_df
 .select("CallDate_ts", "WatchDate_ts", "AvailableDtTm_ts")
 .show(5, False))

+-------------------+-------------------+-------------------+
|CallDate_ts        |WatchDate_ts       |AvailableDtTm_ts   |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows
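In the Spark patterns above, `hh` is the 1-12 hour of the clock and `a` is the AM/PM marker, which is why `01:51:44 AM` becomes `01:51:44` in the output. The plain-Python sketch below uses the equivalent `strptime` directives (an informal mapping for these two patterns only) on values from the first sample row; the `PM` string is a hypothetical variant added to show the 12-hour shift.

```python
from datetime import datetime

# Spark pattern            -> strptime format
# "MM/dd/yyyy"             -> "%m/%d/%Y"
# "MM/dd/yyyy hh:mm:ss a"  -> "%m/%d/%Y %I:%M:%S %p"
call_date = datetime.strptime("01/11/2002", "%m/%d/%Y")
available = datetime.strptime("01/11/2002 01:51:44 AM", "%m/%d/%Y %I:%M:%S %p")
pm_example = datetime.strptime("01/11/2002 01:51:44 PM", "%m/%d/%Y %I:%M:%S %p")

print(call_date)        # 2002-01-11 00:00:00
print(available)        # 2002-01-11 01:51:44
print(pm_example.hour)  # 13
```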



In [15]:
# cache() is lazy: the data is kept in memory once an action computes it
fireCalls_df.cache()

DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallFinalDisposition: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: float, CallDate_ts: timestamp, WatchDate_ts: timestamp, AvailableDtTm_ts: timestamp]

In [16]:
fireCalls_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- SupervisorDistrict: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Location: string (nullable =

## What were the most common call types?

In [17]:
(fireCalls_df
 .select("CallType")
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending = False)
).show(30, False)

+--------------------------------------------+------+
|CallType                                    |count |
+--------------------------------------------+------+
|Medical Incident                            |113794|
|Structure Fire                              |23319 |
|Alarms                                      |19406 |
|Traffic Collision                           |7013  |
|Citizen Assist / Service Call               |2524  |
|Other                                       |2166  |
|Outside Fire                                |2094  |
|Vehicle Fire                                |854   |
|Gas Leak (Natural and LP Gases)             |764   |
|Water Rescue                                |755   |
|Odor (Strange / Unknown)                    |490   |
|Electrical Hazard                           |482   |
|Elevator / Escalator Rescue                 |453   |
|Smoke Investigation (Outside)               |391   |
|Fuel Spill                                  |193   |
|HazMat                     
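Unlike the distinct-count query earlier, this query does not filter out null call types, and Spark's `groupBy` keeps nulls as a group of their own. The plain-Python sketch below mirrors `groupBy("CallType").count().orderBy("count", ascending=False)` with `collections.Counter` on a hypothetical sample, where `None` stands in for a null.

```python
from collections import Counter

# Hypothetical sample of CallType values; None models a null.
call_types = ["Medical Incident", "Structure Fire", "Medical Incident",
              "Alarms", None, "Medical Incident", "Alarms"]

counts = Counter(call_types)

# most_common() returns (value, count) pairs sorted by count, largest first.
for call_type, n in counts.most_common():
    print(call_type, n)
```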

## What zip codes accounted for the most common calls?

In [18]:
(fireCalls_df
 .select("CallType", "Zipcode")
 .filter(col("CallType").isNotNull())
 .groupBy("CallType", "Zipcode")
 .count()
 .orderBy("count", ascending = False)
).show(50, False)

+-----------------+-------+-----+
|CallType         |Zipcode|count|
+-----------------+-------+-----+
|Medical Incident |94102  |16130|
|Medical Incident |94103  |14775|
|Medical Incident |94110  |9995 |
|Medical Incident |94109  |9479 |
|Medical Incident |94124  |5885 |
|Medical Incident |94112  |5630 |
|Medical Incident |94115  |4785 |
|Medical Incident |94122  |4323 |
|Medical Incident |94107  |4284 |
|Medical Incident |94133  |3977 |
|Medical Incident |94117  |3522 |
|Medical Incident |94134  |3437 |
|Medical Incident |94114  |3225 |
|Medical Incident |94118  |3104 |
|Medical Incident |94121  |2953 |
|Medical Incident |94116  |2738 |
|Medical Incident |94132  |2594 |
|Structure Fire   |94110  |2267 |
|Medical Incident |94105  |2258 |
|Structure Fire   |94102  |2229 |
|Structure Fire   |94103  |2221 |
|Alarms           |94103  |2168 |
|Medical Incident |94108  |2162 |
|Structure Fire   |94109  |2160 |
|Alarms           |94102  |2140 |
|Medical Incident |94123  |1940 |
|Medical Incid

## What San Francisco neighborhoods are in the zip codes 94102 and 94103?

In [19]:
(fireCalls_df
 .select('Zipcode', 'Neighborhood')
 .filter(col("Zipcode").isin([94102, 94103]))
 .distinct()
 .orderBy("Zipcode")
).show(50, False)

+-------+------------------------------+
|Zipcode|Neighborhood                  |
+-------+------------------------------+
|94102  |Hayes Valley                  |
|94102  |Western Addition              |
|94102  |Tenderloin                    |
|94102  |Nob Hill                      |
|94102  |South of Market               |
|94102  |Financial District/South Beach|
|94103  |Financial District/South Beach|
|94103  |Hayes Valley                  |
|94103  |Mission                       |
|94103  |Tenderloin                    |
|94103  |South of Market               |
|94103  |Mission Bay                   |
|94103  |Potrero Hill                  |
|94103  |Castro/Upper Market           |
+-------+------------------------------+



## What was the total number of alarms, and the average, minimum, and maximum response times for calls?

In [21]:
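# sum, avg, min and max are pyspark.sql.functions here: the wildcard import shadows Python's built-ins
(fireCalls_df
 .select(sum('NumAlarms'), avg("Delay"), min("Delay"), max("Delay"))
).show()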

+--------------+-----------------+-----------+----------+
|sum(NumAlarms)|       avg(Delay)| min(Delay)|max(Delay)|
+--------------+-----------------+-----------+----------+
|        176170|3.892364154521585|0.016666668|   1844.55|
+--------------+-----------------+-----------+----------+
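Spark's aggregate functions `sum`, `avg`, `min` and `max` skip nulls, so `avg("Delay")` divides by the number of non-null delays rather than by the row count, and an all-null input yields null. A plain-Python sketch of those semantics, where 2.95 and 4.7 come from the sample rows shown earlier and the other values are hypothetical:

```python
from typing import List, Optional, Tuple

Summary = Tuple[Optional[float], Optional[float], Optional[float], Optional[float]]


def summarize(values: List[Optional[float]]) -> Summary:
    """Return (sum, avg, min, max) over non-null values; all None if nothing is present."""
    present = [v for v in values if v is not None]
    if not present:
        return None, None, None, None
    total = sum(present)
    return total, total / len(present), min(present), max(present)


delays = [2.95, 4.7, None, 5.35]
total, average, lowest, highest = summarize(delays)
# average is total / 3 (non-null values), not total / 4 (rows)
print(total, average, lowest, highest)
```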



## How many distinct years of data are in the CSV file?

In [23]:
(fireCalls_df
 .select(year('CallDate_ts'))
 .distinct()
 .count()
)

19
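One subtlety: `select(year(...)).distinct().count()` keeps a single null row if any `CallDate` failed to parse, and `count()` counts that row, whereas `countDistinct(year(...))` ignores nulls. The output above does not show whether any nulls are present. The plain-Python sketch below illustrates the difference on hypothetical parsed dates, where `None` models a parse failure.

```python
from datetime import datetime
from typing import List, Optional

# Hypothetical CallDate_ts values; None models a date string that failed to parse.
call_dates: List[Optional[datetime]] = [
    datetime(2002, 1, 11), datetime(2002, 1, 10), datetime(2018, 6, 1), None,
]

years = [d.year if d is not None else None for d in call_dates]

# Like distinct().count(): the null year counts as one distinct value.
distinct_including_null = len(set(years))
# Like countDistinct(...): nulls are ignored.
distinct_non_null = len({y for y in years if y is not None})

print(distinct_including_null, distinct_non_null)
```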

## What week of the year in 2018 had the most fire calls?

In [29]:
(fireCalls_df
 .filter(year('CallDate_ts') == 2018)
 .groupBy(weekofyear('CallDate_ts').alias('WeekNumber'))
 .count()
 .orderBy('count', ascending = False)
).show(52)

+----------+-----+
|WeekNumber|count|
+----------+-----+
|        22|  259|
|        40|  255|
|        43|  250|
|        25|  249|
|         1|  246|
|        44|  244|
|        32|  243|
|        13|  243|
|        11|  240|
|         5|  236|
|        18|  236|
|        23|  235|
|        31|  234|
|        42|  234|
|         2|  234|
|        19|  233|
|         8|  232|
|        34|  232|
|        10|  232|
|        21|  231|
|        28|  231|
|         7|  228|
|         9|  228|
|        16|  228|
|        38|  226|
|        20|  225|
|        33|  225|
|        14|  225|
|         6|  225|
|        39|  224|
|         3|  224|
|        27|  223|
|        26|  223|
|        29|  223|
|        37|  223|
|        15|  222|
|        35|  221|
|        12|  221|
|        41|  220|
|        30|  203|
|        17|  203|
|        36|  203|
|         4|  202|
|        24|  198|
|        45|   64|
+----------+-----+
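Spark documents `weekofyear` as the ISO 8601 week number: weeks start on Monday, and week 1 is the first week with more than three days in the new year. Combined with the `year == 2018` filter, this means any calls on Monday 31 December 2018 would be counted in week 1 alongside early-January calls. The low count for week 45 suggests the 2018 data ends partway through that week. Python's `date.isocalendar()` follows the same ISO definition, so it can be used to check individual dates; a minimal sketch:

```python
from datetime import date


def iso_week(d: date) -> int:
    """ISO 8601 week number (weeks start on Monday)."""
    return d.isocalendar()[1]


print(iso_week(date(2018, 1, 1)))    # 1: Monday 1 Jan 2018 opens week 1
print(iso_week(date(2018, 11, 5)))   # 45
print(iso_week(date(2018, 12, 31)))  # 1: this Monday belongs to ISO week 1 of 2019
```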



## What neighborhoods in San Francisco had the worst response time in 2018?

In [37]:
(fireCalls_df
 .select('Neighborhood','Delay')
 .filter(year('CallDate_ts') == 2018)
 .orderBy('Delay', ascending = False)
).show(10, False)

+------------------------------+---------+
|Neighborhood                  |Delay    |
+------------------------------+---------+
|Chinatown                     |491.26666|
|Financial District/South Beach|406.63333|
|Tenderloin                    |340.48334|
|Haight Ashbury                |175.86667|
|Bayview Hunters Point         |155.8    |
|Financial District/South Beach|135.51666|
|Pacific Heights               |129.01666|
|Potrero Hill                  |109.8    |
|Inner Sunset                  |106.13333|
|South of Market               |94.71667 |
+------------------------------+---------+
only showing top 10 rows
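This query lists the individual 2018 calls with the largest delays, so a neighborhood can appear more than once (Financial District/South Beach does). Ranking neighborhoods requires an aggregation, which in PySpark would be `groupBy("Neighborhood").agg(avg("Delay"))` (or `max`) followed by `orderBy`. The plain-Python sketch below shows the grouping idea on the ten rows displayed above; `rank_neighborhoods` is a hypothetical helper, and with `max` each neighborhood appears only once.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Dict, Iterable, List, Tuple


def rank_neighborhoods(rows: Iterable[Tuple[str, float]],
                       agg: Callable[[List[float]], float]) -> List[Tuple[str, float]]:
    """Group (neighborhood, delay) pairs, aggregate each group, and sort worst first."""
    groups: Dict[str, List[float]] = defaultdict(list)
    for neighborhood, delay in rows:
        groups[neighborhood].append(delay)
    return sorted(((name, agg(delays)) for name, delays in groups.items()),
                  key=lambda pair: pair[1], reverse=True)


# The ten rows displayed above (the largest 2018 delays).
top_rows = [
    ("Chinatown", 491.26666), ("Financial District/South Beach", 406.63333),
    ("Tenderloin", 340.48334), ("Haight Ashbury", 175.86667),
    ("Bayview Hunters Point", 155.8), ("Financial District/South Beach", 135.51666),
    ("Pacific Heights", 129.01666), ("Potrero Hill", 109.8),
    ("Inner Sunset", 106.13333), ("South of Market", 94.71667),
]

for name, worst in rank_neighborhoods(top_rows, max):
    print(f"{name:32} {worst:.2f}")
```

Averaging only these ten rows would not be meaningful; `mean` (or Spark's `avg`) should be applied to all 2018 rows per neighborhood.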



## How can we use Parquet files to store data and read it back?

In [38]:
(fireCalls_df
 .write
 .mode("overwrite")
 .parquet("/user/ranga_rao/learningSparkBook/Solutions/")
)

In [41]:
(spark
 .read
 .parquet("/user/ranga_rao/learningSparkBook/Solutions/")
).show(10, False)

+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |Delay    |CallDate_ts        |WatchDate_ts       |AvailableDtTm_ts   |
+----------+------+--------------+----------------+--------------------+---------------------------+----+-------