In [109]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, round, countDistinct, expr, count, sum, avg, min, max, to_timestamp, unix_timestamp, year, weekofyear
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType

In [95]:
spark = SparkSession.builder. \
                    master("local[4]"). \
                    appName("Processing Fire Department & Emergency Medical Services"). \
                    getOrCreate()

In [96]:
raw_df = spark.read. \
                format("csv"). \
                option("header", "true"). \
                option("inferSchema", "true"). \
                load("data/Fire*.csv")

                                                                                

In [97]:
renamed_df = raw_df. \
                    withColumnRenamed("Call Number", "CallNumber"). \
                    withColumnRenamed("Unit ID", "UnitID"). \
                    withColumnRenamed("Incident Number", "IncidentNumber"). \
                    withColumnRenamed("Call Date", "CallDate"). \
                    withColumnRenamed("Watch Date", "WatchDate"). \
                    withColumnRenamed("Call Final Disposition", "CallFinalDisposition"). \
                    withColumnRenamed("Available DtTm", "AvailableDtTm"). \
                    withColumnRenamed("Zipcode of Incident", "Zipcode"). \
                    withColumnRenamed("Station Area", "StationArea"). \
                    withColumnRenamed("Final Priority", "FinalPriority"). \
                    withColumnRenamed("ALS Unit", "ALSUnit"). \
                    withColumnRenamed("Call Type Group", "CallTypeGroup"). \
                    withColumnRenamed("Unit Sequence in call dispatch", "UnitSequenceInCallDispatch"). \
                    withColumnRenamed("Fire Prevention District", "FirePreventionDistrict"). \
                    withColumnRenamed("Superivisor District", "SupervisorDistrict")

