In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"

In [0]:
#how to work with files on Databricks https://docs.databricks.com/files/index.html

#inspect the data 
%fs head databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

In [0]:
#define schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

In [0]:
fire_sdf = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

In [0]:
#Best practice for cache(), count(), and take()   https://learn.microsoft.com/en-us/azure/databricks/kb/scala/best-practice-cache-count-take

#Cache the DataFrame
fire_sdf.cache()

In [0]:
fire_sdf.count()

In [0]:
fire_sdf.printSchema()

In [0]:
#show 50 rows with a quick visual - average delay by visual
display(fire_sdf.limit(50))

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,CallFinalDisposition,AvailableDtTm,Address,City,Zipcode,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumAlarms,UnitType,UnitSequenceInCallDispatch,FirePreventionDistrict,SupervisorDistrict,Neighborhood,Location,RowID,Delay
20110014,M29,2003234,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 01:58:43 AM,10TH ST/MARKET ST,SF,94103,B02,36,2338,1,1,2,True,,1,MEDIC,1,2,6,Tenderloin,"(37.7765408927183, -122.417501464907)",020110014-M29,5.233333
20110015,M08,2003233,Medical Incident,01/11/2002,01/10/2002,Other,01/11/2002 02:10:17 AM,300 Block of 5TH ST,SF,94107,B03,8,2243,1,1,2,True,,1,MEDIC,1,3,6,South of Market,"(37.7792841462441, -122.402061300134)",020110015-M08,3.0833333
20110016,B02,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,6,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-B02,3.05
20110016,B04,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:54 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,3,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-B04,2.3166666
20110016,D2,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,CHIEF,4,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-D2,3.0166667
20110016,E03,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,ENGINE,7,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-E03,2.6833334
20110016,E38,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:51:17 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,ENGINE,1,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-E38,2.1
20110016,E41,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:47:00 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,ENGINE,8,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-E41,2.7166667
20110016,M03,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:46:38 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,True,,1,MEDIC,10,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-M03,2.7666667
20110016,RS1,2003235,Structure Fire,01/11/2002,01/10/2002,Other,01/11/2002 01:46:57 AM,2000 Block of CALIFORNIA ST,SF,94109,B04,38,3362,3,3,3,False,,1,RESCUE SQUAD,9,4,5,Pacific Heights,"(37.7895840679362, -122.428071912459)",020110016-RS1,3.2666667


Output can only be rendered in Databricks

In [0]:
#filter out "Medical Incident" call types

filtered_fire_sdf = (
    fire_sdf.select("IncidentNumber","AvailableDtTm","CallType")
            .where(col("CallType") != "Medical Incident")
)

filtered_fire_sdf.show(5, truncate=False)

In [0]:
fire_sdf.select("CallType").where(col("CallType").isNotNull()).distinct().count()

In [0]:
fire_sdf.select("CallType").where(col("CallType").isNotNull()).distinct().show(truncate=False)

#title

In [0]:
display(fire_sdf.select("Delay").where(col("Delay") > 10))

Delay
77.333336
18.066668
12.733334
21.116667
10.3
11.066667
28.25
11.383333
10.533334
10.566667


Output can only be rendered in Databricks

In [0]:
(fire_sdf
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False))

In [0]:
display(fire_sdf
 .select("CallType").where(col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .limit(10))

CallType,count
Medical Incident,2843475
Structure Fire,578998
Alarms,483518
Traffic Collision,175507
Citizen Assist / Service Call,65360
Other,56961
Outside Fire,51603
Vehicle Fire,20939
Water Rescue,20037
Gas Leak (Natural and LP Gases),17284


Output can only be rendered in Databricks

In [0]:
display(fire_sdf
 .select(col("ZipCode").cast('string'))
 .where(col("CallType").isNotNull())
 .groupBy("Zipcode")
 .count()
 .orderBy("count", ascending=False))

Zipcode,count
94102.0,543425
94103.0,521881
94110.0,370467
94109.0,365325
94124.0,229764
94112.0,207329
94115.0,196951
94107.0,174787
94122.0,159162
94133.0,157593


Output can only be rendered in Databricks

In [0]:
display(fire_sdf
        .select("Zipcode","Neighborhood")
        .where((col("Zipcode") == 94102) | (col("Zipcode") == 94103))
        .orderBy("Zipcode","Neighborhood")
        .distinct())


Zipcode,Neighborhood
94103,South of Market
94102,Financial District/South Beach
94102,South of Market
94103,Castro/Upper Market
94102,Hayes Valley
94103,Financial District/South Beach
94102,Tenderloin
94102,Nob Hill
94102,Mission
94103,Mission


In [0]:
fire_sdf.select(sum("NumAlarms"), avg("Delay"), min("Delay"), max("Delay")).show()


In [0]:
(fire_sdf
 .filter(year(to_date(col("CallDate"), "MM/dd/yyy")) == 2018)
 .groupBy("Neighborhood")
 .agg(avg("Delay").alias("AverageDelay"))
 .show()
)