In [0]:
""" Project Description: San Francisco Fire Calls.


Databricks Cloud Path: /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv """



# Load the given data file and create a Spark Dataframe.

from pyspark.sql.functions import *

raw_fire_df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

""" Problems with the data set:
1. Column Names are not standardized.
2. Some Date fields are of string type."""

# Column Names standardization
renamed_fire_df = raw_fire_df \
.withColumnRenamed("Call Number", "CallNumber") \
.withColumnRenamed("Unit ID", "UnitID") \
.withColumnRenamed("Incident Number", "IncidentNumber") \
.withColumnRenamed("Call Date", "CallDate") \
.withColumnRenamed("Watch Date", "WatchDate") \
.withColumnRenamed("Call Final Disposition", "CallFinalDisposition") \
.withColumnRenamed("Available DtTm", "AvailableDtTm") \
.withColumnRenamed("Zipcode of Incident", "Zipcode") \
.withColumnRenamed("Station Area", "StationArea") \
.withColumnRenamed("Final Priority", "FinalPriority") \
.withColumnRenamed("ALS Unit", "ALSUnit") \
.withColumnRenamed("Call Type Group", "CallTypeGroup") \
.withColumnRenamed("Unit sequence in call dispatch", "Unitsequenceincalldispatch") \
.withColumnRenamed("Fire Prevention District", "FirePreventionDistrict") \
.withColumnRenamed("Supervisor District", "SupervisorDistrict")

# Converting Some Date fields from string to timestamp
# Rounding Delay Column
fire_df = renamed_fire_df \
    .withColumn("AvailableDtTm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a")) \
    .withColumn("Delay", round("Delay", 2))








In [0]:
from pyspark.sql.functions import *

In [0]:
raw_fire_df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

In [0]:
raw_fire_df.show(10)

+-----------+-------+---------------+----------------+----------+----------+----------------------+--------------------+--------------------+----+-------------------+---------+------------+----+------------+--------+--------------+--------+---------------+---------+------------+------------------------------+------------------------+-------------------+---------------+--------------------+-------------+------------------+
|Call Number|Unit ID|Incident Number|        CallType| Call Date|Watch Date|Call Final Disposition|      Available DtTm|             Address|City|Zipcode of Incident|Battalion|Station Area| Box|OrigPriority|Priority|Final Priority|ALS Unit|Call Type Group|NumAlarms|    UnitType|Unit sequence in call dispatch|Fire Prevention District|Supervisor District|   Neighborhood|            Location|        RowID|             Delay|
+-----------+-------+---------------+----------------+----------+----------+----------------------+--------------------+--------------------+----+--

In [0]:
renamed_fire_df = raw_fire_df \
.withColumnRenamed("Call Number", "CallNumber") \
.withColumnRenamed("Unit ID", "UnitID") \
.withColumnRenamed("Incident Number", "IncidentNumber") \
.withColumnRenamed("Call Date", "CallDate") \
.withColumnRenamed("Watch Date", "WatchDate") \
.withColumnRenamed("Call Final Disposition", "CallFinalDisposition") \
.withColumnRenamed("Available DtTm", "AvailableDtTm") \
.withColumnRenamed("Zipcode of Incident", "Zipcode") \
.withColumnRenamed("Station Area", "StationArea") \
.withColumnRenamed("Final Priority", "FinalPriority") \
.withColumnRenamed("ALS Unit", "ALSUnit") \
.withColumnRenamed("Call Type Group", "CallTypeGroup") \
.withColumnRenamed("Unit sequence in call dispatch", "Unitsequenceincalldispatch") \
.withColumnRenamed("Fire Prevention District", "FirePreventionDistrict") \
.withColumnRenamed("Supervisor District", "SupervisorDistrict")

In [0]:
fire_df = renamed_fire_df \
    .withColumn("AvailableDtTm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a")) \
    .withColumn("Delay", round("Delay", 2))

In [0]:
# Q-1) How many distinct types of calls were made to the Fire Department?

""" 1. SQL Approach
- Convert your dataframe to a temporary view
- Run your SQL on the view """

fire_df.createOrReplaceTempView("fire_service_calls_view")
q1_sql_df = spark.sql("""
                      select count(distinct CallType) as no_of_distinct_calls 
                      from fire_service_calls_view 
                      """)
q1_sql_df.show()

# Dataframe Transformation Approach

q1_df = fire_df.where("CallType is not null") \
        .select("CallType") \
        .distinct()
display(q1_df.count())

+--------------------+
|no_of_distinct_calls|
+--------------------+
|                  32|
+--------------------+

32

In [0]:
# Q-2) What are distinct types of calls were made to the Fire Department?

q2_df = fire_df.where("CallType is not null") \
        .select("CallType") \
        .distinct()
q2_df.show()

+--------------------+
|            CallType|
+--------------------+
|Elevator / Escala...|
|  Aircraft Emergency|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|               Other|
|        Outside Fire|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
|   Electrical Hazard|
|      Structure Fire|
|Industrial Accidents|
|    Medical Incident|
|          Fuel Spill|
|Smoke Investigati...|
+--------------------+
only showing top 20 rows



