## Overview

This notebook includes some ETL for the San Francisco Fire Alarms dataset that I uploaded to DBFS.

This notebook is written in **Python**.

In [2]:
# In Python, define a schema
from pyspark.sql.types import *

# Build the schema programmatically, one column at a time, in CSV header order.
# Each add() call appends a field as (name, Spark type, nullable); every
# column is declared nullable.
fire_schema = (
    StructType()
    .add("CallNumber", IntegerType(), True)
    .add("UnitID", StringType(), True)
    .add("IncidentNumber", IntegerType(), True)
    .add("CallType", StringType(), True)
    .add("CallDate", StringType(), True)
    .add("WatchDate", StringType(), True)
    .add("CallFinalDisposition", StringType(), True)
    .add("AvailableDtTm", StringType(), True)
    .add("Address", StringType(), True)
    .add("City", StringType(), True)
    .add("Zipcode", IntegerType(), True)
    .add("Battalion", StringType(), True)
    .add("StationArea", StringType(), True)
    .add("Box", StringType(), True)
    .add("OriginalPriority", StringType(), True)
    .add("Priority", StringType(), True)
    .add("FinalPriority", IntegerType(), True)
    .add("ALSUnit", BooleanType(), True)
    .add("CallTypeGroup", StringType(), True)
    .add("NumAlarms", IntegerType(), True)
    .add("UnitType", StringType(), True)
    .add("UnitSequenceInCallDispatch", IntegerType(), True)
    .add("FirePreventionDistrict", StringType(), True)
    .add("SupervisorDistrict", StringType(), True)
    .add("Neighborhood", StringType(), True)
    .add("Location", StringType(), True)
    .add("RowID", StringType(), True)
    .add("Delay", FloatType(), True)
)

In [3]:
# Where the uploaded data lives and which reader format to use.
file_location = "/FileStore/tables/sf_fire_calls.csv"
file_type = "csv"

# CSV parsing settings: the first row holds column names, fields are comma-separated.
first_row_is_header = "true"
delimiter = ","

