In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Spark").getOrCreate()

25/06/24 09:49:35 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.74 instead (on interface wlp3s0)
25/06/24 09:49:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/24 09:49:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
schema = StructType(
    [
        StructField("CallNumber", IntegerType(), True),
        StructField("UnitID", StringType(), True),
        StructField("IncidentNumber", IntegerType(), True),
        StructField("CallType", StringType(), True),
        StructField("CallDate", StringType(), True),
        StructField("WatchDate", StringType(), True),
        StructField("CallFinalDisposition", StringType(), True),
        StructField("AvailableDtTm", StringType(), True),
        StructField("Address", StringType(), True),
        StructField("City", StringType(), True),
        StructField("Zipcode", IntegerType(), True),
        StructField("Battalion", StringType(), True),
        StructField("StationArea", IntegerType(), True),
        StructField("Box", IntegerType(), True),
        StructField("OriginalPriority", IntegerType(), True),
        StructField("Priority", IntegerType(), True),
        StructField("FinalPriority", IntegerType(), True),
        StructField("ALSUnit", BooleanType(), True),
        StructField("CallTypeGroup", StringType(), True),
        StructField("NumAlarms", IntegerType(), True),
        StructField("UnitType", StringType(), True),
        StructField("UnitSequenceInCallDispatch", IntegerType(), True),
        StructField("FirePreventionDistrict", IntegerType(), True),
        StructField("SupervisorDistrict", IntegerType(), True),
        StructField("Neighborhood", StringType(), True),
        StructField("Location", StringType(), True),
        StructField("RowID", StringType(), True),
        StructField("Delay", FloatType(), True),
    ]
)

In [5]:
sf_fc_df = spark.read.csv(path="sf-fire-calls.csv", schema=schema, header=True)

In [6]:
sf_fc_df.show(n=1, vertical=True)

25/06/24 09:49:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


-RECORD 0------------------------------------------
 CallNumber                 | 20110016             
 UnitID                     | T13                  
 IncidentNumber             | 2003235              
 CallType                   | Structure Fire       
 CallDate                   | 01/11/2002           
 WatchDate                  | 01/10/2002           
 CallFinalDisposition       | Other                
 AvailableDtTm              | 01/11/2002 01:51:... 
 Address                    | 2000 Block of CAL... 
 City                       | SF                   
 Zipcode                    | 94109                
 Battalion                  | B04                  
 StationArea                | 38                   
 Box                        | 3362                 
 OriginalPriority           | 3                    
 Priority                   | 3                    
 FinalPriority              | 3                    
 ALSUnit                    | false                
 CallTypeGro

In [7]:
sf_fc_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay']

In [8]:
sf_fc_df = (
    sf_fc_df
    .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
    .drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
    .drop("WatchDate")
    .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
    .drop("AvailableDtTm")
)

In [9]:
sf_fc_df.show(n=1, vertical=True)

-RECORD 0------------------------------------------
 CallNumber                 | 20110016             
 UnitID                     | T13                  
 IncidentNumber             | 2003235              
 CallType                   | Structure Fire       
 CallFinalDisposition       | Other                
 Address                    | 2000 Block of CAL... 
 City                       | SF                   
 Zipcode                    | 94109                
 Battalion                  | B04                  
 StationArea                | 38                   
 Box                        | 3362                 
 OriginalPriority           | 3                    
 Priority                   | 3                    
 FinalPriority              | 3                    
 ALSUnit                    | false                
 CallTypeGroup              | NULL                 
 NumAlarms                  | 1                    
 UnitType                   | TRUCK                
 UnitSequenc

# What were all the different types of fire calls in 2018?

In [10]:
(
    sf_fc_df
    .select("CallType")
    .where(year("IncidentDate") == 2018)
    .distinct()
    .show(truncate=False)
)



+-------------------------------+
|CallType                       |
+-------------------------------+
|Elevator / Escalator Rescue    |
|Alarms                         |
|Odor (Strange / Unknown)       |
|Citizen Assist / Service Call  |
|HazMat                         |
|Vehicle Fire                   |
|Other                          |
|Outside Fire                   |
|Traffic Collision              |
|Assist Police                  |
|Gas Leak (Natural and LP Gases)|
|Water Rescue                   |
|Electrical Hazard              |
|Structure Fire                 |
|Medical Incident               |
|Fuel Spill                     |
|Smoke Investigation (Outside)  |
|Train / Rail Incident          |
|Explosion                      |
|Suspicious Package             |
+-------------------------------+



                                                                                

# What months within the year 2018 saw the highest number of fire calls?

