In [3]:
from pyspark.sql import SparkSession

# Создаем сессию Spark
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

In [4]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Programmatic way to define a schema 
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                     StructField('UnitID', StringType(), True),
                     StructField('IncidentNumber', IntegerType(), True),
                     StructField('CallType', StringType(), True),                  
                     StructField('CallDate', StringType(), True),      
                     StructField('WatchDate', StringType(), True),
                     StructField('CallFinalDisposition', StringType(), True),
                     StructField('AvailableDtTm', StringType(), True),
                     StructField('Address', StringType(), True),       
                     StructField('City', StringType(), True),       
                     StructField('Zipcode', IntegerType(), True),       
                     StructField('Battalion', StringType(), True),                 
                     StructField('StationArea', StringType(), True),       
                     StructField('Box', StringType(), True),       
                     StructField('OriginalPriority', StringType(), True),       
                     StructField('Priority', StringType(), True),       
                     StructField('FinalPriority', IntegerType(), True),       
                     StructField('ALSUnit', BooleanType(), True),       
                     StructField('CallTypeGroup', StringType(), True),
                     StructField('NumAlarms', IntegerType(), True),
                     StructField('UnitType', StringType(), True),
                     StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                     StructField('FirePreventionDistrict', StringType(), True),
                     StructField('SupervisorDistrict', StringType(), True),
                     StructField('Neighborhood', StringType(), True),
                     StructField('Location', StringType(), True),
                     StructField('RowID', StringType(), True),
                     StructField('Delay', FloatType(), True)])

# Use the DataFrameReader interface to read a CSV file
sf_fire_file = "sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

# Cache the DataFrame since we will be performing some operations on it.
fire_df.cache()
fire_df.count()

fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

In [10]:
#display(fire_df.limit(5))
fire_df.show(5, vertical=True)

-RECORD 0------------------------------------------
 CallNumber                 | 20110016             
 UnitID                     | T13                  
 IncidentNumber             | 2003235              
 CallType                   | Structure Fire       
 CallDate                   | 01/11/2002           
 WatchDate                  | 01/10/2002           
 CallFinalDisposition       | Other                
 AvailableDtTm              | 01/11/2002 01:51:... 
 Address                    | 2000 Block of CAL... 
 City                       | SF                   
 Zipcode                    | 94109                
 Battalion                  | B04                  
 StationArea                | 38                   
 Box                        | 3362                 
 OriginalPriority           | 3                    
 Priority                   | 3                    
 FinalPriority              | 3                    
 ALSUnit                    | false                
 CallTypeGro

In [12]:
# Filter out "Medical Incident" call types

# Note that filter() and where() methods on the DataFrame are similar. Check relevant documentation for their respective argument types.
few_fire_df = (fire_df
               .select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(F.col("CallType") != "Medical Incident"))
few_fire_df.show(1, vertical=True)

-RECORD 0------------------------------
 IncidentNumber | 2003235              
 AvailableDtTm  | 01/11/2002 01:51:... 
 CallType       | Structure Fire       
only showing top 1 row



In [13]:
# return number of distinct types of calls using countDistinct()
from pyspark.sql.functions import *
(fire_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .agg(F.countDistinct("CallType").alias("DistinctCallTypes"))
  .show())

(fire_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .distinct()
  .count())

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



30

