In [1]:
from pyspark.sql import SparkSession as sp
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [2]:
spark = sp.builder.appName("SF_FIRE_CALLS").getOrCreate()

23/12/25 17:46:54 WARN Utils: Your hostname, Sivas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.59 instead (on interface en0)
23/12/25 17:46:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/25 17:46:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.format("csv").option("header", "true").option("inferScheme", "true").load("sf-fire-calls.csv")

In [4]:
df.count()

175296

In [5]:
df.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: string (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: string (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: string (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Sup

In [6]:
df.select('CallType').where(F.col('CallType').isNotNull()).distinct().count()
# OR
# df.select('CallType').distinct().count()

                                                                                

30

In [7]:
# Convert The DateTime Format to Proper Time Format in spark.
fire_df = (
    df
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"),"MM/dd/yyyy")).drop('CallDate')
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"),"MM/dd/yyyy")).drop('WatchDate')
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a")).drop('AvailableDtTm')
)

In [8]:
fire_df.show(10, truncate=False)

+----------+------+--------------+----------------+--------------------+---------------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+------------------------------+-------------------------------------+-------------+---------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|CallType        |CallFinalDisposition|Address                    |City|Zipcode|Battalion|StationArea|Box |OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|Neighborhood                  |Location                             |RowID        |Delay    |IncidentDate       |OnWatchDate        |AvailableDtTS      |
+----------+------+--------------+----------------+--------------------+---------------------------+----+-------

23/12/25 17:47:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [9]:
fire_df.select(F.year('IncidentDate')).distinct().orderBy(F.year('IncidentDate'), ascending=False).show()



+------------------+
|year(IncidentDate)|
+------------------+
|              2018|
|              2017|
|              2016|
|              2015|
|              2014|
|              2013|
|              2012|
|              2011|
|              2010|
|              2009|
|              2008|
|              2007|
|              2006|
|              2005|
|              2004|
|              2003|
|              2002|
|              2001|
|              2000|
+------------------+


                                                                                

In [10]:
# print(fire_df.select(F.year('IncidentDate')).show())
fire_df.select('CallType').distinct().where(F.year('IncidentDate') == 2006).show(truncate=False)

+----------------------+
|CallType              |
+----------------------+
|Administrative        |
|Watercraft in Distress|
+----------------------+


In [11]:
fire_df.select(F.year('IncidentDate')).distinct().orderBy(F.year('IncidentDate')).show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+


 # Read The CSV data in a new methodology, to validate previous approach of reading and scaning Data.
 # Define a new Schema.
 # Read the CSV alongside the newly defined schema.
 # Programmatic way to define a schema. 

In [12]:
# Schema
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                          StructField('UnitID', StringType(), True),
                          StructField('IncidentNumber', IntegerType(), True),
                          StructField('CallType', StringType(), True),
                          StructField('CallDate', StringType(), True),
                          StructField('WatchDate', StringType(), True),
                          StructField('CallFinalDisposition', StringType(), True),
                          StructField('AvailableDtTm', StringType(), True),
                          StructField('Address', StringType(), True),
                          StructField('City', StringType(), True),
                          StructField('Zipcode', IntegerType(), True),
                          StructField('Battalion', StringType(), True),
                          StructField('StationArea', StringType(), True),
                          StructField('Box', StringType(), True),
                          StructField('OriginalPriority', StringType(), True),
                          StructField('Priority', StringType(), True),
                          StructField('FinalPriority', IntegerType(), True),
                          StructField('ALSUnit', BooleanType(), True),
                          StructField('CallTypeGroup', StringType(), True),
                          StructField('NumAlarms', IntegerType(), True),
                          StructField('UnitType', StringType(), True),
                          StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                          StructField('FirePreventionDistrict', StringType(), True),
                          StructField('SupervisorDistrict', StringType(), True),
                          StructField('Neighborhood', StringType(), True),
                          StructField('Location', StringType(), True),
                          StructField('RowID', StringType(), True),
                          StructField('Delay', FloatType(), True)])


In [13]:
dfn = spark.read.csv('sf-fire-calls.csv', header=True,schema=fire_schema)

In [14]:
dfn.count()

175296

In [15]:
fire_dfn = (
    dfn
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"),"MM/dd/yyyy")).drop('CallDate')
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"),"MM/dd/yyyy")).drop('WatchDate')
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a")).drop('AvailableDtTm')
)

In [None]:
fire_dfn.select(F.year('IncidentDate')).distinct().orderBy(F.year('IncidentDate'), ascending=True).show()

# What were all the different types of fire calls in 2018?

In [None]:
fire_dfn.select('CallType').where(F.year('IncidentDate') == 2018 ).distinct().show(truncate=False)

# What months within the year 2018 saw the highest number of fire calls?

In [None]:
(
     fire_dfn
     .where(F.year('IncidentDate') == 2018)
     .groupBy(F.month('IncidentDate')
     .alias('Incident_Month'))
     .max('IncidentNumber')
     .orderBy('max(IncidentNumber)', ascending=False)
     .show(5)
 )

In [None]:
#  Validation of above Result
(fire_dfn.select('IncidentNumber').where((F.year('IncidentDate') == 2018) & (F.month('IncidentDate') == 11)).distinct()
 .orderBy('IncidentNumber', ascending=False)
 .show(3,truncate=False))

# Which neighborhood in San Francisco generated the most fire calls in 2018?

In [None]:
(
    fire_dfn
    .where(F.year('IncidentDate') == 2018)
    .groupBy('Neighborhood')
    .max('IncidentNumber')
    .orderBy('max(IncidentNumber)',ascending=False)
    .show(10, truncate=False)
)

# Which neighborhoods had the worst response times to fire calls in 2018?

In [None]:
(
    fire_dfn
    .where(F.year('IncidentDate') == 2018)
    .groupBy('Neighborhood')
    .max('Delay')
    .orderBy('max(Delay)',ascending=False)
    .show(10,truncate=False)
)

# Which week in the year in 2018 had the most fire calls?

In [None]:
(
    fire_dfn
    .where(F.year('IncidentDate')==2017)
    .groupBy(F.weekofyear('IncidentDate'))
    .max('IncidentNumber')
    .orderBy('max(IncidentNumber)',ascending=False)
    .show(10, False)
)