In [11]:
(
    sf_fc_df
    .select(month("IncidentDate").alias('month'))
    .where(year('IncidentDate') == 2018)
    .groupBy('month')
    .count()
    .orderBy(desc('count'))
    .show()
)

+-----+-----+
|month|count|
+-----+-----+
|   10| 1068|
|    5| 1047|
|    3| 1029|
|    8| 1021|
|    1| 1007|
|    6|  974|
|    7|  974|
|    9|  951|
|    4|  947|
|    2|  919|
|   11|  199|
+-----+-----+



# Which neighborhood in San Francisco generated the most fire calls in 2013?

In [36]:
(
    sf_fc_df
    .select("Neighborhood")
    .where((col("City")=="SF") & (year("IncidentDate") == 2013))
    .groupBy("Neighborhood")
    .count()
    .orderBy(desc("count"))
    .show(n=1)
)

+------------+-----+
|Neighborhood|count|
+------------+-----+
|  Tenderloin| 1385|
+------------+-----+
only showing top 1 row



# Which neighborhoods had the worst response times to fire calls in 2018?

In [37]:
(
    sf_fc_df
    .select("City", "Neighborhood", "Delay")
    .where(year("IncidentDate") == 2018)
    .groupBy("City", "Neighborhood")
    .agg(max("Delay").alias("ResponseTime"))
    .orderBy(desc("ResponseTime"))
    .show(n=100, truncate=False)
)

+-------------+------------------------------+------------+
|City         |Neighborhood                  |ResponseTime|
+-------------+------------------------------+------------+
|San Francisco|Chinatown                     |491.26666   |
|San Francisco|Financial District/South Beach|406.63333   |
|San Francisco|Tenderloin                    |340.48334   |
|San Francisco|Haight Ashbury                |175.86667   |
|San Francisco|Bayview Hunters Point         |155.8       |
|San Francisco|Pacific Heights               |129.01666   |
|San Francisco|Potrero Hill                  |109.8       |
|San Francisco|Inner Sunset                  |106.13333   |
|San Francisco|South of Market               |94.71667    |
|San Francisco|Inner Richmond                |90.433334   |
|San Francisco|Excelsior                     |83.76667    |
|San Francisco|Castro/Upper Market           |74.13333    |
|San Francisco|Western Addition              |67.916664   |
|San Francisco|Nob Hill                 

# Which week in the year in 2018 had the most fire calls?

In [34]:
(
    sf_fc_df
    .withColumn("Week", weekofyear(col("IncidentDate")))
    .select("Week")
    .where(year("IncidentDate") == 2018)
    .groupBy("Week")
    .count()
    .orderBy("Count", ascending=False)
    .show()
)

+----+-----+
|Week|count|
+----+-----+
|  22|  259|
|  40|  255|
|  43|  250|
|  25|  249|
|   1|  246|
|  44|  244|
|  13|  243|
|  32|  243|
|  11|  240|
|   5|  236|
|  18|  236|
|  23|  235|
|  31|  234|
|  42|  234|
|   2|  234|
|  19|  233|
|   8|  232|
|  34|  232|
|  10|  232|
|  28|  231|
+----+-----+
only showing top 20 rows



# Is there a correlation between neighborhood, zip code, and number of fire calls?

In [40]:
(
    sf_fc_df
    .select("ZipCode", "Neighborhood", "CallType")
    .groupBy("ZipCode", "Neighborhood")
    .agg(count("CallType").alias("count"))
    .orderBy("count", ascending=False)
    .show()
)

+-------+--------------------+-----+
|ZipCode|        Neighborhood|count|
+-------+--------------------+-----+
|  94102|          Tenderloin|17084|
|  94103|     South of Market|13762|
|  94110|             Mission|10444|
|  94124|Bayview Hunters P...| 9150|
|  94103|             Mission| 5445|
|  94109|          Tenderloin| 5377|
|  94105|Financial Distric...| 4235|
|  94121|      Outer Richmond| 4121|
|  94109|            Nob Hill| 3983|
|  94114| Castro/Upper Market| 3946|
|  94115|    Western Addition| 3934|
|  94133|         North Beach| 3706|
|  94122|     Sunset/Parkside| 3404|
|  94123|              Marina| 3360|
|  94112|           Excelsior| 3237|
|  94110|      Bernal Heights| 3109|
|  94116|     Sunset/Parkside| 3025|
|  94102|        Hayes Valley| 2814|
|  94132|           Lakeshore| 2658|
|  94112|       Outer Mission| 2608|
+-------+--------------------+-----+
only showing top 20 rows