In [14]:
# Example
few_fire_df = (fire_df
  .select("IncidentNumber", "AvailableDtTm", "CallType") 
  .where(col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False, vertical=True)

-RECORD 0--------------------------------
 IncidentNumber | 2003235                
 AvailableDtTm  | 01/11/2002 01:51:44 AM 
 CallType       | Structure Fire         
-RECORD 1--------------------------------
 IncidentNumber | 2003250                
 AvailableDtTm  | 01/11/2002 04:16:46 AM 
 CallType       | Vehicle Fire           
-RECORD 2--------------------------------
 IncidentNumber | 2003259                
 AvailableDtTm  | 01/11/2002 06:01:58 AM 
 CallType       | Alarms                 
-RECORD 3--------------------------------
 IncidentNumber | 2003279                
 AvailableDtTm  | 01/11/2002 08:03:26 AM 
 CallType       | Structure Fire         
-RECORD 4--------------------------------
 IncidentNumber | 2003301                
 AvailableDtTm  | 01/11/2002 09:46:44 AM 
 CallType       | Alarms                 
only showing top 5 rows



In [17]:
# filter for only distinct non-null CallTypes from all the rows
(fire_df
  .select("CallType")
  .where(col("CallType").isNotNull())
  .distinct()
  .show(10, False))

+-----------------------------+
|CallType                     |
+-----------------------------+
|Elevator / Escalator Rescue  |
|Aircraft Emergency           |
|Alarms                       |
|Odor (Strange / Unknown)     |
|Citizen Assist / Service Call|
|HazMat                       |
|Explosion                    |
|Oil Spill                    |
|Vehicle Fire                 |
|Suspicious Package           |
+-----------------------------+
only showing top 10 rows



In [16]:
# Rename column
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(new_fire_df
  .select("ResponseDelayedinMins")
  .where(F.col("ResponseDelayedinMins") > 5)
  .show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [24]:
# date operations
fire_ts_df = (new_fire_df
  .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))
  .drop("CallDate") 
  .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "MM/dd/yyyy"))
  .drop("WatchDate") 
  .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), 
  "MM/dd/yyyy hh:mm:ss a"))
  .drop("AvailableDtTm")
  # .show(5, truncate=False, vertical=True)
             )
# Select the converted columns
(fire_ts_df
  .select("IncidentDate", "OnWatchDate", "AvailableDtTS")
  .show(5, False))

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [25]:
# what were the most common types of fire calls?
(fire_ts_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .groupBy("CallType")
  .count()
  .orderBy("count", ascending=False)
  .show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [26]:
# min,max, avg
(fire_ts_df
  .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),
    F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins"))
  .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



In [37]:
# Q-1) How many distinct types of calls were made to the Fire Department?
# To be sure, let's not count "null" strings in that column.
(fire_df
  .select("CallType")
  .where(F.col("CallType").isNotNull())
  .distinct()
  .count())
# few_fire_df.show(20, vertical=False)

30

In [43]:
# Q-2) What are distinct types of calls were made to the Fire Department?
# These are all the distinct type of call to the SF Fire Department
(fire_df
  .select("CallType")
  .where(
      (F.col("CallType").isNotNull())
     & (F.col("City") == "SF")
  )
  .distinct()
  .count())

30

In [53]:
# Q-3) Find out all response or delayed times greater than 5 mins?
# Rename the column Delay - > ReponseDelayedinMins
# Returns a new DataFrame
# Find out all calls where the response time to the fire site was delayed for more than 5 mins
# new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")
(fire_df
  .withColumn("OnWatchDate", F.col("Delay"))
  .select("Delay")
  .where(F.col("Delay") > 5)
  .count()
)

27508

In [54]:
# Q-4a) What zip codes accounted for most common calls?
# Let's investigate what zip codes in San Francisco accounted for most fire calls and what type where they.
# Filter out by CallType
# Group them by CallType and Zip code
# Count them and display them in descending order
# It seems like the most common calls were all related to Medical Incident, and the two zip codes are 94102 and 94103.
(fire_ts_df
  .select("Zipcode", "CallType")
  .where(F.col("Zipcode").isNotNull())
  .where(F.col("CallType").isNotNull())
  .groupBy("Zipcode", "CallType")
  .count()
  .orderBy("count", ascending=False)
  .show(n=10, truncate=False))

+-------+----------------+-----+
|Zipcode|CallType        |count|
+-------+----------------+-----+
|94102  |Medical Incident|16130|
|94103  |Medical Incident|14775|
|94110  |Medical Incident|9995 |
|94109  |Medical Incident|9479 |
|94124  |Medical Incident|5885 |
|94112  |Medical Incident|5630 |
|94115  |Medical Incident|4785 |
|94122  |Medical Incident|4323 |
|94107  |Medical Incident|4284 |
|94133  |Medical Incident|3977 |
+-------+----------------+-----+
only showing top 10 rows



In [57]:
# Q-4b) What San Francisco neighborhoods are in the zip codes 94102 and 94103
# Let's find out the neighborhoods associated with these two zip codes. In all likelihood, these are some of the contested neighborhood with high reported crimes.
(fire_ts_df
  .select("Neighborhood", "Zipcode")
  .where(F.col("Zipcode").isNotNull())
  .where((F.col("Zipcode")== 94102) | (F.col("Zipcode")== 94103))
  .count()
  # .show(n=10, truncate=False)
)

42737

In [59]:
# Q-5) What was the sum of all calls, average, min and max of the response times for calls?
# Let's use the built-in Spark SQL functions to compute the sum, avg, min, and max of few columns:
# Number of Total Alarms
# What were the min and max the delay in response time before the Fire Dept arrived at the scene of the call
(fire_df
  .select(F.sum("NumAlarms"), F.avg("Delay"),
    F.min("Delay"), F.max("Delay"))
  .show())

+--------------+-----------------+-----------+----------+
|sum(NumAlarms)|       avg(Delay)| min(Delay)|max(Delay)|
+--------------+-----------------+-----------+----------+
|        176170|3.892364154521585|0.016666668|   1844.55|
+--------------+-----------------+-----------+----------+



In [67]:
# ** Q-6b) What week of the year in 2018 had the most fire calls?**
# Note: Week 1 is the New Years' week and week 25 is the July 4 the week. Loads of fireworks, so it makes sense the higher number of calls.
# date operations
fire_ts_df = (new_fire_df
  .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))
  .drop("CallDate") 
  .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "MM/dd/yyyy"))
  .drop("WatchDate") 
  .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"), 
  "MM/dd/yyyy hh:mm:ss a"))
  .drop("AvailableDtTm")
  # .show(5, truncate=False, vertical=True)
             )
