In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


spark = (SparkSession
.builder
.appName("sc-fire-calls")
.getOrCreate()
)


21/08/20 19:43:21 WARN Utils: Your hostname, Hoss-Mac.local resolves to a loopback address: 127.0.0.1, but we couldn't find any external IP address!
21/08/20 19:43:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/20 19:43:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/20 19:43:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Programmatic way to define a schema

fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
StructField('UnitID', StringType(), True),
StructField('IncidentNumber', IntegerType(), True),
StructField('CallType', StringType(), True),
StructField('CallDate', StringType(), True),
StructField('WatchDate', StringType(), True),
StructField('CallFinalDisposition', StringType(), True),
StructField('AvailableDtTm', StringType(), True),
StructField('Address', StringType(), True),
StructField('City', StringType(), True),
StructField('Zipcode', IntegerType(), True),
StructField('Battalion', StringType(), True),
StructField('StationArea', StringType(), True),
StructField('Box', StringType(), True),
StructField('OriginalPriority', StringType(), True),
StructField('Priority', StringType(), True),
StructField('FinalPriority', IntegerType(), True),
StructField('ALSUnit', BooleanType(), True),
StructField('CallTypeGroup', StringType(), True),
StructField('NumAlarms', IntegerType(), True),
StructField('UnitType', StringType(), True),
StructField('UnitSequenceInCallDispatch', IntegerType(), True),
StructField('FirePreventionDistrict', StringType(), True),
StructField('SupervisorDistrict', StringType(), True),
StructField('Neighborhood', StringType(), True),
StructField('Location', StringType(), True),
StructField('RowID', StringType(), True),
StructField('Delay', FloatType(), True)])


In [3]:
# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [4]:
# Find distinct call types count
distinct_call_types_count = fire_df.select("CallType")\
.where(col("CallType").isNotNull())\
.agg(countDistinct("CallType").alias("Distinct call types"))\
.show()



+-------------------+
|Distinct call types|
+-------------------+
|                 29|
+-------------------+





In [5]:
# Find distinct call types
distinct_call_types = fire_df.select("CallType")\
.where(col("CallType").isNotNull())\
.distinct()\
.show(truncate=False)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Police                               |
|Gas Leak (Na

In [6]:
# Convert CallDate to Date Type from String and Sort the records by Call Date
new_fire_df = fire_df.withColumn("Call Date", to_date("CallDate", "MM/dd/yyyy")).drop("CallDate")

In [7]:
# Count calls grouped by month and neighborhood ordered by call count
new_fire_df.withColumn("CallDate (month)", month("Call Date"))\
.where(col("CallDate (month)").isNotNull())\
.groupBy("CallDate (month)", "Neighborhood")\
.count()\
.orderBy("count", "CallDate (month)", ascending=False)\
.show(truncate=False)




+----------------+---------------+-----+
|CallDate (month)|Neighborhood   |count|
+----------------+---------------+-----+
|8               |Tenderloin     |556  |
|9               |Tenderloin     |468  |
|10              |Tenderloin     |462  |
|11              |Tenderloin     |449  |
|7               |Tenderloin     |448  |
|6               |Tenderloin     |443  |
|5               |Tenderloin     |406  |
|8               |Mission        |400  |
|2               |Tenderloin     |367  |
|10              |South of Market|356  |
|7               |Mission        |355  |
|1               |Tenderloin     |347  |
|10              |Mission        |346  |
|9               |Mission        |343  |
|8               |South of Market|342  |
|7               |South of Market|342  |
|12              |Tenderloin     |335  |
|4               |Tenderloin     |330  |
|11              |Mission        |329  |
|3               |Tenderloin     |328  |
+----------------+---------------+-----+
only showing top



In [8]:
# Count CallTypes per month == February
new_fire_df.select("CallType")\
.where(col("CallType").isNotNull() & (month("Call Date")==2))\
.groupBy("CallType")\
.count()\
.orderBy("count", ascending=False)\
.show(truncate=False)

+-------------------------------+-----+
|CallType                       |count|
+-------------------------------+-----+
|Medical Incident               |1670 |
|Structure Fire                 |469  |
|Alarms                         |291  |
|Traffic Collision              |93   |
|Citizen Assist / Service Call  |37   |
|Other                          |33   |
|Outside Fire                   |19   |
|Vehicle Fire                   |13   |
|Electrical Hazard              |11   |
|Water Rescue                   |11   |
|Gas Leak (Natural and LP Gases)|10   |
|Odor (Strange / Unknown)       |9    |
|Elevator / Escalator Rescue    |9    |
|Smoke Investigation (Outside)  |5    |
|Train / Rail Incident          |4    |
|Aircraft Emergency             |3    |
|Industrial Accidents           |3    |
|Fuel Spill                     |3    |
|HazMat                         |2    |
|Explosion                      |1    |
+-------------------------------+-----+



In [9]:
# Rename column `Call Date` to `CallDate`
renamed_fire_df = new_fire_df.withColumnRenamed("Call Date", "CallDate")

renamed_fire_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay',
 'CallDate']