# Read the file with the explicit fire_schema instead of inferring column types.
# The header/sep options only apply to CSV input.
df = (
    spark.read
    .format(file_type)
    .schema(fire_schema)
    .options(header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df)

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
20110016,T13,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:44 AM,2000 Block of CALIFORNIA ST,SF,94109.0,B04,38,3362,3,3,3,False,,1,TRUCK,2,4.0,5.0,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-T13,2.95
20110022,M17,2003241,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 03:01:18 AM,0 Block of SILVERVIEW DR,SF,94124.0,B10,42,6495,3,3,3,True,,1,MEDIC,1,10.0,10.0,Bayview Hunters Point,"(37.7337623673897, -122.396113802632)",020110022-M17,4.7
20110023,M41,2003242,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:39:50 AM,MARKET ST/MCALLISTER ST,SF,94102.0,B03,1,1455,3,3,3,True,,1,MEDIC,2,3.0,6.0,Tenderloin,"(37.7811772186856, -122.411699931232)",020110023-M41,2.4333334
20110032,E11,2003250,Vehicle Fire,01/11/2002,01/10/2002,Other,01/11/2002 04:16:46 AM,APPLETON AV/MISSION ST,SF,94110.0,B06,32,5626,3,3,3,False,,1,ENGINE,1,6.0,9.0,Bernal Heights,"(37.7388432849018, -122.423948785199)",020110032-E11,1.5
20110043,B04,2003259,Alarms,01/11/2002,01/10/2002,Other,01/11/2002 06:01:58 AM,1400 Block of SUTTER ST,SF,94109.0,B04,3,3223,3,3,3,False,,1,CHIEF,2,4.0,2.0,Western Addition,"(37.7872890372638, -122.424236212664)",020110043-B04,3.4833333
20110072,T08,2003279,Structure Fire,01/11/2002,01/11/2002,Other,01/11/2002 08:03:26 AM,BEALE ST/FOLSOM ST,SF,94105.0,B03,35,2122,3,3,3,False,,1,TRUCK,2,3.0,6.0,Financial District/South Beach,"(37.7886866619654, -122.392722833778)",020110072-T08,1.75
20110125,E33,2003301,Alarms,01/11/2002,01/11/2002,Other,01/11/2002 09:46:44 AM,0 Block of FARALLONES ST,SF,94112.0,B09,33,8324,3,3,3,False,,1,ENGINE,2,9.0,11.0,Oceanview/Merced/Ingleside,"(37.7140353531157, -122.454117149916)",020110125-E33,2.7166667
20110130,E36,2003304,Alarms,01/11/2002,01/11/2002,Other,01/11/2002 09:58:53 AM,600 Block of POLK ST,SF,94102.0,B02,3,3114,3,3,3,False,,1,ENGINE,1,2.0,6.0,Tenderloin,"(37.7826266328595, -122.41915582123)",020110130-E36,1.7833333
20110197,E05,2003343,Medical Incident,01/11/2002,01/11/2002,Other,01/11/2002 12:06:57 PM,1500 Block of WEBSTER ST,SF,94115.0,B04,5,3513,3,3,3,False,,1,ENGINE,1,4.0,5.0,Japantown,"(37.784958590666, -122.431435274503)",020110197-E05,1.5166667
20110215,E06,2003348,Medical Incident,01/11/2002,01/11/2002,Other,01/11/2002 01:08:40 PM,DIAMOND ST/MARKET ST,SF,94114.0,B05,6,5415,3,3,3,False,,1,ENGINE,1,5.0,8.0,Castro/Upper Market,"(37.7618954753708, -122.437298717721)",020110215-E06,2.7666667


* Let's save the df as a Parquet file.

In [5]:
# Both lines are left commented out, so running this cell writes nothing.
# If enabled, they would save `df` in Parquet format under `parquet_path`.
# NOTE(review): "\probe" begins with a backslash; this looks like it was meant
# to be a forward-slash path such as "/probe" — confirm before re-enabling.
# parquet_path = "\probe"
# df.write.format("parquet").save(parquet_path)

* Projections and filters.

In [7]:
# Project three columns, then keep only rows whose call type is not
# "Medical Incident".
few_fire_df = df.select("IncidentNumber", "AvailableDtTm", "CallType").filter(
    df.CallType != "Medical Incident"
)
few_fire_df.show(n=5, truncate=False)

In [8]:
# Return number of distinct types of calls using countDistinct()
from pyspark.sql.functions import *

# Single-row result: the number of different non-null CallType values.
(
    df.select("CallType")
    .filter(col("CallType").isNotNull())
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show()
)

In [9]:
# List up to 15 of the distinct non-null call types, without truncating long names.
(
    df.select("CallType")
    .filter(col("CallType").isNotNull())
    .distinct()
    .show(n=15, truncate=False)
)

In [10]:
from pyspark.sql.functions import *

# Number of calls per call type, largest totals first.
(
    df.select("CallType")
    .filter(col("CallType").isNotNull())
    .groupBy("CallType")
    .agg(count("CallType").alias("Total"))
    .orderBy(col("Total").desc())
    .show()
)

* Renaming, adding, and dropping columns.

In [12]:
"""let’s change the name of our Delay column to ResponseDelayedinMins and 
take a look at the response times that were longer than five minutes:"""

# Rename the Delay column; everything downstream uses the new name.
new_fire_df = df.withColumnRenamed("Delay", "ResponseDelayedinMins")

# Show five of the responses that took longer than five minutes.
(
    new_fire_df.select("ResponseDelayedinMins")
    .filter(col("ResponseDelayedinMins") > 5)
    .show(n=5, truncate=False)
)

In [13]:
"""to/from date/timestamp
functions such as to_timestamp() and to_date() that we can use for just this
purpose:"""

# Parse the three string date columns into timestamp columns under new names,
# then remove the string originals in a single drop(). The new columns are
# appended in the same order as before, so the resulting column order is unchanged.
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("CallDate", "WatchDate", "AvailableDtTm")
)

# Quick look at the converted columns.
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(n=5, truncate=False)

In [14]:
"""how many years’ worth of Fire Department calls are included in the data set with this
query:"""

# One row per distinct incident year, in ascending order.
(
    fire_ts_df.select(year("IncidentDate"))
    .distinct()
    .sort(year("IncidentDate"))
    .show()
)

* Aggregations.

In [16]:
"""what were the most common types of fire calls?"""

# Ten most frequent call types, ignoring rows without a call type.
(
    fire_ts_df.select("CallType")
    .filter(col("CallType").isNotNull())
    .groupBy("CallType")
    .count()
    .orderBy(col("count").desc())
    .show(n=10, truncate=False)
)

* Other common DataFrame operations.

In [18]:
"""Here we compute the sum of alarms, the average response time, and the minimum
and maximum response times to all fire calls in our data set, importing the PySpark
functions in a Pythonic way so as not to conflict with the built-in Python functions:"""

import pyspark.sql.functions as F

