In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *

In [3]:
# Input/output locations. Both can be overridden with environment variables so
# the notebook can be re-run on another machine without editing code; the
# defaults are the original local paths.

# path of the csv file containing the data
fdc_csv_path = os.environ.get(
    "FDC_CSV_PATH",
    "D:/datasets/FireDepartmentCalls/Fire_Department_Calls_for_Service.csv",
)

# folder in which to save the cleaned tables
# the PowerBI report will read these tables
base_dir = os.environ.get(
    "FDC_TABLES_DIR",
    "D:/projects/FDC_PBI_Dashboard/tables",
)

In [4]:
# Create the Spark session, or reuse the one already running in this kernel.
# It runs locally with 6 worker threads ("local[6]").
spark = (
    SparkSession
    .builder
    .master("local[6]")
    .getOrCreate()
)

In [5]:
# Schema of the CSV file, given as (column name, type, nullable) triples in
# file order. Every column is read as a string except "ALS Unit", which is
# parsed as a boolean. "Call Number" is declared non-nullable, but the CSV
# reader still reports every column as nullable (see the printSchema output).
_fdc_csv_columns = [
    ("Call Number", StringType(), False),
    ("Unit ID", StringType(), True),
    ("Incident Number", StringType(), True),
    ("Call Type", StringType(), True),
    ("Call Date", StringType(), True),
    ("Watch Date", StringType(), True),
    ("Received DtTm", StringType(), True),
    ("Entry DtTm", StringType(), True),
    ("Dispatch DtTm", StringType(), True),
    ("Response DtTm", StringType(), True),
    ("On Scene DtTm", StringType(), True),
    ("Transport DtTm", StringType(), True),
    ("Hospital DtTm", StringType(), True),
    ("Call Final Disposition", StringType(), True),
    ("Available DtTm", StringType(), True),
    ("Address", StringType(), True),
    ("City", StringType(), True),
    ("Zipcode of Incident", StringType(), True),
    ("Battalion", StringType(), True),
    ("Station Area", StringType(), True),
    ("Box", StringType(), True),
    ("Original Priority", StringType(), True),
    ("Priority", StringType(), True),
    ("Final Priority", StringType(), True),
    ("ALS Unit", BooleanType(), True),
    ("Call Type Group", StringType(), True),
    ("Number of Alarms", StringType(), True),
    ("Unit Type", StringType(), True),
    ("Unit sequence in call dispatch", StringType(), True),
    ("Fire Prevention District", StringType(), True),
    ("Supervisor District", StringType(), True),
    ("Neighborhooods - Analysis Boundaries", StringType(), True),
    ("RowID", StringType(), True),
    ("case_location", StringType(), True),
    ("Analysis Neighborhoods", StringType(), True),
]

schema = StructType([
    StructField(col_name, col_type, nullable=col_nullable)
    for col_name, col_type, col_nullable in _fdc_csv_columns
])

In [6]:
# Load the raw CSV with the explicit schema (no schema inference). Two columns
# get friendlier names so that the later space-based renaming yields clean names:
#   "Neighborhooods - Analysis Boundaries" -> "Neighborhooods Analysis Boundaries"
#   "case_location"                        -> "Case Location"
fdc_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load(fdc_csv_path)
    .withColumnRenamed("Neighborhooods - Analysis Boundaries", "Neighborhooods Analysis Boundaries")
    .withColumnRenamed("case_location", "Case Location")
)

fdc_df.printSchema()