In [10]:
# What were all the different types of fire calls in 2002 ? 
new_fire_df.select("CallType").where(year("Call Date") == 2002).distinct().show(truncate=False)

+-------------------------------+
|CallType                       |
+-------------------------------+
|Elevator / Escalator Rescue    |
|Aircraft Emergency             |
|Alarms                         |
|Odor (Strange / Unknown)       |
|Citizen Assist / Service Call  |
|HazMat                         |
|Oil Spill                      |
|Vehicle Fire                   |
|Suspicious Package             |
|Other                          |
|Outside Fire                   |
|Traffic Collision              |
|Gas Leak (Natural and LP Gases)|
|Water Rescue                   |
|Electrical Hazard              |
|Structure Fire                 |
|Industrial Accidents           |
|Medical Incident               |
|Fuel Spill                     |
|Smoke Investigation (Outside)  |
+-------------------------------+
only showing top 20 rows



In [11]:
# What months within the year 2002 saw the highest number of fire calls ?
new_fire_df.select(date_format("Call Date", "MMM").alias("month"))\
.where(col("Call Date").isNotNull() & (year("Call Date")==2002))\
.groupBy("month")\
.count()\
.orderBy("count", ascending=False)\
.show()


+-----+-----+
|month|count|
+-----+-----+
|  Dec|  721|
|  Oct|  686|
|  Nov|  683|
|  Feb|  681|
|  Aug|  677|
|  Sep|  671|
|  Jul|  669|
|  Mar|  665|
|  Jun|  660|
|  Apr|  630|
|  May|  621|
|  Jan|  491|
+-----+-----+



In [12]:
# Which neighborhood is San-Francisco generated the most fire calls in 2009 ?
new_fire_df.select("Neighborhood")\
.where(col("Neighborhood").isNotNull() & (year("Call Date")==2009))\
.groupBy("Neighborhood")\
.count()\
.orderBy("count", ascending=False)\
.first()

Row(Neighborhood='Tenderloin', count=1096)

In [16]:
# Which neighborhood had the worst response times to filre calls in 2009 ?
new_fire_df.select("Neighborhood", "Delay")\
.where(col("Neighborhood").isNotNull() & (year("Call Date")==2009))\
.groupBy("Neighborhood")\
.sum()\
.show(10, truncate=False)

+-----------------+------------------+
|Neighborhood     |sum(Delay)        |
+-----------------+------------------+
|Inner Sunset     |467.2166685461998 |
|Haight Ashbury   |411.6833316683769 |
|Lincoln Park     |13.51666659116745 |
|Japantown        |232.2333326935768 |
|None             |64.50000047683716 |
|North Beach      |623.6500024199486 |
|Lone Mountain/USF|354.7833339571953 |
|Western Addition |1010.3833348155022|
|Bernal Heights   |550.3833318948746 |
|Mission Bay      |298.39999955892563|
+-----------------+------------------+
only showing top 10 rows



In [19]:
# Which week in the year 2009 had the most fire calls ?
new_fire_df.withColumn("week_of_year", weekofyear("Call Date"))\
.select("week_of_year")\
.where(year("Call Date")==2009)\
.groupBy("week_of_year")\
.count()\
.orderBy("count", ascending=False)\
.first()


Row(week_of_year=44, count=199)