In [0]:
%fs ls /databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv

# Libraries

In [0]:
import pandas,numpy

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline


# Data 

In [0]:
# Location of the SF fire-calls sample CSV bundled with the Databricks datasets.
sf_fire_path = (
    "/databricks-datasets/learning-spark-v2/sf-fire/"
    "sf-fire-calls.csv"
)

In [0]:
# Load the raw CSV: the first line supplies column names, and column types are
# inferred from the data (inferSchema costs Spark an extra pass over the file).
fire_df = (spark.read
           .option("header", True)
           .option("inferSchema", True)
           .csv(sf_fire_path))
fire_df.printSchema()
display(fire_df)


Databricks visualization. Run in Databricks to view.

In [0]:
# Row count of the freshly loaded data; the cleaning cell reports its drop count against this.
inital_count: int = fire_df.count()

In [0]:
# Column names as loaded (DataFrame.columns already returns a new Python list).
fire_df.columns

# Transform Part

### 1. Temporal Processing:
Convert the date and time strings into proper timestamp columns so we can calculate elapsed (response) times between the call and the unit becoming available.

In [0]:

# Parse the raw date/time strings into timestamp columns and derive an elapsed time.
# "Call Date" values that are date-only (MM/dd/yyyy) do not match the date-time
# pattern "MM/dd/yyyy hh:mm:ss a" and would parse to null, which would make
# Response_Time_Sec null too, so such values are routed to a date-only pattern.
# NOTE(review): for date-only values Call_Timestamp is midnight of that day, so
# Response_Time_Sec is then measured from the start of the day, not from the call
# itself. Confirm against the data before treating it as a true response time.
call_date_is_date_only = col("Call Date").rlike(r"^\d{2}/\d{2}/\d{4}$")

fire_df = (
    fire_df
    .withColumn(
        "Call_Timestamp",
        F.when(call_date_is_date_only, to_timestamp(col("Call Date"), "MM/dd/yyyy"))
         .otherwise(to_timestamp(col("Call Date"), "MM/dd/yyyy hh:mm:ss a")),
    )
    .withColumn("Available_Timestamp", to_timestamp(col("Available DtTm"), "MM/dd/yyyy hh:mm:ss a"))
    # Seconds from the call timestamp to the unit's "Available" timestamp.
    .withColumn(
        "Response_Time_Sec",
        unix_timestamp("Available_Timestamp") - unix_timestamp("Call_Timestamp"),
    )
)
    

### 2. Geospatial Parsing
The `Location` column is stored as a string, so we extract the latitude and longitude from it for geospatial mapping.

In [0]:
# Location holds both coordinates in one string. The parentheses stripped below
# suggest a "(lat, lon)" layout. NOTE(review): confirm latitude comes first.
# The regex is a raw string because "\(" in a normal literal is an invalid escape
# sequence (SyntaxWarning on Python 3.12+). The split is computed once and each half
# is trimmed of surrounding whitespace before the cast to double.
loc_parts = F.split(F.regexp_replace(F.col("Location"), r"[()]", ""), ",")

fire_df = (fire_df
           .withColumn("Latitude", F.trim(loc_parts.getItem(0)).cast("double"))
           .withColumn("Longitude", F.trim(loc_parts.getItem(1)).cast("double")))

### 3. Data Cleaning and Feature Engineering:

In [0]:
# 1. Null values: drop rows missing any identifier needed to track unique events
#    or to place them on a map.
# 2. Drop rows whose Delay is null or negative (treated as data-entry errors).
required_cols = ["Incident Number", "Call Number", "Location"]
fire_df = (fire_df
           .dropna(subset=required_cols)
           .where(col("Delay").isNotNull() & (col("Delay") >= 0)))

print(f"Removed {inital_count - fire_df.count()} rows during cleaning.")

In [0]:
# 2. Feature engineering for ML: turn categorical string columns into vectors.
categorical_cols = ["Call Type Group", "UnitType", "Neighborhood"]