# Whole-dataset summary: total alarms plus average, minimum and maximum delay.
# The F. prefix keeps Spark's sum/min/max apart from Python's built-ins.
(
    fire_ts_df.select(
        F.sum("NumAlarms"),
        F.avg("ResponseDelayedinMins"),
        F.min("ResponseDelayedinMins"),
        F.max("ResponseDelayedinMins"),
    )
    .show()
)

In [19]:
"""What were all the different types of fire calls in 2018?"""

# Calls per call type for incidents dated 2018, most frequent first.
(
    fire_ts_df.select("CallType", "IncidentDate")
    .filter(col("CallType").isNotNull())
    .filter(year("IncidentDate") == 2018)
    .groupBy("CallType")
    .agg(count("CallType").alias("Total2018"))
    .orderBy(col("Total2018").desc())
    .show()
)

In [20]:
"""What months within the year 2018 saw the highest number of fire calls?"""

# Calls per calendar month for incidents dated 2018, busiest month first.
(
    fire_ts_df.select("CallType", "IncidentDate")
    .filter(col("CallType").isNotNull())
    .filter(year("IncidentDate") == 2018)
    .groupBy(month("IncidentDate"))
    .agg(count("CallType").alias("Total2018"))
    .orderBy(col("Total2018").desc())
    .show()
)

In [21]:
"""Which neighborhood in San Francisco generated the most fire calls in 2018?"""

# Calls per neighborhood for incidents dated 2018, busiest neighborhood first.
(
    fire_ts_df.select("CallType", "IncidentDate", "Neighborhood")
    .filter(col("CallType").isNotNull())
    .filter(year("IncidentDate") == 2018)
    .groupBy("Neighborhood")
    .agg(count("CallType").alias("Fire calls neighborhood 2018"))
    .orderBy(col("Fire calls neighborhood 2018").desc())
    .show()
)

In [22]:
"""Which neighborhoods had the worst response times to fire calls in 2018?"""

# Rank neighborhoods by their average 2018 response delay, slowest first.
# Aggregation is required here: individual, unsorted rows cannot show which
# neighborhood has the worst response times. IncidentDate is selected
# explicitly because the year filter reads it.
(fire_ts_df
 .select("Neighborhood", "ResponseDelayedinMins", "IncidentDate")
 .where(year("IncidentDate") == 2018)
 .groupBy("Neighborhood")
 .agg(avg("ResponseDelayedinMins").alias("AvgResponseDelayedinMins"))
 .orderBy("AvgResponseDelayedinMins", ascending=False)
 .show(20, False))

In [23]:
"""Which week in the year in 2018 had the most fire calls?"""

# Calls per week-of-year for incidents dated 2018, busiest week first.
(
    fire_ts_df.select("IncidentDate")
    .where(year("IncidentDate") == 2018)
    .groupBy(weekofyear("IncidentDate"))
    .agg(count("IncidentDate").alias("Week of the most fire calls 2018"))
    .orderBy(col("Week of the most fire calls 2018").desc())
    .show()
)

In [24]:
"""Other possibility to count weeks"""

# Same weekly breakdown using groupBy().count(), which names the result column "count".
(
    fire_ts_df.where(year("IncidentDate") == 2018)
    .groupBy(weekofyear("IncidentDate"))
    .count()
    .orderBy(col("count").desc())
    .show()
)

In [25]:
"""Is there a correlation between neighborhood, zip code, and number of fire calls?"""

# Calls per (neighborhood, zip code) pair for incidents dated 2018, largest first.
(
    fire_ts_df.select("IncidentDate", "Zipcode", "Neighborhood")
    .where(year("IncidentDate") == 2018)
    .groupBy("Neighborhood", "Zipcode")
    .agg(count("IncidentDate").alias("No of fire calls 2018"))
    .orderBy(col("No of fire calls 2018").desc())
    .show()
)

"""There is a positive correlation between neighborhood, zip code, and number of fire calls. Most fire calls come from the most dangerous district, the Tenderloin, or from the ZIP codes of directly neighboring districts, such as SoMa (94103) and the Financial District (94105)."""

* How can we use Parquet files or SQL tables to store this data and read it back?

In [27]:
# Persist the cleaned DataFrame as the Parquet-backed table FireServiceCalls,
# replacing any existing table of that name.
fire_ts_df.write.saveAsTable("FireServiceCalls", format="parquet", mode="overwrite")

In [28]:
%sql
-- Cache the FireServiceCalls table written by the previous cell.
CACHE TABLE FireServiceCalls

