In [0]:
%fs ls /mnt/sf_open_data/fire_dept_calls_for_service/

### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png)

[EN] The purpose of this workbook is to describe and use pyspark procedures to analyze data from San Francisco https://data.sfgov.org/Public-Safety. This case study was created following Sameer Faroo's tutorial  
[FR] L'objectif de ce cahier de travail est de décrire et d'utiliser les procédures pyspark pour analyser les données de San Francisco https://data.sfgov.org/Public-Safety. Cette étude de cas a été créée en suivant le tutoriel de Sameer Faroo

In [0]:
#df1 = spark.read.csv('dbfs:/FileStore/shared_uploads/luisalejandro.ruizbareno@sgs.com/Fire_Department_Calls_for_Service.csv', inferSchema = True, header = True)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType

Note that I removed the spaces from the col name to prevent errors. FireSchema will have the schemas for each column in order to reduce time while importing data

In [0]:
fireSchema = StructType([StructField('CallNumber', IntegerType(),True),
                         StructField('UnitID', StringType(), True),
                         StructField('IncidentNumber', IntegerType(),True),
                         StructField('CallType', StringType(), True),
                         StructField('CallDate', StringType(),True),
                         StructField('WatchDate',StringType(), True),
                         StructField('ReceivedDtTm',StringType(),True),
                         StructField('EntryDtTm', StringType(), True),
                         StructField('DispatchDtTm', StringType(), True),
                         StructField('ResponseDtTm',StringType(), True),
                         StructField('OnSceneDtTm',StringType(), True),
                         StructField('TransportDtTm',StringType(), True),
                         StructField('HospitalDtTm',StringType(), True),
                         StructField('CallFinalDisposition',StringType(), True),
                         StructField('AvailableDtTm',StringType(), True),
                         StructField('Address',StringType(), True),
                         StructField('City',StringType(), True),
                         StructField('ZipcodeofIncident', IntegerType(),True),
                         StructField('Battalion',StringType(), True),
                         StructField('StationArea',StringType(), True),
                         StructField('Box',StringType(), True),
                         StructField('OriginalPriority',StringType(), True),
                      StructField('Priority',StringType(), True),
                      StructField('FinalPriority', IntegerType(),True),
                      StructField('ALSUnit',BooleanType(), True),
                      StructField('CallTypeGroup',StringType(), True),
                      StructField('NumberofAlarms', IntegerType(),True),
                      StructField('UnitType',StringType(), True),
                      StructField('Unitsequenceincalldispatch',IntegerType(),True),
                      StructField('FirePreventionDistrict', StringType(), True),
                      StructField('SupervisorDistrict', StringType(), True),
                      StructField('NeighborhooodsAnalysisBoundaries', StringType(), True),
                      StructField('RowID',StringType(), True),
                      StructField('case_location',StringType(), True)
                                  
                                  ])

In [0]:
fireServiceCallsDf = spark.read.csv('dbfs:/FileStore/shared_uploads/luisalejandro.ruizbareno@sgs.com/Fire_Department_Calls_for_Service.csv', 
 schema = fireSchema, header = True)

In [0]:
# look at the First 5 records 
display(fireServiceCallsDf.limit(5))
fireServiceCallsDf.show(5)
#df1.select([col for col in df1.columns]).show()

CallNumber,UnitID,IncidentNumber,CallType,CallDate,WatchDate,ReceivedDtTm,EntryDtTm,DispatchDtTm,ResponseDtTm,OnSceneDtTm,TransportDtTm,HospitalDtTm,CallFinalDisposition,AvailableDtTm,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsAnalysisBoundaries,RowID,case_location
160012540,T19,16000384,Alarms,01/01/2016,01/01/2016,01/01/2016 03:43:46 PM,01/01/2016 03:45:17 PM,01/01/2016 03:45:40 PM,01/01/2016 03:47:29 PM,01/01/2016 03:49:46 PM,,,Fire,01/01/2016 04:07:49 PM,3200 Block of 20TH AVE,San Francisco,94132,B08,19,8861,3,3,3,False,Alarm,1,TRUCK,1,8,7,Lakeshore,160012540-T19,POINT (-122.475646829327 37.728544474924)
160010940,E06,16000179,Structure Fire,01/01/2016,12/31/2015,01/01/2016 03:59:58 AM,01/01/2016 03:59:58 AM,01/01/2016 04:02:50 AM,01/01/2016 04:04:51 AM,01/01/2016 04:07:29 AM,,,Fire,01/01/2016 04:07:36 AM,HERMANN ST/BUCHANAN ST,San Francisco,94102,B02,6,3417,3,3,3,True,Alarm,1,ENGINE,1,2,8,Hayes Valley,160010940-E06,POINT (-122.426780766781 37.77066017434)
160012858,52,16000430,Medical Incident,01/01/2016,01/01/2016,01/01/2016 05:17:49 PM,01/01/2016 05:19:25 PM,01/01/2016 05:19:54 PM,01/01/2016 05:20:02 PM,,,,No Merit,01/01/2016 05:29:39 PM,BAY ST/MASON ST,San Francisco,94133,B01,28,1425,3,3,3,True,Potentially Life-Threatening,1,MEDIC,2,1,3,North Beach,160012858-52,POINT (-122.413604167697 37.805618479936)
160012262,RWC1,16000359,Water Rescue,01/01/2016,01/01/2016,01/01/2016 02:18:59 PM,01/01/2016 02:20:17 PM,01/01/2016 02:23:49 PM,01/01/2016 02:23:49 PM,,,,Fire,01/01/2016 04:04:25 PM,600 Block of POINT LOBOS AVE,San Francisco,94121,B07,34,7314,3,3,3,False,Fire,1,SUPPORT,17,7,1,Outer Richmond,160012262-RWC1,POINT (-122.510476185261 37.779735801578)
160011988,RC3,16000328,Medical Incident,01/01/2016,01/01/2016,01/01/2016 01:02:18 PM,01/01/2016 01:02:18 PM,01/01/2016 01:03:23 PM,01/01/2016 01:05:18 PM,,,,No Merit,01/01/2016 01:11:04 PM,SUNNYDALE AV/GARRISON AV,San Francisco,94134,B09,44,6253,A,E,3,True,Potentially Life-Threatening,1,RESCUE CAPTAIN,3,9,10,Visitacion Valley,160011988-RC3,POINT (-122.412887439901 37.711036169303)


