In [1]:
from pyspark.sql.functions import *

In [2]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("FireCalls") \
    .getOrCreate()

raw_fire_df = spark.read.format("csv") \
            .option("header" , "true") \
            .option("inferSchema" , "true")\
            .load("datasets/sf-fire-calls.csv") 

In [3]:
raw_fire_df.printSchema()
raw_fire_df.show()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [4]:
renamed_fire_df = raw_fire_df \
                    .withColumnRenamed('CallNumber', 'CallNo')

In [5]:
fire_df = renamed_fire_df   \
                .withColumn('CallDate' , to_date('CallDate' , 'MM/dd/yyyy')) \
                .withColumn('WatchDate' , to_date('WatchDate' , 'MM/dd/yyyy')) \
                .withColumn('AvailableDtTm' , to_timestamp('AvailableDtTm' , 'MM/dd/yyyy hh:mm:ss a')) \
                .withColumn('Delay' , round("Delay" , 2))
                

In [6]:
fire_df.printSchema()

root
 |-- CallNo: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- S

In [7]:
fire_df.cache()
# help to clean up memory and execute it faster

DataFrame[CallNo: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: date, WatchDate: date, CallFinalDisposition: string, AvailableDtTm: timestamp, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OriginalPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: double]

In [8]:
# Q1 how many distinct types of calls were made to the fire department ?
# Using SQL
fire_df.createOrReplaceTempView("fire_service_calls_view")
q1_sql_df = spark.sql("""
                       select count(distinct CallType) as distinct_calltype_count
                       from fire_service_calls_view
                       where CallType is not null 
                      """)
q1_sql_df.show()

# Using Dataframe
q1_df = fire_df.where("CallType is not null")\
    .select("CallType")\
    .distinct()
print(q1_df.count())

+-----------------------+
|distinct_calltype_count|
+-----------------------+
|                     30|
+-----------------------+

30


In [9]:
# Q2  select distinct types of calls were made to the fire department ? 
q1_df = fire_df.where("CallType is not null")\
    .select(expr("CallType as distinct_calltype" ))\
    .distinct()\
    .show()
    

+--------------------+
|   distinct_calltype|
+--------------------+
|Elevator / Escala...|
|  Aircraft Emergency|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
|Gas Leak (Natural...|
|        Water Rescue|
|   Electrical Hazard|
|   High Angle Rescue|
|      Structure Fire|
+--------------------+
only showing top 20 rows


In [10]:
# Q3. Find all response for delayed times greate then 5
q3_df = fire_df.where("Delay > 5")\
                .select("CallNo" , "Delay")\
                .show()
                

+--------+-----+
|  CallNo|Delay|
+--------+-----+
|20110315| 5.35|
|20120147| 6.25|
|20130013|  5.2|
|20140067|  5.6|
|20140177| 7.25|
|20150056|11.92|
|20150254| 5.12|
|20150265| 8.63|
|20150265|95.28|
|20150380| 5.45|
|20150414|  7.6|
|20160059| 6.13|
|20160064| 5.18|
|20170118| 6.92|
|20170342|  5.2|
|20180129| 6.35|
|20180191| 7.98|
|20180382|13.55|
|20190062| 5.15|
|20190097|13.58|
+--------+-----+
only showing top 20 rows


In [11]:
# Q4 what were the most common call type

q4_df = fire_df.select("CallType")\
    .where("CallType is not null")\
        .groupBy("CallType")\
            .count()\
                .orderBy("count" , ascending=False)\
                .show()


+--------------------+------+
|            CallType| count|
+--------------------+------+
|    Medical Incident|113794|
|      Structure Fire| 23319|
|              Alarms| 19406|
|   Traffic Collision|  7013|
|Citizen Assist / ...|  2524|
|               Other|  2166|
|        Outside Fire|  2094|
|        Vehicle Fire|   854|
|Gas Leak (Natural...|   764|
|        Water Rescue|   755|
|Odor (Strange / U...|   490|
|   Electrical Hazard|   482|
|Elevator / Escala...|   453|
|Smoke Investigati...|   391|
|          Fuel Spill|   193|
|              HazMat|   124|
|Industrial Accidents|    94|
|           Explosion|    89|
|Train / Rail Inci...|    57|
|  Aircraft Emergency|    36|
+--------------------+------+
only showing top 20 rows


In [12]:
# Q5 What zipcodes accounted for most common calls

q5_df = fire_df.where("CallType is not null")\
    .select("CallType" , "Zipcode")\
        .groupBy("CallType" , "Zipcode")\
            .count()\
                .orderBy('count' , asscending = False)\
                    .show()
                


+--------------------+-------+-----+
|            CallType|Zipcode|count|
+--------------------+-------+-----+
|           Oil Spill|  94112|    1|
|              HazMat|  94127|    1|
|           Explosion|  94104|    1|
|           Oil Spill|  94124|    1|
|       Assist Police|  94103|    1|
|              Alarms|   NULL|    1|
|           Explosion|   NULL|    1|
|       Assist Police|  94114|    1|
|Extrication / Ent...|  94102|    1|
|       Assist Police|  94122|    1|
|   High Angle Rescue|  94107|    1|
|              HazMat|  94123|    1|
|   High Angle Rescue|  94123|    1|
|      Administrative|  94105|    1|
|Odor (Strange / U...|  94158|    1|
|Confined Space / ...|  94112|    1|
|Confined Space / ...|  94114|    1|
|  Suspicious Package|  94132|    1|
|         Marine Fire|  94109|    1|
|Extrication / Ent...|  94109|    1|
+--------------------+-------+-----+
only showing top 20 rows


In [13]:
# Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103
# select distinct Neighborhood, Zipcode
# from fire_service_calls_tbl
# where Zipcode== 94102 or Zipcode == 94103


q6_df = fire_df.select('Neighborhood' , 'Zipcode')\
    .where('Zipcode== 94102 or Zipcode == 94103').show()


                

+--------------------+-------+
|        Neighborhood|Zipcode|
+--------------------+-------+
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|             Mission|  94103|
|             Mission|  94103|
|     South of Market|  94103|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|     South of Market|  94103|
|        Hayes Valley|  94102|
|Financial Distric...|  94103|
|        Hayes Valley|  94102|
|             Mission|  94103|
|          Tenderloin|  94102|
|          Tenderloin|  94102|
|        Hayes Valley|  94102|
|     South of Market|  94103|
|     South of Market|  94103|
+--------------------+-------+
only showing top 20 rows


In [None]:
# Q7. What was the sum of all calls, average, min and max of the response times for calls?
# select sum(NumAlarms), avg(Delay), min(Delay), max(Delay)
# from fire_service_calls_tbl


     
# Q8. How many distinct years of data is in the CSV file?
# select distinct year(to_timestamp(CallDate, "MM/dd/yyyy")) as year_num
# from fire_service_calls_tbl
# order by year_num


     
# Q9. What week of the year in 2018 had the most fire calls?
# select weekofyear(to_timestamp(CallDate, "MM/dd/yyyy")) week_year, count(*) as count
# from fire_service_calls_tbl 
# where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
# group by week_year
# order by count desc


     
# Q10. What neighborhoods in San Francisco had the worst response time in 2018?
# select Neighborhood, Delay
# from fire_service_calls_tbl 
# where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018