In [98]:
parsed_df = renamed_df. \
                    withColumn("CallDate", to_date(col("CallDate"), "MM/dd/yyyy")). \
                    withColumn("WatchDate", to_date(col("WatchDate"), "MM/dd/yyyy")). \
                    withColumn("AvailableDtTm", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")). \
                    withColumn("Response DtTm", to_timestamp(col("Response DtTm"), "MM/dd/yyyy hh:mm:ss a")). \
                    withColumn("Dispatch DtTm", to_timestamp(col("Dispatch DtTm"), "MM/dd/yyyy hh:mm:ss a"))


1. How many distinct type of calls were made to the fire department?

In [99]:
distinctCount_df = parsed_df. \
    where("'Call Type' is not null"). \
    agg(
        countDistinct("Call Type").alias("CountDistinctCallTypes")
    )

distinctCount_df.show(1)



+----------------------+
|CountDistinctCallTypes|
+----------------------+
|                    32|
+----------------------+



                                                                                

2. What were the distinct types of calls made to the fire department?

In [100]:
distinctType_df = parsed_df. \
                        where(" 'Call Type' is not null "). \
                        select(col("Call Type").alias("DistinctCallTypes")). \
                        distinct()

distinctType_df.show()



+--------------------+
|   DistinctCallTypes|
+--------------------+
|Elevator / Escala...|
|         Marine Fire|
|  Aircraft Emergency|
|Confined Space / ...|
|Structure Fire / ...|
|      Administrative|
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|              HazMat|
|Watercraft in Dis...|
|           Explosion|
|           Oil Spill|
|        Vehicle Fire|
|  Suspicious Package|
|Extrication / Ent...|
|               Other|
|        Outside Fire|
|   Traffic Collision|
|       Assist Police|
+--------------------+
only showing top 20 rows



                                                                                

3. What were the most common Call Types

In [101]:
CommonCallTypes_df = parsed_df. \
                            where(" 'Call Type' is not null "). \
                            select(col("Call Type")).\
                            groupBy(col("Call Type")). \
                            count(). \
                            orderBy("count", ascending=False)
CommonCallTypes_df.show()



+--------------------+-------+
|           Call Type|  count|
+--------------------+-------+
|    Medical Incident|4377025|
|              Alarms| 745938|
|Structure Fire / ...| 743202|
|   Traffic Collision| 266178|
|               Other| 116743|
|Citizen Assist / ...| 100283|
|        Outside Fire|  89817|
|        Water Rescue|  35072|
|Gas Leak (Natural...|  31938|
|        Vehicle Fire|  28982|
|   Electrical Hazard|  23166|
|Elevator / Escala...|  18578|
|Smoke Investigati...|  14994|
|Odor (Strange / U...|  13830|
|          Fuel Spill|   7165|
|              HazMat|   4408|
|Industrial Accidents|   3423|
|           Explosion|   3128|
|Train / Rail Inci...|   1735|
|       Assist Police|   1523|
+--------------------+-------+
only showing top 20 rows



                                                                                

4. What Zipcodes accounted for most common calls?

In [102]:
Zipcode_df = parsed_df. \
                    select("Zipcode"). \
                    groupBy("Zipcode"). \
                    agg(
                        count("Zipcode").alias("CallCount")
                    ). \
                    orderBy("CallCount", ascending=False)
                    

Zipcode_df.show()



+-------+---------+
|Zipcode|CallCount|
+-------+---------+
|94103.0|   879162|
|94102.0|   808518|
|94109.0|   582802|
|94110.0|   549309|
|94124.0|   337500|
|94112.0|   308737|
|94115.0|   277190|
|94107.0|   265437|
|94122.0|   233890|
|94133.0|   227246|
|94117.0|   218903|
|94118.0|   192337|
|94114.0|   187734|
|94134.0|   186453|
|94105.0|   180068|
|94121.0|   170969|
|94132.0|   160347|
|94108.0|   152194|
|94116.0|   144674|
|94123.0|   130863|
+-------+---------+
only showing top 20 rows



                                                                                

5. What SanFransico neighbours are in Zipcodes 94102 and 94103

In [103]:
sanFransicoPeople = parsed_df. \
                                where(" City == 'San Francisco' AND Zipcode in (94102, 94104)")

sanFransicoPeople.count()


                                                                                

423757

6. What as was the sum of all call, average, min, max of call response times? 

In [104]:
parsed_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: timestamp (nullable = true)
 |-- Response DtTm: timestamp (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: double (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- Fin

In [105]:
ResponseTime_df = parsed_df. \
                    withColumn("Response_Time", (unix_timestamp(col("Response DtTm")) - unix_timestamp(col("Dispatch DtTm"))) / 60.0 ). \
                    where("Response_Time > 0")
                    
CallAggregations_df = ResponseTime_df. \
                        agg(
                            sum(col("Response_Time")).alias("CallResponseTime"),
                            avg(col("Response_Time")).alias("AvgCallResponseTime"),
                            min(col("Response_Time")).alias("MinCallResponseTime"),
                            max(col("Response_Time")).alias("MaxCallResponseTime")
                        )

CallAggregations_df.show()




+-----------------+-------------------+--------------------+-------------------+
| CallResponseTime|AvgCallResponseTime| MinCallResponseTime|MaxCallResponseTime|
+-----------------+-------------------+--------------------+-------------------+
|8394448.016667249|  1.401515174535132|0.016666666666666666| 1878.8666666666666|
+-----------------+-------------------+--------------------+-------------------+



                                                                                

7. How many distinct years of data are there in the csv file?

In [106]:
DistinctYears = ResponseTime_df.select(expr("year(CallDate)")).distinct()
DistinctYears.count()

                                                                                

25

8. What week of the year in 2018 had the most fire calls?

In [116]:
WeekofYear_df = ResponseTime_df. \
                            withColumn("WeekOfYear", expr("weekOfYear(CallDate)")). \
                            where(" year(CallDate) == 2018 "). \
                            select("WeekOfyear"). \
                            groupBy(col("WeekOfYear")). \
                            count(). \
                            orderBy("count", ascending=False)

WeekofYear_df.show()



+----------+-----+
|WeekOfYear|count|
+----------+-----+
|         1| 6953|
|        49| 5833|
|        25| 5818|
|        13| 5785|
|        44| 5725|
|        46| 5700|
|        22| 5689|
|        48| 5689|
|        27| 5687|
|         2| 5684|
|        16| 5683|
|         5| 5678|
|        40| 5658|
|        43| 5657|
|         6| 5589|
|         8| 5572|
|        18| 5566|
|         9| 5561|
|        45| 5540|
|        38| 5540|
+----------+-----+
only showing top 20 rows



                                                                                

9. What neighbourhoods in SanFrancisco had the worst response time in 2018?

In [119]:
neighborhood_avg_response = ResponseTime_df. \
                            where(" year(CallDate) == 2018 and 'Analysis Neighborhoods' is not null "). \
                            groupBy(col("Analysis Neighborhoods")). \
                            agg(
                                avg("Response_Time").alias("AvgResponseTime")
                            ). \
                            orderBy("AvgResponseTime", ascending=False)

neighborhood_avg_response.show()



+----------------------+------------------+
|Analysis Neighborhoods|   AvgResponseTime|
+----------------------+------------------+
|                    37|1.7553019824804041|
|                    27|1.5037859876851394|
|                    38|1.2625219529329121|
|                  NULL|1.2359060402684565|
|                    17|1.2213818860877683|
|                    19|1.1665086887835707|
|                    10|1.1645498783454984|
|                    25|1.1283571105072456|
|                     1|1.1270918692524914|
|                    26|1.1168206820682067|
|                    13|1.1144991317885184|
|                    33|1.1106085985482976|
|                    40|1.1086716524216518|
|                     4|1.1029662921348313|
|                    16|1.1010693592365355|
|                     6|1.0985548776730132|
|                    32|1.0923261125847337|
|                    22| 1.090273379776229|
|                    41|1.0877408993576057|
|                     2|1.082728

                                                                                