root
 |-- Call Number: string (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: string (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable = 

In [7]:
# "Fire Prevention District" holds integer codes, but some of them are stored
# as doubles (e.g. "6.0"). Casting string -> double -> long -> string turns
# "6.0" and "6" into the same "6". Under the default (non-ANSI) cast behaviour,
# values that cannot be cast become null; a later step replaces them with "Unknown".
fpd_as_int_string = (
    f.col("Fire Prevention District")
    .cast("double")
    .cast("long")
    .cast("string")
)

fdc_with_parsed_FPD = fdc_df.withColumn("Fire Prevention District", fpd_as_int_string)

In [8]:
# rename all columns: eliminate spaces and capitalize the first letter of each word

def format_column(name):
    """
    Turn a space-separated column name into a space-free one.

    The first character of every space-separated word is upper-cased and the
    words are concatenated; the rest of each word keeps its original case,
    e.g. "Zipcode of Incident" -> "ZipcodeOfIncident", "Unit ID" -> "UnitID".
    """
    return "".join(word[:1].upper() + word[1:] for word in name.split(" "))

def rename_columns(df, g):
    """
    Return 'df' with every column renamed to g(<old name>).

    The column order is preserved; 'g' is any str -> str function.
    """
    renamed = [f.col(old_name).alias(g(old_name)) for old_name in df.columns]
    return df.select(renamed)

fdc_with_columns_renamed = rename_columns(fdc_with_parsed_FPD, format_column)

fdc_with_columns_renamed.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPrior

In [9]:
# Replace nulls with the literal "Unknown" in these columns
# (FirePreventionDistrict was cast back to string earlier so it can be filled).
columns_to_fill_with_unknown = ["CallTypeGroup", "FirePreventionDistrict", "StationArea"]

fdc_with_nulls_replaced = fdc_with_columns_renamed.fillna(
    "Unknown", subset=columns_to_fill_with_unknown
)

In [10]:
# transform columns ending in Date to Date format
# transform columns ending in DtTm to Timestamp format

def _convert_cols_with_suffix(df, suffix, converter):
    """
    Replace every column of 'df' whose name ends in 'suffix' with
    converter(<column name>), keeping the column's name and position.

    Columns that do not end in 'suffix' are passed through unchanged.
    """
    result_columns = [
        converter(col).alias(col) if col.endswith(suffix) else col
        for col in df.columns
    ]
    return df.select(*result_columns)

def date_cols_to_date(df, date_format):
    """
    Parse every column whose name ends in "Date" with
    f.to_date(col, date_format).

    Under the default (non-ANSI) settings, strings that do not match
    'date_format' become null.
    """
    return _convert_cols_with_suffix(
        df, "Date", lambda col: f.to_date(col, date_format)
    )

def dttm_cols_to_timestamp(df, timestamp_format):
    """
    Parse every column whose name ends in "DtTm" with
    f.to_timestamp(col, timestamp_format).

    Under the default (non-ANSI) settings, strings that do not match
    'timestamp_format' become null.
    """
    return _convert_cols_with_suffix(
        df, "DtTm", lambda col: f.to_timestamp(col, timestamp_format)
    )

# Spark datetime patterns of the raw strings: "MM/dd/yyyy" matches values like
# "01/31/2020"; the timestamp pattern is 12-hour with an AM/PM marker,
# matching values like "01/31/2020 01:05:33 PM".
date_pattern = "MM/dd/yyyy"
timestamp_pattern = "MM/dd/yyyy hh:mm:ss a"

fdc_with_dates = date_cols_to_date(fdc_with_nulls_replaced, date_pattern)
fdc_with_timestamps = dttm_cols_to_timestamp(fdc_with_dates, timestamp_pattern)

fdc_with_timestamps.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- ReceivedDtTm: timestamp (nullable = true)
 |-- EntryDtTm: timestamp (nullable = true)
 |-- DispatchDtTm: timestamp (nullable = true)
 |-- ResponseDtTm: timestamp (nullable = true)
 |-- OnSceneDtTm: timestamp (nullable = true)
 |-- TransportDtTm: timestamp (nullable = true)
 |-- HospitalDtTm: timestamp (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = false)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = 

In [11]:
# Parse the CaseLocation text, expected to look like "POINT (<x> <y>)", into a
# map {"Longitude": <double>, "Latitude": <double>}. The first number is taken
# as the longitude and the second as the latitude.
# (In the regex, "." stands in for the literal parentheses.)
case_location_parts = f.split(
    f.regexp_extract(f.col("CaseLocation"), "POINT .(.*).", 1),
    " ",
)

fdc_with_modified_case = fdc_with_timestamps.withColumn(
    "CaseLocation",
    f.create_map(
        f.lit("Longitude"), case_location_parts.getItem(0).cast("Double"),
        f.lit("Latitude"), case_location_parts.getItem(1).cast("Double"),
    ),
)

fdc_with_modified_case.printSchema()

root
 |-- CallNumber: string (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: string (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- ReceivedDtTm: timestamp (nullable = true)
 |-- EntryDtTm: timestamp (nullable = true)
 |-- DispatchDtTm: timestamp (nullable = true)
 |-- ResponseDtTm: timestamp (nullable = true)
 |-- OnSceneDtTm: timestamp (nullable = true)
 |-- TransportDtTm: timestamp (nullable = true)
 |-- HospitalDtTm: timestamp (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeOfIncident: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = false)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = 

In [12]:
# Store the dataset in parquet format so the CSV text is parsed only once.
#
# The input CSV is about 2.1 GB and the goal is roughly 128 MB of input per task.
# This job has no shuffle (CSV read -> column transformations -> write), so
# spark.sql.shuffle.partitions has no effect on it. The number of tasks (and
# of output files) comes from how the CSV file is split on read, and each
# split is capped by spark.sql.files.maxPartitionBytes, so that setting is
# configured explicitly here.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)

(
    fdc_with_modified_case
    .write
    .format("parquet")
    .mode("overwrite")
    .save(f"{base_dir}/FDC_parquet")
)

In [13]:
# Load the parquet copy of the cleaned dataset; every table below derives from it.

# The distinct/groupBy steps that follow shuffle data, so use as many shuffle
# partitions as local cores (the session runs with local[6]).
spark.conf.set("spark.sql.shuffle.partitions", 6)

fdc_df = spark.read.parquet(f"{base_dir}/FDC_parquet")

In [14]:
def get_functional_dependency_offending_keys(df, keys, attributes):
    """
    Find the key values that break the functional dependency keys -> attributes.

    A key value "offends" when the rows of 'df' that share it carry more than
    one distinct combination of the 'attributes' columns. The result is a
    DataFrame with only the 'keys' columns, one row per offending key value.

    This is a heavy computation (distinct followed by groupBy), so the result
    should be collected or cached if it is used more than once.
    """
    distinct_combinations = df.select(*keys, *attributes).distinct()
    combinations_per_key = distinct_combinations.groupBy(*keys).agg(
        f.count("*").alias("n_rows")
    )
    return combinations_per_key.filter(f.col("n_rows") > 1).select(*keys)

In [15]:
def reduce_left(iterator, reducer, zero):
    """
    Left fold: combine the items of 'iterator' with the binary function
    'reducer', starting from 'zero'.

    Computes reducer(...reducer(reducer(zero, x0), x1)..., xn); an empty
    iterable returns 'zero' unchanged.
    """
    accumulator = zero
    remaining = iter(iterator)
    while True:
        try:
            item = next(remaining)
        except StopIteration:
            return accumulator
        accumulator = reducer(accumulator, item)

In [16]:
def for_all(columns):
    """
    Combine boolean Column expressions with logical AND.

    The returned Column is true for a row only when every expression in
    'columns' is true. The fold starts from a literal True, so an empty
    'columns' yields an always-true expression.
    """
    def both(left, right):
        return left & right

    return reduce_left(columns, both, f.lit(True))

In [17]:
def filter_rows_that_violate_rules(df, rules):
    """
    Filter out the rows of 'df' that violate any rule.

    'rules' is a list of Spark SQL boolean expressions (strings). Only the rows
    for which all of them evaluate to TRUE are kept. DataFrame.filter discards
    rows whose predicate is NULL, so a rule that evaluates to NULL counts as
    violated.
    """
    all_rules_hold = for_all(f.expr(rule) for rule in rules)
    return df.filter(all_rules_hold)

In [19]:
# Units table: one row per UnitID with its UnitType.
unit_key = ["UnitID"]
unit_attributes = ["UnitType"]

# Get the UnitIDs that map to more than one UnitType. The check must compare
# the key against the *attribute* columns; checking the key against itself can
# never find a violation.
units_offending_keys = get_functional_dependency_offending_keys(
    fdc_df,
    unit_key,
    unit_attributes
)

# Never put a null key in this list: `x IN (..., NULL)` is NULL for every
# non-matching x, so the `~isin(...)` filters would then drop every row.
# Rows with a null UnitID are still dropped by those filters, because
# `NULL IN (...)` is NULL.
units_offending_keys_list = [
    row["UnitID"] for row in units_offending_keys.collect()
    if row["UnitID"] is not None
]

print(f"found {len(units_offending_keys_list)} offending keys")

# drop data for the offending keys and deduplicate the data
units_df = (
    fdc_df.
    select(*unit_key, *unit_attributes).
    filter(~f.col("UnitID").isin(units_offending_keys_list)).
    distinct()
)

# store the results as a single parquet file
(
    units_df.
    coalesce(1).
    write.
    format("parquet").
    mode("overwrite").
    save(f"{base_dir}/units")
)

found 0 offending keys


In [20]:
# Calls table: one row per CallNumber with the call-level attributes.
call_key = ["CallNumber"]

call_attributes = [
  "ReceivedDtTm",
  "CallType", "CallTypeGroup",
  "Priority",
  "Battalion",  "StationArea", "FirePreventionDistrict",
  "AnalysisNeighborhoods", "SupervisorDistrict",
  # NOTE: the column is actually named "ZipcodeOfIncident"; this spelling only
  # resolves because Spark column resolution is case-insensitive by default
  # (spark.sql.caseSensitive=false).
  "ZipCodeOfIncident", "Address", # "City" (city names are not standardized)
  # key lookups into the CaseLocation map column
  "Box", "CaseLocation.Latitude", "CaseLocation.Longitude"
]

# get the CallNumbers whose call attributes are not consistent across rows
call_offending_keys = get_functional_dependency_offending_keys(
    fdc_df,
    call_key,
    call_attributes
)

# Never put a null key in this list: `x IN (..., NULL)` is NULL for every
# non-matching x, so the `~isin(...)` filters would then drop every row.
call_offending_keys_list = [
    row["CallNumber"] for row in call_offending_keys.collect()
    if row["CallNumber"] is not None
]

print(f"found {len(call_offending_keys_list)} offending keys")

# drop data for the offending keys and deduplicate the data
calls_df = (
    fdc_df.
    select(*call_key, *call_attributes).
    filter(~f.col("CallNumber").isin(call_offending_keys_list)).
    distinct()
)

# store the results
(
    calls_df.
    write.
    format("parquet").
    mode("overwrite").
    save(f"{base_dir}/calls")
)

found 4 offending keys


In [21]:
# Call-units table: one row per (CallNumber, UnitID) pair with the attributes
# specific to that unit's involvement in the call.
call_unit_key = ["CallNumber", "UnitID"]

call_unit_attributes = [
    "ALSUnit",
    "DispatchDtTm", "ResponseDtTm", "OnSceneDtTm",
    "TransportDtTm", "HospitalDtTm", "AvailableDtTm",
]

# No functional-dependency check here: (CallNumber, UnitID) combinations are
# taken to be unique in the dataset.
# NOTE(review): this is not verified in the notebook;
# get_functional_dependency_offending_keys could confirm it.
call_units_df = fdc_df.select(*(call_unit_key + call_unit_attributes))

In [22]:
# Drop the call-unit rows whose parent was removed from the units or calls
# table (their UnitID or CallNumber is an offending key), so the remaining
# rows only reference units and calls that were kept.
has_kept_parents = (
    ~f.col("UnitID").isin(units_offending_keys_list)
    & ~f.col("CallNumber").isin(call_offending_keys_list)
)

call_units_no_orphans_df = call_units_df.filter(has_kept_parents)

In [23]:
# Consistency rules for each call-unit row, written as Spark SQL boolean
# expressions. A row is kept only if every rule evaluates to TRUE; a rule that
# evaluates to NULL (e.g. a comparison with a null timestamp) also drops it.

# DispatchDtTm and AvailableDtTm must both be present, in that order.
dispatch_rules = [
    "DispatchDtTm IS NOT NULL AND AvailableDtTm IS NOT NULL",
    "DispatchDtTm <= AvailableDtTm",
]

# ResponseDtTm and OnSceneDtTm are either both null or both set; if set,
# DispatchDtTm <= ResponseDtTm <= OnSceneDtTm <= AvailableDtTm.
response_rules = [
    "(ResponseDtTm IS NOT NULL) = (OnSceneDtTm IS NOT NULL)",
    "IF(ResponseDtTm IS NOT NULL, (DispatchDtTm <= ResponseDtTm) AND (ResponseDtTm <= OnSceneDtTm) AND (OnSceneDtTm <= AvailableDtTm), TRUE)",
]

# TransportDtTm and HospitalDtTm are either both null or both set; if set,
# OnSceneDtTm <= TransportDtTm <= HospitalDtTm <= AvailableDtTm.
# NOTE(review): a transported row with a null OnSceneDtTm makes the comparison
# NULL, so such rows are dropped as well. Confirm this is intended.
transport_rules = [
    "(TransportDtTm IS NOT NULL) = (HospitalDtTm IS NOT NULL)",
    "IF(TransportDtTm IS NOT NULL, (OnSceneDtTm <= TransportDtTm) AND (TransportDtTm <= HospitalDtTm) AND (HospitalDtTm <= AvailableDtTm), TRUE)",
]

rules = dispatch_rules + response_rules + transport_rules

# keep only the rows of call_units_no_orphans_df that satisfy every rule
call_units_without_invalid_rows_df = filter_rows_that_violate_rules(
    call_units_no_orphans_df,
    rules,
)

In [24]:
# Write the validated call-unit rows as a parquet table for the report.
(
    call_units_without_invalid_rows_df
    .write
    .mode("overwrite")
    .parquet(f"{base_dir}/call_units")
)

In [25]:
# stop the spark session
# (this stops the underlying SparkContext, so `spark` cannot run any more
# jobs after this cell)
spark.stop()