(fire_ts_df
 .select(F.weekofyear("IncidentDate"), F.year("IncidentDate"))
 .where(F.year("IncidentDate") == 2018)
 .groupBy("weekofyear(IncidentDate)", "year(IncidentDate)")
 .count()
 .orderBy("count", ascending=False)
 .show(5, truncate=False, vertical=True)
)

-RECORD 0------------------------
 weekofyear(IncidentDate) | 22   
 year(IncidentDate)       | 2018 
 count                    | 259  
-RECORD 1------------------------
 weekofyear(IncidentDate) | 40   
 year(IncidentDate)       | 2018 
 count                    | 255  
-RECORD 2------------------------
 weekofyear(IncidentDate) | 43   
 year(IncidentDate)       | 2018 
 count                    | 250  
-RECORD 3------------------------
 weekofyear(IncidentDate) | 25   
 year(IncidentDate)       | 2018 
 count                    | 249  
-RECORD 4------------------------
 weekofyear(IncidentDate) | 1    
 year(IncidentDate)       | 2018 
 count                    | 246  
only showing top 5 rows



In [71]:
# Доделать!
# ** Q-7) What neighborhoods in San Francisco had the worst response time in 2018?**
# It appears that if you living in Presidio Heights, the Fire Dept arrived in less than 3 mins, while Mission Bay took more than 6 mins.
(fire_df
 .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy"))
 .select(F.year("IncidentDate"), "City", "Neighborhood", F.avg("Delay"))
 .where(F.year("IncidentDate") == 2018)
 .groupBy("year(IncidentDate)", "City", "Neighborhood")
 .count()
 .orderBy("avg(Delay)", ascending=False)
 .show(5, truncate=False, vertical=True)
)

AnalysisException: [MISSING_GROUP_BY] The query does not include a GROUP BY clause. Add GROUP BY or turn it into the window functions using OVER clauses.;
Aggregate [year(cast(IncidentDate#36079 as date)) AS year(IncidentDate)#36111, City#9, Neighborhood#24, avg(Delay#27) AS avg(Delay)#36110]
+- Project [CallNumber#0, UnitID#1, IncidentNumber#2, CallType#3, CallDate#4, WatchDate#5, CallFinalDisposition#6, AvailableDtTm#7, Address#8, City#9, Zipcode#10, Battalion#11, StationArea#12, Box#13, OriginalPriority#14, Priority#15, FinalPriority#16, ALSUnit#17, CallTypeGroup#18, NumAlarms#19, UnitType#20, UnitSequenceInCallDispatch#21, FirePreventionDistrict#22, SupervisorDistrict#23, ... 5 more fields]
   +- Relation [CallNumber#0,UnitID#1,IncidentNumber#2,CallType#3,CallDate#4,WatchDate#5,CallFinalDisposition#6,AvailableDtTm#7,Address#8,City#9,Zipcode#10,Battalion#11,StationArea#12,Box#13,OriginalPriority#14,Priority#15,FinalPriority#16,ALSUnit#17,CallTypeGroup#18,NumAlarms#19,UnitType#20,UnitSequenceInCallDispatch#21,FirePreventionDistrict#22,SupervisorDistrict#23,... 4 more fields] csv