# For each column:
#   a. StringIndexer maps each label to a numeric index (most frequent label first by
#      default); handleInvalid="keep" gives null/unseen labels their own extra index.
#   b. OneHotEncoder turns that index into a sparse binary vector.
indexers, encoders = [], []
for cat_col in categorical_cols:
    index_col = f"{cat_col}_Index"
    indexers.append(StringIndexer(inputCol=cat_col, outputCol=index_col, handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=index_col, outputCol=f"{cat_col}_Vec"))

# All indexers run before all encoders; fit on the cleaned frame, then transform it.
pipeline = Pipeline(stages=[*indexers, *encoders])
pipeline_model = pipeline.fit(fire_df)
ml_df = pipeline_model.transform(fire_df)

display(ml_df.select("Call Type Group", "Call Type Group_Vec", "UnitType", "UnitType_Vec"))

In [0]:
# 3. Geo-gridding: label each call with its coordinates rounded to two decimals,
#    formatted as "<lat>_<lon>", as a simple proxy for spatial grouping.
# Column `+` is numeric addition in Spark (it would try to add "_" as a number),
# so the pieces are joined with concat. concat yields null when either coordinate
# is null, so rows without coordinates get a null grid ID.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

ml_df = ml_df.withColumn(
    "Geo_Grid_ID",
    F.concat(
        col("Latitude").cast("decimal(10,2)").cast("string"),
        F.lit("_"),
        col("Longitude").cast("decimal(10,2)").cast("string"),
    ),
)

### EDA

In [0]:
# Call volume per neighborhood and its share of all calls, busiest first.
total = fire_df.count()
target_col = "Neighborhood"

call_share_pct = F.round((F.col("call_count") / total) * 100, 2)
neighborhood_stats = (
    fire_df
    .groupBy(target_col)
    .agg(F.count("*").alias("call_count"))
    .withColumn("percentage", call_share_pct)
    .orderBy(F.col("call_count").desc())
)

print("Top 10 Neighborhoods by Volume:")
neighborhood_stats.show(n=10, truncate=False)

In [0]:
print(" Call type Analysis")

# Frequency of each non-null call type, most common first.
call_type_analysis = (fire_df
                      .select("CallType")
                      .where(col("CallType").isNotNull())
                      .groupBy("CallType")
                      .count()
                      .orderBy("count", ascending=False))

# The grouped frame has one row per distinct non-null call type, so its row count is
# the distinct count. This avoids a second distinct scan. Computing it outside the
# f-string also avoids nesting double quotes in a double-quoted f-string, which is a
# SyntaxError before Python 3.12.
distinct_call_types = call_type_analysis.count()
print(f"distinct call types -> {distinct_call_types}")

call_type_analysis.show(truncate=False)

In [0]:
# call_type_analysis is sorted by count (descending), so the first rows are the most frequent.
TOP_N = 10
top_10_calltypes = call_type_analysis.limit(num=TOP_N)
display(top_10_calltypes)

Databricks visualization. Run in Databricks to view.

In [0]:
# What zip codes accounted for most common calls?
# Count calls per (call type, zip code) pair, most frequent pairs first.
common_zip_analysis = (fire_df
                      .select("CallType", "Zipcode of Incident")
                      .where(col("CallType").isNotNull())
                      .groupBy("CallType", "Zipcode of Incident")
                      .count()
                      .orderBy("count", ascending=False))

# display() renders the table (and chart) on its own. A preceding .show() ran the
# same aggregation a second time just to print a plain-text copy.
display(common_zip_analysis)

Databricks visualization. Run in Databricks to view.

In [0]:

# Which neighborhoods fall inside zip codes 94102 and 94103?
zip_col = col("Zipcode of Incident")
zip_filter = (zip_col == 94102) | (zip_col == 94103)
display(
    fire_df
    .select("Neighborhood", "Zipcode of Incident")
    .where(zip_filter)
    .distinct()
)