In [0]:
fireServiceCallsDf.columns

In [0]:
fireServiceCallsDf.count()

In [0]:
fireServiceCallsDf.select('CallType').distinct().show()
fireServiceCallsDf.select('CallType').distinct().count() # To Find the number 

In [0]:
fireServiceCallsDf.select('CallType').groupBy('CallType').count().orderBy('count', asc = True).show(35, False) # false keep the weight as long as long is the field

In [0]:
display(fireServiceCallsDf.select('CallType').groupBy('CallType').count().orderBy('count', asc = False))

### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png) Date/Time Analysis

In [0]:
fireServiceCallsDf.printSchema()

In [0]:
from pyspark.sql.functions import *

In [0]:
# For this statement we need to change CallDate formate to Date, then use YEAR function to extract the year

#fireServiceCallsDf.select(date_format(
#                          to_timestamp("CallDate",'dd/MM/yyyy'),'yyy/MM/dd').alias('CallDate')).show(5)
fireServiceCallsDf.select(year(
                          to_timestamp("CallDate",'dd/MM/yyyy')).alias('CallDate')).distinct().show(5)
#Get Nulls
fireServiceCallsDf.select(year(
                          to_timestamp("CallDate",'dd/MM/yyyy')).alias('CallDate')).filter(fireServiceCallsDf['CallDate'].isNull()).show()
                   

In [0]:
from_pattern1 = 'MM/dd/yyyy'
to_pattern1 = 'yyyy-MM-dd'

from_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'
to_pattern2 = 'MM/dd/yyyy hh:mm:ss aa'

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