In [29]:
%sql
-- Read the saved table back: preview ten rows with all columns.
SELECT * FROM FireServiceCalls LIMIT 10

CallNumber,UnitID,IncidentNumber,CallType,CallFinalDisposition,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,ResponseDelayedinMins,IncidentDate,OnWatchDate,AvailableDtTS
110660238,60,11021821,Medical Incident,Fire,0 Block of BORICA ST,SF,94127,B09,19,8442,3,2,2,True,,1,MEDIC,1,9,7,West of Twin Peaks,"(37.7225922041408, -122.468227979374)",110660238-60,2.9,2011-03-07T00:00:00.000+0000,2011-03-07T00:00:00.000+0000,2011-03-07T16:23:05.000+0000
110660253,B03,11021831,Alarms,Fire,1000 Block of HOWARD ST,SF,94103,B03,1,2252,3,3,3,False,,1,CHIEF,3,3,6,South of Market,"(37.7795675906982, -122.407474134289)",110660253-B03,2.3833334,2011-03-07T00:00:00.000+0000,2011-03-07T00:00:00.000+0000,2011-03-07T17:13:21.000+0000
110660282,E43,11021856,Medical Incident,Other,PERSIA AV/MADRID ST,SF,94112,B09,43,6134,3,3,3,True,,1,ENGINE,1,9,11,Excelsior,"(37.7216187707613, -122.432805977203)",110660282-E43,1.65,2011-03-07T00:00:00.000+0000,2011-03-07T00:00:00.000+0000,2011-03-07T18:55:52.000+0000
110660322,94,11021888,Medical Incident,Other,1100 Block of HOWARD ST,SF,94103,B03,1,2314,3,3,3,True,,1,MEDIC,3,2,6,South of Market,"(37.7768521020734, -122.410711239368)",110660322-94,1.5666667,2011-03-07T00:00:00.000+0000,2011-03-07T00:00:00.000+0000,2011-03-07T20:49:25.000+0000
110670051,B09,11021963,Alarms,Fire,5800 Block of MISSION ST,SF,94112,B09,33,6213,3,3,3,False,,1,CHIEF,3,9,11,Outer Mission,"(37.709160030699, -122.451801431228)",110670051-B09,2.8,2011-03-08T00:00:00.000+0000,2011-03-07T00:00:00.000+0000,2011-03-08T07:36:00.000+0000
110670076,B06,11021980,Alarms,Fire,3600 Block of 20TH ST,SF,94110,B06,7,5445,3,3,3,False,,1,CHIEF,2,6,8,Mission,"(37.7584863037287, -122.422385383463)",110670076-B06,2.5,2011-03-08T00:00:00.000+0000,2011-03-08T00:00:00.000+0000,2011-03-08T08:52:16.000+0000
110670077,E44,11021981,Alarms,Other,1700 Block of VISITACION AVE,SF,94134,B09,44,6263,3,3,3,True,,1,ENGINE,1,9,10,McLaren Park,"(37.7162666820869, -122.414215025813)",110670077-E44,1.6166667,2011-03-08T00:00:00.000+0000,2011-03-08T00:00:00.000+0000,2011-03-08T09:15:07.000+0000
110670082,92,11021988,Medical Incident,Code 2 Transport,100 Block of GOLDEN GATE AVE,SF,94102,B02,1,1546,1,1,2,True,,1,MEDIC,2,2,6,Tenderloin,"(37.7820224371777, -122.413054148253)",110670082-92,13.416667,2011-03-08T00:00:00.000+0000,2011-03-08T00:00:00.000+0000,2011-03-08T10:23:19.000+0000
110670089,E01,11021993,Medical Incident,Other,200 Block of EDDY ST,SF,94102,B03,1,1453,3,3,3,True,,1,ENGINE,1,3,6,Tenderloin,"(37.7840910361755, -122.411784369455)",110670089-E01,3.55,2011-03-08T00:00:00.000+0000,2011-03-08T00:00:00.000+0000,2011-03-08T09:48:55.000+0000
110670101,84,11021998,Medical Incident,Code 2 Transport,CALL BOX: SAN JOSE AV/SANTA YNEZ AV,SF,94112,B09,15,8276,2,2,2,True,,1,MEDIC,1,9,11,Outer Mission,"(37.7258249736518, -122.442324422614)",110670101-84,4.55,2011-03-08T00:00:00.000+0000,2011-03-08T00:00:00.000+0000,2011-03-08T11:13:38.000+0000