In [0]:

# Q-3) Find out all response for delayed times greater than 5 mins?

q3_df = fire_df.where("Delay > 5") \
        .select("CallNumber", "Delay")
q3_df.show()



+----------+-----+
|CallNumber|Delay|
+----------+-----+
|  20110014| 5.23|
|  20110017| 6.93|
|  20110019| 6.12|
|  20110039| 7.85|
|  20110045|77.33|
|  20110046| 5.42|
|  20110055|  6.5|
|  20110058| 6.85|
|  20110058| 6.85|
|  20110061| 6.33|
|  20110062| 7.02|
|  20110066|  9.1|
|  20110068| 5.98|
|  20110070| 5.42|
|  20110070| 5.08|
|  20110070| 8.18|
|  20110070| 5.98|
|  20110077|18.07|
|  20110103| 5.55|
|  20110103| 5.97|
+----------+-----+
only showing top 20 rows



In [0]:

# Q-4) What were the most common call types?

q4_df = fire_df.select("CallType") \
        .groupBy("CallType") \
        .count() \
        .orderBy("count", ascending=False)
q4_df.show(1)




+----------------+-------+
|        CallType|  count|
+----------------+-------+
|Medical Incident|2843475|
+----------------+-------+
only showing top 1 row



In [0]:
# Q-5) What zip codes accounted for most common calls?

fire_df.select("CallType", "Zipcode") \
    .groupBy("CallType", "Zipcode") \
    .count() \
    .orderBy("count", ascending=False) \
    .show()

+----------------+-------+------+
|        CallType|Zipcode| count|
+----------------+-------+------+
|Medical Incident|  94102|401457|
|Medical Incident|  94103|370215|
|Medical Incident|  94110|249279|
|Medical Incident|  94109|238087|
|Medical Incident|  94124|147564|
|Medical Incident|  94112|139565|
|Medical Incident|  94115|120087|
|Medical Incident|  94122|107602|
|Medical Incident|  94107|107439|
|Medical Incident|  94133| 99050|
|Medical Incident|  94117| 92744|
|Medical Incident|  94134| 83569|
|Medical Incident|  94114| 82378|
|Medical Incident|  94118| 77817|
|Medical Incident|  94121| 74943|
|Medical Incident|  94116| 66742|
|Medical Incident|  94132| 64439|
|  Structure Fire|  94110| 57014|
|Medical Incident|  94105| 56909|
|  Structure Fire|  94103| 55529|
+----------------+-------+------+
only showing top 20 rows



In [0]:
# Q-6) What San Francisco neighborhoods are in the zip codes 94102 and 94103

fire_df.select("Neighborhood", "Zipcode") \
    .where("Zipcode in (94102, 94103)") \
    .distinct() \
    .show()

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|        Potrero Hill|  94103|
|    Western Addition|  94102|
|          Tenderloin|  94102|
|            Nob Hill|  94102|
| Castro/Upper Market|  94103|
|             Mission|  94102|
|     South of Market|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94103|
|Financial Distric...|  94102|
|         Mission Bay|  94103|
|          Tenderloin|  94103|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
+--------------------+-------+



In [0]:

# Q-7) What was the sum of all call alarms, average, min and max of the response times for calls?

fire_df.select(
    sum("NumAlarms").alias("total_call_alarms"),
    avg("Delay").alias("avg_call_response_time"),
    min("Delay").alias("min_call_response_time"),
    max("Delay").alias("max_call_response_time"),
).show()




+-----------------+----------------------+----------------------+----------------------+
|total_call_alarms|avg_call_response_time|min_call_response_time|max_call_response_time|
+-----------------+----------------------+----------------------+----------------------+
|          4403441|    3.9021698123113886|                  0.02|               1879.62|
+-----------------+----------------------+----------------------+----------------------+



In [0]:
# Q-8) How many distinct years of data is in the dataset?

fire_df.select(year("CallDate").alias("call_year")) \
    .distinct() \
    .orderBy("call_year") \
    .show()

+---------+
|call_year|
+---------+
|     2000|
|     2001|
|     2002|
|     2003|
|     2004|
|     2005|
|     2006|
|     2007|
|     2008|
|     2009|
|     2010|
|     2011|
|     2012|
|     2013|
|     2014|
|     2015|
|     2016|
|     2017|
|     2018|
+---------+



In [0]:
# Q-9) What week of the year in 2018 had the most fire calls?

fire_df.withColumn('week_of_year',weekofyear("CallDate")).select("week_of_year") \
    .filter(year("CallDate") == 2018) \
    .groupBy("week_of_year") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(1)


+------------+-----+
|week_of_year|count|
+------------+-----+
|           1| 6401|
+------------+-----+
only showing top 1 row



In [0]:

# Q-10) What neighborhoods in San Francisco had the worst response time in 2018?

fire_df.select("Neighborhood", "Delay") \
    .filter(year("CallDate") == 2018) \
    .orderBy("Delay", ascending=False) \
    .show(1)

+------------------+------+
|      Neighborhood| Delay|
+------------------+------+
|West of Twin Peaks|754.08|
+------------------+------+
only showing top 1 row