fireServiceCallsTsDf = fireServiceCallsDf \
                  .withColumn('CallDateTS', to_timestamp(fireServiceCallsDf['CallDate'],from_pattern1).cast("timestamp")) \
                  .drop('CallDate') \
                  .withColumn('WatchDateTS', to_timestamp(fireServiceCallsDf['WatchDate'], from_pattern1).cast("timestamp")) \
                  .drop('WatchDate') \
                  .withColumn('ReceivedDtTmTS', to_timestamp(fireServiceCallsDf['ReceivedDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('ReceivedDtTm') \
                  .withColumn('EntryDtTmTS', to_timestamp(fireServiceCallsDf['EntryDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('EntryDtTm') \
                  .withColumn('DispatchDtTmTS', to_timestamp(fireServiceCallsDf['DispatchDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('DispatchDtTm') \
                  .withColumn('ResponseDtTmTS', to_timestamp(fireServiceCallsDf['ResponseDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('ResponseDtTm') \
                  .withColumn('OnSceneDtTmTS', to_timestamp(fireServiceCallsDf['OnSceneDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('OnScenceDtTm') \
                  .withColumn('TransportDtTmTS', to_timestamp(fireServiceCallsDf['TransportDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('TransportDtTm') \
                  .withColumn('HospitalDtTmTS', to_timestamp(fireServiceCallsDf['HospitalDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('HospitalDtTm') \
                  .withColumn('AvailableDtTmTS', to_timestamp(fireServiceCallsDf['AvailableDtTm'],from_pattern2).cast("timestamp")) \
                  .drop('AvailableDtTm')

In [0]:
fireServiceCallsTsDf.printSchema()

In [0]:
display(fireServiceCallsTsDf.limit(5))

CallNumber,UnitID,IncidentNumber,CallType,OnSceneDtTm,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsAnalysisBoundaries,RowID,case_location,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
160012540,T19,16000384,Alarms,01/01/2016 03:49:46 PM,Fire,3200 Block of 20TH AVE,San Francisco,94132,B08,19,8861,3,3,3,False,Alarm,1,TRUCK,1,8,7,Lakeshore,160012540-T19,POINT (-122.475646829327 37.728544474924),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T15:43:46.000+0000,2016-01-01T15:45:17.000+0000,2016-01-01T15:45:40.000+0000,2016-01-01T15:47:29.000+0000,2016-01-01T15:49:46.000+0000,,,2016-01-01T16:07:49.000+0000
160010940,E06,16000179,Structure Fire,01/01/2016 04:07:29 AM,Fire,HERMANN ST/BUCHANAN ST,San Francisco,94102,B02,6,3417,3,3,3,True,Alarm,1,ENGINE,1,2,8,Hayes Valley,160010940-E06,POINT (-122.426780766781 37.77066017434),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T03:59:58.000+0000,2016-01-01T03:59:58.000+0000,2016-01-01T04:02:50.000+0000,2016-01-01T04:04:51.000+0000,2016-01-01T04:07:29.000+0000,,,2016-01-01T04:07:36.000+0000
160012858,52,16000430,Medical Incident,,No Merit,BAY ST/MASON ST,San Francisco,94133,B01,28,1425,3,3,3,True,Potentially Life-Threatening,1,MEDIC,2,1,3,North Beach,160012858-52,POINT (-122.413604167697 37.805618479936),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T17:17:49.000+0000,2016-01-01T17:19:25.000+0000,2016-01-01T17:19:54.000+0000,2016-01-01T17:20:02.000+0000,,,,2016-01-01T17:29:39.000+0000
160012262,RWC1,16000359,Water Rescue,,Fire,600 Block of POINT LOBOS AVE,San Francisco,94121,B07,34,7314,3,3,3,False,Fire,1,SUPPORT,17,7,1,Outer Richmond,160012262-RWC1,POINT (-122.510476185261 37.779735801578),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T14:18:59.000+0000,2016-01-01T14:20:17.000+0000,2016-01-01T14:23:49.000+0000,2016-01-01T14:23:49.000+0000,,,,2016-01-01T16:04:25.000+0000
160011988,RC3,16000328,Medical Incident,,No Merit,SUNNYDALE AV/GARRISON AV,San Francisco,94134,B09,44,6253,A,E,3,True,Potentially Life-Threatening,1,RESCUE CAPTAIN,3,9,10,Visitacion Valley,160011988-RC3,POINT (-122.412887439901 37.711036169303),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T13:02:18.000+0000,2016-01-01T13:02:18.000+0000,2016-01-01T13:03:23.000+0000,2016-01-01T13:05:18.000+0000,,,,2016-01-01T13:11:04.000+0000


In [0]:
fireServiceCallsTsDf.select(year('CallDateTS').alias('Year_CallDateTs')).distinct().orderBy('Year_CallDateTs').show()

In [0]:
display(fireServiceCallsTsDf)

CallNumber,UnitID,IncidentNumber,CallType,OnSceneDtTm,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsAnalysisBoundaries,RowID,case_location,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
160012540,T19,16000384,Alarms,01/01/2016 03:49:46 PM,Fire,3200 Block of 20TH AVE,San Francisco,94132,B08,19,8861,3,3,3,False,Alarm,1,TRUCK,1,8.0,7,Lakeshore,160012540-T19,POINT (-122.475646829327 37.728544474924),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T15:43:46.000+0000,2016-01-01T15:45:17.000+0000,2016-01-01T15:45:40.000+0000,2016-01-01T15:47:29.000+0000,2016-01-01T15:49:46.000+0000,,,2016-01-01T16:07:49.000+0000
160010940,E06,16000179,Structure Fire,01/01/2016 04:07:29 AM,Fire,HERMANN ST/BUCHANAN ST,San Francisco,94102,B02,6,3417,3,3,3,True,Alarm,1,ENGINE,1,2.0,8,Hayes Valley,160010940-E06,POINT (-122.426780766781 37.77066017434),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T03:59:58.000+0000,2016-01-01T03:59:58.000+0000,2016-01-01T04:02:50.000+0000,2016-01-01T04:04:51.000+0000,2016-01-01T04:07:29.000+0000,,,2016-01-01T04:07:36.000+0000
160012858,52,16000430,Medical Incident,,No Merit,BAY ST/MASON ST,San Francisco,94133,B01,28,1425,3,3,3,True,Potentially Life-Threatening,1,MEDIC,2,1.0,3,North Beach,160012858-52,POINT (-122.413604167697 37.805618479936),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T17:17:49.000+0000,2016-01-01T17:19:25.000+0000,2016-01-01T17:19:54.000+0000,2016-01-01T17:20:02.000+0000,,,,2016-01-01T17:29:39.000+0000
160012262,RWC1,16000359,Water Rescue,,Fire,600 Block of POINT LOBOS AVE,San Francisco,94121,B07,34,7314,3,3,3,False,Fire,1,SUPPORT,17,7.0,1,Outer Richmond,160012262-RWC1,POINT (-122.510476185261 37.779735801578),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T14:18:59.000+0000,2016-01-01T14:20:17.000+0000,2016-01-01T14:23:49.000+0000,2016-01-01T14:23:49.000+0000,,,,2016-01-01T16:04:25.000+0000
160011988,RC3,16000328,Medical Incident,,No Merit,SUNNYDALE AV/GARRISON AV,San Francisco,94134,B09,44,6253,A,E,3,True,Potentially Life-Threatening,1,RESCUE CAPTAIN,3,9.0,10,Visitacion Valley,160011988-RC3,POINT (-122.412887439901 37.711036169303),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T13:02:18.000+0000,2016-01-01T13:02:18.000+0000,2016-01-01T13:03:23.000+0000,2016-01-01T13:05:18.000+0000,,,,2016-01-01T13:11:04.000+0000
160012236,AR1,16000356,Outside Fire,01/01/2016 02:49:08 PM,Fire,MCALLISTER ST/PIERCE ST,San Francisco,94115,B05,5,3642,3,3,3,False,Fire,1,INVESTIGATION,2,5.0,5,Western Addition,160012236-AR1,POINT (-122.435052389241 37.778212770946),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T14:12:11.000+0000,2016-01-01T14:13:18.000+0000,2016-01-01T14:28:19.000+0000,2016-01-01T14:28:19.000+0000,2016-01-01T14:49:08.000+0000,,,2016-01-01T15:05:26.000+0000
160010462,E01,16000092,Medical Incident,01/01/2016 01:49:41 AM,Code 2 Transport,100 Block of 3RD ST,San Francisco,94103,B03,1,2177,2,3,3,True,Non Life-threatening,1,ENGINE,1,3.0,6,Financial District/South Beach,160010462-E01,POINT (-122.400930575935 37.785468868781),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T01:38:57.000+0000,2016-01-01T01:41:11.000+0000,2016-01-01T01:48:15.000+0000,2016-01-01T01:48:15.000+0000,2016-01-01T01:49:41.000+0000,,,2016-01-01T02:19:52.000+0000
160012262,E34,16000359,Water Rescue,01/01/2016 02:27:14 PM,Fire,600 Block of POINT LOBOS AVE,San Francisco,94121,B07,34,7314,3,3,3,False,Fire,1,ENGINE,4,7.0,1,Outer Richmond,160012262-E34,POINT (-122.510476185261 37.779735801578),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T14:18:59.000+0000,2016-01-01T14:20:17.000+0000,2016-01-01T14:21:41.000+0000,2016-01-01T14:23:53.000+0000,2016-01-01T14:27:14.000+0000,,,2016-01-01T15:21:38.000+0000
160012566,KM13,16000389,Medical Incident,01/01/2016 04:02:46 PM,Code 2 Transport,0 Block of CASA WAY,San Francisco,94123,B04,16,3563,3,3,3,False,Potentially Life-Threatening,1,PRIVATE,2,4.0,2,Marina,160012566-KM13,POINT (-122.439120724665 37.805586821591),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T15:51:55.000+0000,2016-01-01T15:53:22.000+0000,2016-01-01T15:54:11.000+0000,2016-01-01T15:56:04.000+0000,2016-01-01T16:02:46.000+0000,2016-01-01T16:24:59.000+0000,2016-01-01T16:49:33.000+0000,2016-01-01T17:31:11.000+0000
160012341,B03,16000366,Alarms,,Fire,200 Block of CONNECTICUT ST,San Francisco,94107,B03,37,2462,3,3,3,False,Alarm,1,CHIEF,3,3.0,10,Potrero Hill,160012341-B03,POINT (-122.397622855701 37.763153526145),2016-01-01T00:00:00.000+0000,2016-01-01T00:00:00.000+0000,2016-01-01T14:42:43.000+0000,2016-01-01T14:44:29.000+0000,2016-01-01T14:44:58.000+0000,2016-01-01T14:47:20.000+0000,,,,2016-01-01T14:51:21.000+0000


In [0]:
#fireServiceCallsTsDf.filter(year(fireServiceCallsTsDf['CallDateTS'])==2020).count() # get number of calls by year
#fireServiceCallsTsDf.select('CallDateTS').agg(max("CallDateTS")).show() # get max date
#fireServiceCallsTsDf.select('CallDateTS').filter(year(fireServiceCallsTsDf['CallDateTS'])==2016).agg(max("CallDateTS")).show() # get max date given a year 2016-12-31

fireServiceCallsTsDf.select('CallDateTS').filter(year(fireServiceCallsTsDf['CallDateTS'])==2016).agg(date_sub(max("CallDateTS"),7)).show()



In [0]:
max_day = fireServiceCallsTsDf.filter(year(fireServiceCallsTsDf['CallDateTS'])==2016).agg(date_sub(max("CallDateTS"),7)).collect()[0][0]
#max_day
fireServiceCallsTsDf.select('CallDateTS',dayofyear('CallDateTS') ).filter(fireServiceCallsTsDf['CallDateTS']>=max_day).orderBy(to_date(fireServiceCallsTsDf['CallDateTs']), asc = True).groupBy(fireServiceCallsTsDf['CallDateTS']).count().show()


In [0]:
# in the example, they have until July 6th which is the 1887th day of the year
# Filter the DF down to just 2016 and days of year greater than 180:

#fireServiceCallsTsDf.filter((year('CallDateTS') =='2016') & (dayofyear('CallDateTS')>=180)).select(dayofyear('CallDateTS')).distinct().orderBy('dayofyear(CallDateTs)').show()
fireServiceCallsTsDf.filter(year('CallDateTS') =='2016').filter(dayofyear('CallDateTS')>=180).select(dayofyear('CallDateTS')).distinct().orderBy('dayofyear(CallDateTs)').show()

In [0]:
fireServiceCallsTsDf.filter(year('CallDateTS') =='2016').filter(dayofyear('CallDateTS')>=180).select(dayofyear('CallDateTS')).groupBy('dayofyear(CallDateTs)').count().orderBy('dayofyear(CallDateTs)').show(200)

In [0]:
display(fireServiceCallsTsDf.filter(year('CallDateTS') =='2016').filter((dayofyear('CallDateTS')>=180) & (dayofyear('CallDateTS')<=200)).select(dayofyear('CallDateTS')).groupBy('dayofyear(CallDateTs)').count().orderBy('dayofyear(CallDateTs)'))

dayofyear(CallDateTs),count
180,753
181,731
182,797
183,847
184,729
185,797
186,958
187,821
188,769
189,869


In [0]:
transformations are lazy which means that they will not executed until you call them with an action
Transformations:
  select
  distinct
  groupBy
  sum
  orderBy
  filter
  limit
  
action:
  show
  count
  collect
  save
  
  

### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png) Memory, Caching and write to Parquet

In [0]:
#Convert df to rdd to see the partitions 
fireServiceCallsTsDf.rdd.getNumPartitions()

In [0]:
#This means that baisicly a 8 of data is in each partitions. The dataframe does not exist in memory
If I tried to do something with the dataframe, like count action, Spark will start reading the csv files and creating dataframe among the transformations we have. 
the default partitions are not optimal. USE spark correctly, that depends of the cluster we had chosen. For our case, it is a cluster whereby we can perform 3 task simultaniasly. repartition your dataframe in a multiple number of your culster 

In [0]:
fireServiceCallsTsDf.repartition(6).createOrReplaceTempView("FireServiceVIEW");

In [0]:
spark.catalog.cacheTable("FireServiceVIEW")

In [0]:
spark.table("FireServiceVIEW").count()
# It is going to materialized the cache. It will read the file from desk, get a dataframe in a temporarly memory of 8 partitions, then repartition that into 6 and the cache that in memory

In [0]:
# Now this dataframe is operated in a memory cache version
fireServiceDF= spark.table("fireServiceVIEW")

In [0]:
# Note  tha the full scan + count in memory takes < 1 second:
fireServiceDF.count()

In [0]:
# Is my table cache?
spark.catalog.isCached("fireServiceVIEW")

In [0]:
%fs ls /tmp/

path,name,size
dbfs:/tmp/fireServiceParque/,fireServiceParque/,0
dbfs:/tmp/fireServiceParquet/,fireServiceParquet/,0


In [0]:
fireServiceDF.write.format('parquet').save('/tmp/fireServiceParquet/')

In [0]:
%fs ls /tmp/fireServiceParquet/

path,name,size
dbfs:/tmp/fireServiceParquet/_SUCCESS,_SUCCESS,0
dbfs:/tmp/fireServiceParquet/_committed_946766735054573629,_committed_946766735054573629,612
dbfs:/tmp/fireServiceParquet/_started_946766735054573629,_started_946766735054573629,0
dbfs:/tmp/fireServiceParquet/part-00000-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-24-1-c000.snappy.parquet,part-00000-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-24-1-c000.snappy.parquet,29076475
dbfs:/tmp/fireServiceParquet/part-00001-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-25-1-c000.snappy.parquet,part-00001-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-25-1-c000.snappy.parquet,29082708
dbfs:/tmp/fireServiceParquet/part-00002-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-26-1-c000.snappy.parquet,part-00002-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-26-1-c000.snappy.parquet,29072298
dbfs:/tmp/fireServiceParquet/part-00003-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-27-1-c000.snappy.parquet,part-00003-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-27-1-c000.snappy.parquet,29067662
dbfs:/tmp/fireServiceParquet/part-00004-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-28-1-c000.snappy.parquet,part-00004-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-28-1-c000.snappy.parquet,29099793
dbfs:/tmp/fireServiceParquet/part-00005-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-29-1-c000.snappy.parquet,part-00005-tid-946766735054573629-7865bd37-7ce2-4d2e-9414-4c1cca01d3b7-29-1-c000.snappy.parquet,29067868


In [0]:
# in the future when the clusters terminated within an hour, I can read that parquet to continue with the data analysis. This parquet will have all transformation we did
tempDF = spark.read.parquet('/tmp/fireServiceParquet/')

In [0]:
# Parquet file are really efficient  to read from
tempDF.count()

In [0]:
display(tempDF.limit(2))

CallNumber,UnitID,IncidentNumber,CallType,OnSceneDtTm,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsAnalysisBoundaries,RowID,case_location,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS
160493153,63,16019839,Medical Incident,02/18/2016 06:44:38 PM,Code 2 Transport,700 Block of UNION ST,San Francisco,94133,B01,28,1422,3,3,3,True,Potentially Life-Threatening,1,MEDIC,2,1,3,Chinatown,160493153-63,POINT (-122.412059325975 37.800140039009),2016-02-18T00:00:00.000+0000,2016-02-18T00:00:00.000+0000,2016-02-18T18:38:21.000+0000,2016-02-18T18:39:47.000+0000,2016-02-18T18:40:13.000+0000,2016-02-18T18:41:34.000+0000,2016-02-18T18:44:38.000+0000,2016-02-18T19:12:14.000+0000,2016-02-18T19:36:23.000+0000,2016-02-18T20:16:08.000+0000
160522402,T08,16020883,Elevator / Escalator Rescue,02/21/2016 04:53:19 PM,Fire,800 Block of HOWARD ST,San Francisco,94103,B03,1,2245,3,3,3,False,Alarm,1,TRUCK,1,3,6,South of Market,160522402-T08,POINT (-122.40377191669 37.782340468054),2016-02-21T00:00:00.000+0000,2016-02-21T00:00:00.000+0000,2016-02-21T16:39:58.000+0000,2016-02-21T16:42:27.000+0000,2016-02-21T16:46:59.000+0000,2016-02-21T16:49:26.000+0000,2016-02-21T16:53:19.000+0000,,,2016-02-21T17:13:28.000+0000


### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png) Spark SQL

In [0]:
%sql select count(*) from fireServiceVIEW;

count(1)
1532736


In [0]:
#if we are running spark onpremises ( in my local laptop) the %sql will not work. We need to use something like:
spark.sql("Select count(*) from fireServiceVIEW").show()

In [0]:
%sql select NeighborhooodsAnalysisBoundaries as Neighborhood, count(*) as count from fireServiceVIEW where year(CallDateTS) == 2016
group by NeighborhooodsAnalysisBoundaries order by count asc

Neighborhood,count
Lincoln Park,249
McLaren Park,384
Seacliff,452
,583
Glen Park,1416
Presidio,1608
Twin Peaks,1740
Golden Gate Park,1814
Treasure Island,2186
Presidio Heights,2256


In [0]:
%sql select NeighborhooodsAnalysisBoundaries as Neighborhood, CallType, count(*) as count from fireServiceVIEW where year(CallDateTS) == 2016
group by NeighborhooodsAnalysisBoundaries, CallType order by count asc

Neighborhood,CallType,count
Pacific Heights,Confined Space / Structure Collapse,1
Seacliff,Electrical Hazard,1
Bernal Heights,Elevator / Escalator Rescue,1
Sunset/Parkside,Watercraft in Distress,1
Mission Bay,Assist Police,1
Golden Gate Park,Gas Leak (Natural and LP Gases),1
Seacliff,Odor (Strange / Unknown),1
,Electrical Hazard,1
Sunset/Parkside,Explosion,1
Outer Mission,HazMat,1


In [0]:
%sql DESC fireServiceVIEW;

col_name,data_type,comment
CallNumber,int,
UnitID,string,
IncidentNumber,int,
CallType,string,
OnSceneDtTm,string,
CallFinalDisposition,string,
Address,string,
City,string,
ZipcodeofIncident,int,
Battalion,string,


### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png) DataFrame Joins

In [0]:
incidentsDF= spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/luisalejandro.ruizbareno@sgs.com/Fire_Incidents.csv", header = True, inferSchema = True).withColumnRenamed('Incident Number', 'IncidentNumber').cache()

In [0]:
incidentsDF.printSchema()

In [0]:
incidentsDF.count()

In [0]:
display(incidentsDF.limit(3))

Incident Number,Exposure Number,ID,Address,Incident Date,Call Number,Alarm DtTm,Arrival DtTm,Close DtTm,City,zipcode,Battalion,Station Area,Box,Suppression Units,Suppression Personnel,EMS Units,EMS Personnel,Other Units,Other Personnel,First Unit On Scene,Estimated Property Loss,Estimated Contents Loss,Fire Fatalities,Fire Injuries,Civilian Fatalities,Civilian Injuries,Number of Alarms,Primary Situation,Mutual Aid,Action Taken Primary,Action Taken Secondary,Action Taken Other,Detector Alerted Occupants,Property Use,Area of Fire Origin,Ignition Cause,Ignition Factor Primary,Ignition Factor Secondary,Heat Source,Item First Ignited,Human Factors Associated with Ignition,Structure Type,Structure Status,Floor of Fire Origin,Fire Spread,No Flame Spead,Number of floors with minimum damage,Number of floors with significant damage,Number of floors with heavy damage,Number of floors with extreme damage,Detectors Present,Detector Type,Detector Operation,Detector Effectiveness,Detector Failure Reason,Automatic Extinguishing System Present,Automatic Extinguishing Sytem Type,Automatic Extinguishing Sytem Perfomance,Automatic Extinguishing Sytem Failure Reason,Number of Sprinkler Heads Operating,Supervisor District,neighborhood_district,point
8028304,0,80283040,150 Elsie St.,2008-04-01T00:00:00.000+0000,80920257,2008-04-01T18:06:37.000+0000,2008-04-01T18:15:19.000+0000,2008-04-01T18:21:48.000+0000,SF,94110,B06,11,,1,4,0,0,0,0,E11,,,0,0,0,0,1,412 - Gas leak (natural gas or LPG),,86 - Investigate,-,-,-,"962 - Residential street, road or residential dr",,,,,,,,,,,,,,,,,,,,,,,,,,,9.0,Bernal Heights,POINT (-122.41837339 37.74208979)
8028303,0,80283030,85 Turner Tr.,2008-04-01T00:00:00.000+0000,80920256,2008-04-01T18:00:52.000+0000,2008-04-01T18:06:30.000+0000,2008-04-01T18:22:18.000+0000,SF,94107,B10,37,,1,4,0,0,0,0,E37,,,0,0,0,0,1,552 - Police matter,,76 - Provide water,-,-,-,"960 - Street, other",,,,,,,,,,,,,,,,,,,,,,,,,,,10.0,Potrero Hill,POINT (-122.39489 37.756291)
8028309,0,80283090,175 6th St.,2008-04-01T00:00:00.000+0000,80920262,2008-04-01T18:42:06.000+0000,2008-04-01T18:45:23.000+0000,2008-04-01T18:53:25.000+0000,SF,94105,B03,1,,10,35,0,0,0,0,E01,,,0,0,0,0,1,"210 - Steam Rupture, steam, other",,86 - Investigate,-,-,-,429 - Multifamily dwellings,,,,,,,,,,,,,,,,,,,,,,,,,,,,South of Market,POINT (-122.407468 37.78008)


In [0]:
joinedDF = fireServiceDF.join(incidentsDF, fireServiceDF.IncidentNumber == incidentsDF.IncidentNumber)

In [0]:
display(joinedDF.limit(3))

CallNumber,UnitID,IncidentNumber,CallType,OnSceneDtTm,CallFinalDisposition,Address,City,ZipcodeofIncident,Battalion,StationArea,Box,OriginalPriority,Priority,FinalPriority,ALSUnit,CallTypeGroup,NumberofAlarms,UnitType,Unitsequenceincalldispatch,FirePreventionDistrict,SupervisorDistrict,NeighborhooodsAnalysisBoundaries,RowID,case_location,CallDateTS,WatchDateTS,ReceivedDtTmTS,EntryDtTmTS,DispatchDtTmTS,ResponseDtTmTS,OnSceneDtTmTS,TransportDtTmTS,HospitalDtTmTS,AvailableDtTmTS,IncidentNumber.1,Exposure Number,ID,Address.1,Incident Date,Call Number,Alarm DtTm,Arrival DtTm,Close DtTm,City.1,zipcode,Battalion.1,Station Area,Box.1,Suppression Units,Suppression Personnel,EMS Units,EMS Personnel,Other Units,Other Personnel,First Unit On Scene,Estimated Property Loss,Estimated Contents Loss,Fire Fatalities,Fire Injuries,Civilian Fatalities,Civilian Injuries,Number of Alarms,Primary Situation,Mutual Aid,Action Taken Primary,Action Taken Secondary,Action Taken Other,Detector Alerted Occupants,Property Use,Area of Fire Origin,Ignition Cause,Ignition Factor Primary,Ignition Factor Secondary,Heat Source,Item First Ignited,Human Factors Associated with Ignition,Structure Type,Structure Status,Floor of Fire Origin,Fire Spread,No Flame Spead,Number of floors with minimum damage,Number of floors with significant damage,Number of floors with heavy damage,Number of floors with extreme damage,Detectors Present,Detector Type,Detector Operation,Detector Effectiveness,Detector Failure Reason,Automatic Extinguishing System Present,Automatic Extinguishing Sytem Type,Automatic Extinguishing Sytem Perfomance,Automatic Extinguishing Sytem Failure Reason,Number of Sprinkler Heads Operating,Supervisor District,neighborhood_district,point
160011002,E36,16000187,Alarms,01/01/2016 04:39:24 AM,Fire,1600 Block of MARKET ST,San Francisco,94103,B02,36,5115,3,3,3,True,Alarm,1,ENGINE,1,2,6,Mission,160011002-E36,POINT (-122.422188261426 37.772694823686),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T04:31:35.000+0000,2016-01-01T04:33:09.000+0000,2016-01-01T04:34:32.000+0000,2016-01-01T04:37:27.000+0000,2016-01-01T04:39:24.000+0000,,,2016-01-01T04:43:30.000+0000,16000187,0,160001870,1693 MARKET STREET,2016-01-01T00:00:00.000+0000,160011002,2016-01-01T04:31:35.000+0000,2016-01-01T04:39:24.000+0000,2016-01-01T04:44:10.000+0000,San Francisco,94103,B02,36,5115,3,11,0,0,0,0,,,,0,0,0,0,1,"714 Central station, malicious false alarm",N None,86 Investigate,63 Restore fire alarm system,,,"439 Boarding/rooming house, residential hotels",,,,,,,,,,,,,,,,,,,,,,,,,,,,Mission,POINT (-122.421932 37.772551)
160011002,B02,16000187,Alarms,01/01/2016 04:39:37 AM,Fire,1600 Block of MARKET ST,San Francisco,94103,B02,36,5115,3,3,3,False,Alarm,1,CHIEF,2,2,6,Mission,160011002-B02,POINT (-122.422188261426 37.772694823686),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T04:31:35.000+0000,2016-01-01T04:33:09.000+0000,2016-01-01T04:34:32.000+0000,2016-01-01T04:37:28.000+0000,2016-01-01T04:39:37.000+0000,,,2016-01-01T04:44:10.000+0000,16000187,0,160001870,1693 MARKET STREET,2016-01-01T00:00:00.000+0000,160011002,2016-01-01T04:31:35.000+0000,2016-01-01T04:39:24.000+0000,2016-01-01T04:44:10.000+0000,San Francisco,94103,B02,36,5115,3,11,0,0,0,0,,,,0,0,0,0,1,"714 Central station, malicious false alarm",N None,86 Investigate,63 Restore fire alarm system,,,"439 Boarding/rooming house, residential hotels",,,,,,,,,,,,,,,,,,,,,,,,,,,,Mission,POINT (-122.421932 37.772551)
160011002,T06,16000187,Alarms,,Fire,1600 Block of MARKET ST,San Francisco,94103,B02,36,5115,3,3,3,False,Alarm,1,TRUCK,3,2,6,Mission,160011002-T06,POINT (-122.422188261426 37.772694823686),2016-01-01T00:00:00.000+0000,2015-12-31T00:00:00.000+0000,2016-01-01T04:31:35.000+0000,2016-01-01T04:33:09.000+0000,2016-01-01T04:34:32.000+0000,2016-01-01T04:36:58.000+0000,,,,2016-01-01T04:40:04.000+0000,16000187,0,160001870,1693 MARKET STREET,2016-01-01T00:00:00.000+0000,160011002,2016-01-01T04:31:35.000+0000,2016-01-01T04:39:24.000+0000,2016-01-01T04:44:10.000+0000,San Francisco,94103,B02,36,5115,3,11,0,0,0,0,,,,0,0,0,0,1,"714 Central station, malicious false alarm",N None,86 Investigate,63 Restore fire alarm system,,,"439 Boarding/rooming house, residential hotels",,,,,,,,,,,,,,,,,,,,,,,,,,,,Mission,POINT (-122.421932 37.772551)


In [0]:
#Note that the joined DF is only 1.1 million rows b/c we did an inner join ( the orginal Fire Service calls data had 4+ million rows)
joinedDF.count()

In [0]:
joinedDF.filter(year('CallDateTS')=='2016').filter(col('NeighborhooodsAnalysisBoundaries')=='Tenderloin').count()

In [0]:
display(joinedDF.filter(year('CallDateTS')=='2016').filter(col('NeighborhooodsAnalysisBoundaries')=='Tenderloin').groupBy('Primary Situation').count().orderBy(desc("count")).limit(10))

Primary Situation,count
"700 False alarm or false call, other",1095
"743 Smoke detector activation, no fire - unintentional",784
"113 Cooking fire, confined to container",678
"711 Municipal alarm system, malicious false alarm",669
735 Alarm system sounded due to malfunction,508
"745 Alarm system activation, no fire - unintentional",488
"311 Medical assist, assist EMS crew",322
111 Building fire,263
"500 Service Call, other",232
554 Assist invalid,204


%md

### ![Spark Logo](https://upload.wikimedia.org/wikipedia/commons/e/ea/Spark-logo-192x100px.png) Convert a spark Dataframe to a Pandas Dataframe

In [0]:
import pandas as pd

In [0]:
pandas2016DF = joinedDF.filter(year('CallDateTS')=='2016').toPandas()

In [0]:
pandas2016DF.dtypes

In [0]:
pandas2016DF.describe()

Unnamed: 0,CallNumber,IncidentNumber,ZipcodeofIncident,FinalPriority,NumberofAlarms,Unitsequenceincalldispatch,IncidentNumber.1,Exposure Number,ID,Call Number,Suppression Units,Suppression Personnel,EMS Units,EMS Personnel,Other Units,Other Personnel,Estimated Property Loss,Estimated Contents Loss,Fire Fatalities,Fire Injuries,Civilian Fatalities,Civilian Injuries,Number of Alarms,Floor of Fire Origin,Number of floors with minimum damage,Number of floors with significant damage,Number of floors with heavy damage,Number of floors with extreme damage,Number of Sprinkler Heads Operating,Supervisor District
count,92475.0,92475.0,92178.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,92475.0,7385.0,7599.0,92475.0,92475.0,92475.0,92475.0,92475.0,3919.0,45.0,25.0,19.0,0.0,320.0,46393.0
mean,161865600.0,16073990.0,94114.572002,2.94538,1.011344,3.022255,16073990.0,0.000314,160739900.0,161865600.0,4.016696,14.640487,0.583596,1.033101,0.25435,0.471976,85300.1,24661.125411,0.0,0.002314,0.000151,0.006056,1.010803,1.979076,1.0,1.0,1.0,,1.315625,6.124868
std,1062090.0,42338.7,10.691538,0.227239,0.166809,3.229729,42338.7,0.017706,423387.0,1062090.0,3.549008,12.958278,1.007211,1.728957,0.528779,0.959708,357071.7,118148.229881,0.0,0.094482,0.012303,0.128148,0.165216,1.441587,0.0,0.0,0.0,,0.563024,2.928767
min,160010000.0,16000000.0,94102.0,2.0,1.0,1.0,16000000.0,0.0,160000000.0,160010000.0,0.0,0.0,0.0,0.0,0.0,0.0,-10.0,-5000.0,0.0,0.0,0.0,0.0,1.0,-1.0,1.0,1.0,1.0,,1.0,1.0
25%,160942300.0,16037310.0,94107.0,3.0,1.0,1.0,16037310.0,0.0,160373100.0,160942300.0,2.0,9.0,0.0,0.0,0.0,0.0,50.0,20.0,0.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,,1.0,3.0
50%,161892400.0,16074740.0,94112.0,3.0,1.0,2.0,16074740.0,0.0,160747400.0,161892400.0,3.0,10.0,0.0,0.0,0.0,0.0,1000.0,100.0,0.0,0.0,0.0,0.0,1.0,2.0,1.0,1.0,1.0,,1.0,6.0
75%,162791600.0,16110670.0,94122.0,3.0,1.0,3.0,16110670.0,0.0,161106700.0,162791600.0,4.0,16.0,1.0,2.0,0.0,0.0,10000.0,1200.0,0.0,0.0,0.0,0.0,1.0,3.0,1.0,1.0,1.0,,2.0,9.0
max,163664000.0,16146990.0,94158.0,3.0,5.0,83.0,16146990.0,1.0,161469900.0,163664000.0,61.0,234.0,13.0,23.0,5.0,15.0,3000000.0,1000000.0,0.0,4.0,1.0,4.0,5.0,10.0,1.0,1.0,1.0,,3.0,11.0
