In [17]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [18]:
# Create the SparkSession that every later cell uses through the `spark` variable
# (getOrCreate returns the already-running session if this kernel has one).
spark = SparkSession.builder.appName("DF SF Fire Example").getOrCreate()

# Reading structured data using a schema 

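Spark can infer a schema by scanning the data, but that extra pass over the file can be costly for large inputs. Inferring from only a sample (the `samplingRatio` option) is cheaper but may guess some column types wrong. Defining the schema up front, as below, skips inference entirely and pins every column's type.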

In [19]:
# Explicit schema for the SF fire-calls CSV. With a user-supplied schema, Spark's CSV
# reader maps columns by position, so this list must follow the file's column order.
# Every field is declared nullable (third StructField argument is True).
fire_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("CallNumber", IntegerType()),
        ("UnitID", StringType()),
        ("IncidentNumber", IntegerType()),
        ("CallType", StringType()),
        ("CallDate", StringType()),
        ("WatchDate", StringType()),
        ("CallFinalDisposition", StringType()),
        ("AvailableDtTm", StringType()),
        ("Address", StringType()),
        ("City", StringType()),
        ("Zipcode", IntegerType()),
        ("Battalion", StringType()),
        ("StationArea", StringType()),
        ("Box", StringType()),
        ("OriginalPriority", StringType()),
        ("Priority", StringType()),
        ("FinalPriority", IntegerType()),
        ("ALSUnit", BooleanType()),
        ("CallTypeGroup", StringType()),
        ("NumAlarms", IntegerType()),
        ("UnitType", StringType()),
        ("UnitSequenceInCallDispatch", IntegerType()),
        ("FirePreventionDistrict", StringType()),
        ("SupervisorDistrict", StringType()),
        ("Neighborhood", StringType()),
        ("Location", StringType()),
        ("RowID", StringType()),
        ("Delay", FloatType()),
    ]
])

In [20]:
# Load the fire-calls CSV (path is relative to the kernel's working directory).
# The first line of the file is a header row; column types come from fire_schema.
sf_fire_file = "sf-fire/sf-fire-calls.csv"
fire_df = (spark.read
           .option("header", True)
           .schema(fire_schema)
           .csv(sf_fire_file))

# Projections and Filters

In [22]:
# Keep three columns and every call type other than "Medical Incident".
# Rows whose CallType is null are dropped as well: comparing null yields null,
# and filter only keeps rows where the condition is true.
few_fire_df = (
    fire_df
    .select("IncidentNumber", "AvailableDtTm", "CallType")
    .filter(col("CallType") != "Medical Incident")
)

In [23]:
# Preview five rows without truncating long cell values.
few_fire_df.show(n=5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [27]:
# How many distinct call types occur? countDistinct already ignores nulls;
# the explicit null filter just makes that intent visible.
(fire_df
    .filter(col("CallType").isNotNull())
    .select("CallType")
    .agg(countDistinct("CallType").alias("DistinctCallTypes"))
    .show(truncate=False))
      

+-----------------+
|DistinctCallTypes|
+-----------------+
|30               |
+-----------------+



In [28]:
# List each distinct, non-null call type (row order is not guaranteed).
(fire_df
    .filter(col("CallType").isNotNull())
    .select("CallType")
    .distinct()
    .show(truncate=False))

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

# Renaming, Adding, and Dropping Columns

Let's rename the `Delay` column to `ResponseDelayedinMins` and take a look at response times
that were longer than five minutes.

In [29]:
# Give the "Delay" column a more descriptive name; all other columns are unchanged.
new_fire_df = fire_df.withColumnRenamed(existing="Delay", new="ResponseDelayedinMins")

In [10]:
# Show a few response delays greater than 5 (SQL-expression form of the filter).
(new_fire_df
    .select("ResponseDelayedinMins")
    .filter("ResponseDelayedinMins > 5")
    .show(n=5, truncate=False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [30]:
# Parse the three string date/time columns into timestamps, then drop the raw strings.
# CallDate/WatchDate hold dates only; AvailableDtTm uses a 12-hour clock with an
# AM/PM marker ("hh" ... "a"). The new columns are appended in the order
# IncidentDate, OnWatchDate, AvailableDtTS.
fire_ts_df = (new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
              .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
              .withColumn("AvailableDtTS",
                          to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
              .drop("CallDate", "WatchDate", "AvailableDtTm"))
              

In [31]:
# Spot-check the three parsed timestamp columns.
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(n=5, truncate=False)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [32]:
# Which years does the data cover? One row per distinct incident year, ascending.
(fire_ts_df
 .select(year("IncidentDate"))
 .distinct()
 .sort(year("IncidentDate"))
 .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



# Aggregations

In [35]:
# Rank call types by how often they occur (most frequent first), skipping rows
# with no call type, and show the top 10.
# The column is referenced by its exact schema name "CallType" throughout: other
# casings only resolve while spark.sql.caseSensitive is false (the default).
(fire_ts_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

+-------------------------------+------+
|callType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [15]:
import pyspark.sql.functions as F

In [16]:
# Whole-table summary: total alarms plus the mean, minimum and maximum response delay.
# The F. prefix makes it explicit that these are Spark column functions rather than
# Python's built-in sum/min/max, whose names the wildcard import at the top shadows.
fire_ts_df.agg(
    F.sum("NumAlarms"),
    F.avg("ResponseDelayedinMins"),
    F.min("ResponseDelayedinMins"),
    F.max("ResponseDelayedinMins"),
).show()

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



Other common statistical methods include `describe()` and the functions exposed through the `DataFrame.stat` property,
such as `corr()`, `cov()`, `sampleBy()`, `approxQuantile()`, and `freqItems()`.

# Further Data Explorations

* What were all the different types of fire calls in 2018?
* What months within the year 2018 saw the highest number of fire calls?
* Which neighborhood in San Francisco generated the most fire calls in 2018?
* Which neighborhoods had the worst response times to fire calls in 2018?
* Which week in 2018 had the most fire calls?
* Is there a correlation between neighborhood, zip code, and number of fire calls?