# Fire Incident Pipeline

In [1]:
from IPython.display import display, HTML
# Inject CSS so <pre> output is not wrapped; wide `df.show()` tables stay aligned.
no_wrap_css = "<style>pre{white-space: pre !important;}</style>"
display(HTML(no_wrap_css))

In [2]:
import os
import socket

import findspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DateType

In [3]:
# Initialise findspark before the SparkSession is created in the next cell.
# NOTE(review): pyspark is already imported in the import cell above, so
# confirm whether this call is still needed in this environment.
findspark.init()

In [4]:
# Create (or reuse) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName("Fire Incident")
    .master("local[1]")
    # Driver networking
    .config("spark.driver.host", "host.docker.internal")
    .config("spark.driver.bindAddress", "0.0.0.0")
    # Fixed, minimal resources: one executor, one core, 512 MB each
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "120s")
    .config("spark.executor.instances", "1")
    .config("spark.cores.max", "1")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.driver.memory", "512m")
    # PostgreSQL JDBC driver jar used by the final load step
    .config("spark.executor.extraClassPath", "../jars/postgresql-42.7.3.jar")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/23 03:46:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Explicit schema for the raw CSV, listed as (source column name, Spark type).
# Names are kept exactly as spelled in the source header, including its typos
# ("Sytem", "Perfomance"); those are renamed in a later cell.
_raw_columns = [
    ("Incident Number", IntegerType),
    ("Exposure Number", IntegerType),
    ("ID", StringType),
    ("Address", StringType),
    ("Incident Date", DateType),
    ("Call Number", IntegerType),
    ("Alarm DtTm", TimestampType),
    ("Arrival DtTm", TimestampType),
    ("Close DtTm", TimestampType),
    ("City", StringType),
    ("zipcode", StringType),
    ("Battalion", StringType),
    ("Station Area", StringType),
    ("Box", StringType),
    ("Suppression Units", IntegerType),
    ("Suppression Personnel", IntegerType),
    ("EMS Units", IntegerType),
    ("EMS Personnel", IntegerType),
    ("Other Units", IntegerType),
    ("Other Personnel", IntegerType),
    ("First Unit On Scene", StringType),
    ("Estimated Property Loss", IntegerType),
    ("Estimated Contents Loss", IntegerType),
    ("Fire Fatalities", IntegerType),
    ("Fire Injuries", IntegerType),
    ("Civilian Fatalities", IntegerType),
    ("Civilian Injuries", IntegerType),
    ("Number of Alarms", IntegerType),
    ("Primary Situation", StringType),
    ("Mutual Aid", StringType),
    ("Action Taken Primary", StringType),
    ("Action Taken Secondary", StringType),
    ("Action Taken Other", StringType),
    ("Detector Alerted Occupants", StringType),
    ("Property Use", StringType),
    ("Area of Fire Origin", StringType),
    ("Ignition Cause", StringType),
    ("Ignition Factor Primary", StringType),
    ("Ignition Factor Secondary", StringType),
    ("Heat Source", StringType),
    ("Item First Ignited", StringType),
    ("Human Factors Associated with Ignition", StringType),
    ("Structure Type", StringType),
    ("Structure Status", StringType),
    ("Floor of Fire Origin", IntegerType),
    ("Fire Spread", StringType),
    ("No Flame Spread", StringType),
    ("Number of floors with minimum damage", IntegerType),
    ("Number of floors with significant damage", IntegerType),
    ("Number of floors with heavy damage", IntegerType),
    ("Number of floors with extreme damage", IntegerType),
    ("Detectors Present", StringType),
    ("Detector Type", StringType),
    ("Detector Operation", StringType),
    ("Detector Effectiveness", StringType),
    ("Detector Failure Reason", StringType),
    ("Automatic Extinguishing System Present", StringType),
    ("Automatic Extinguishing Sytem Type", StringType),
    ("Automatic Extinguishing Sytem Perfomance", StringType),
    ("Automatic Extinguishing Sytem Failure Reason", StringType),
    ("Number of Sprinkler Heads Operating", IntegerType),
    ("Supervisor District", IntegerType),
    ("neighborhood_district", StringType),
    ("point", StringType),
    ("data_as_of", TimestampType),
    ("data_loaded_at", TimestampType),
]

# Every field is declared nullable.
schema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in _raw_columns
])

In [6]:
# Read the raw incident CSV with the explicit schema. The date and timestamp
# patterns tell the reader how the date and 12-hour AM/PM columns are written.
df = (
    spark.read
    .format('csv')
    .schema(schema)
    .option('header', 'True')
    .option('dateFormat', 'yyyy/MM/dd')
    .option('timestampFormat', 'yyyy/MM/dd hh:mm:ss a')
    .load('../data/raw/Fire_Incidents_20240516.csv')
)

# Preview the loaded rows without truncating long cell values.
df.show(truncate=False)

24/07/23 03:46:50 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------------+---------------+---------+--------------------------+-------------+-----------+-------------------+-------------------+-------------------+-------------+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+----------------------------------------------------+----------+-------------------------------------------+----------------------+------------------+----------------------------+----------------------------------------------------+-------------------+--------------+-----------------------+-------------------------+-----------+------------------+--------------------------------------+--------------+----------------+--------------------+-----------+---------------+------------------------------------+----------------------------------------+---

## Rename Columns

In [7]:
# Normalise every column name: lowercase, with spaces replaced by underscores.
snake_case_columns = [
    F.col(name).alias(name.lower().replace(' ', '_'))
    for name in df.columns
]
df = df.select(snake_case_columns)

In [8]:
# Correct the column names that are misspelled in the source header.
column_typo_fixes = {
    "automatic_extinguishing_sytem_type": "automatic_extinguishing_system_type",
    "automatic_extinguishing_sytem_failure_reason": "automatic_extinguishing_system_failure_reason",
    "automatic_extinguishing_sytem_perfomance": "automatic_extinguishing_system_performance",
}
for misspelled, corrected in column_typo_fixes.items():
    df = df.withColumnRenamed(misspelled, corrected)

In [9]:
# Preview the frame to check the renamed columns.
df.show()

+---------------+---------------+---------+--------------------+-------------+-----------+-------------------+-------------------+-------------------+-------------+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+--------------------+----------+--------------------+----------------------+------------------+--------------------------+--------------------+-------------------+--------------+-----------------------+-------------------------+-----------+------------------+--------------------------------------+--------------+----------------+--------------------+-----------+---------------+------------------------------------+----------------------------------------+----------------------------------+------------------------------------+-----------------+--------

## Inconsistent Labels

In [10]:
# Remove a trailing "-<suffix>" part from the zipcode: a hyphen followed by
# word characters that end in at least three digits.
# The pattern is a raw string so `\-` and `\w` reach the regex engine as written
# instead of being treated as (invalid) Python string escapes.
df = df.withColumn('zipcode', F.regexp_replace('zipcode', r'\-\w+[0-9]{3,}', ''))

In [11]:
# Remove '*' and '-' characters from primary_situation, then trim the
# surrounding whitespace left behind.
situation_without_symbols = F.regexp_replace('primary_situation', '[*-]', '')
df = df.withColumn('primary_situation', F.trim(situation_without_symbols))

In [12]:
# Map mutual_aid labels onto their code-prefixed form (exact-match replacement).
mutual_aid_mapping = {
    'Mutual aid received': '1 Mutual aid received',
    'Automatic or contract aid received': '2 Automatic aid received',
    'Mutual aid given': '3 Mutual aid given',
    '3 Mutual aid given': '3 Mutual aid given',
    'Automatic aid given': '4 Automatic aid given',
    'Other aid given': '5 Other aid given',
    'None': 'N None',
}
df = df.replace(mutual_aid_mapping, subset=['mutual_aid'])

In [13]:
# Remove every hyphen from ignition_factor_primary.
target_column = 'ignition_factor_primary'
df = df.withColumn(target_column, F.regexp_replace(target_column, '-', ''))

In [14]:
# Expand truncated ignition_factor_secondary labels to a single full label each.
# The original mapping also contained the reverse entry
# '18 Improper container or storage procedure' -> '18 Improper container or storage',
# which only swapped the two variants instead of unifying them. That entry
# has been removed so both variants end up as the "... procedure" label.
ignition_factor_secondary_mapping = {
    '12 Heat source too close to combustibles.': '12 Heat source too close to combustibles.',
    '30 Electrical failure, malfunction, othe': '30 Electrical failure, malfunction, other',
    '11 Abandoned or discarded materials or p': '11 Abandoned or discarded materials or products',
    '32 Short circuit arc from mechanical dam': '32 Short-circuit arc from mechanical damage',
    '20 Mechanical failure, malfunction, othe': '20 Mechanical failure, malfunction, other',
    '18 Improper container or storage': '18 Improper container or storage procedure',
    '00 Factors contributing to ignition, other': '00 Other factor contributed to ignition',
    '33 Short cir. arc, defect/worn insulatio': '33 Short-circuit arc from defective, worn insulation',
    '52 Accidentally turned on, not turned of': '52 Accidentally turned on, not turned off',
    '57 Equipment used for not intended purpo': '57 Equipment not used for purpose intended',
    '13 Cuttin/welding too close to combustib': '13 Cutting/welding too close to combustibles',
    '73 Outside/open fire, debris/waste dispo': '73 Outside/open fire for debris or waste disposal',
    '74 Outside/open fire for warming or cook': '74 Outside/open fire for warming or cooking',
}
# Hyphens are stripped first, so the mapping keys above are written without hyphens.
df = df.withColumn('ignition_factor_secondary', F.regexp_replace('ignition_factor_secondary', '-', ''))
df = df.replace(ignition_factor_secondary_mapping, subset=['ignition_factor_secondary'])


In [15]:
# Exact-match relabelling for heat_source, applied after hyphens are removed.
heat_source_mapping = dict([
    ('11 Spark/ember/flame from operating equi', '11 Spark, ember, or flame from operating equipment'),
    ('12 Radiated/conducted heat operating equ', '12 Radiated or conducted heat from operating equipment'),
    ('13 Arcing', '13 Electrical arcing'),
    ('60 Heat; other open flame/smoking materi', '60 Heat from other open flame or smoking materials, other'),
    ('63 Heat from undetermined smoking materi', '63 Heat from undetermined smoking material'),
    ('65 Cigarette lighter', '65 Lighter: cigarette, cigar'),
    ('67 Warning or road flare; fusee', '67 Warning or road flare; fuse'),
    ('68 Backfire from internal combustion eng', '68 Backfire from internal combustion engine'),
    ('72 Chemical reaction', '72 Spontaneous combustion, chemical reaction'),
    ('97 Multiple heat sources including multi', '97 Multiple heat sources including multiple ignitions'),
])
heat_source_no_hyphen = F.regexp_replace('heat_source', '-', '')
df = (
    df.withColumn('heat_source', heat_source_no_hyphen)
      .replace(heat_source_mapping, subset=['heat_source'])
)

In [16]:
# Exact-match relabelling for item_first_ignited, applied after hyphens are removed.
# NOTE(review): two replacement labels (codes 62 and 63) contain a hyphen, while
# hyphens are stripped from the column just before the replace; confirm these
# targets are the intended final labels.
item_first_ignited_mapping = dict([
    ('96 Rubbish, trash, waste', '96 Rubbish, trash, or waste'),
    ('62 Flammable liquid/gas in/from engine or burner', '62 Flam. liq/gas-in/from engine or burne'),
    ('21 Upholstered sofa, chair, vehicle seats', '21 Upholstered sofa, chair, vehicle seat'),
    ('59 Rolled, wound material (paper and fabrics)', '59 Rolled, wound material (paper, fabric'),
    ('73 Heavy vegetation not crop, including trees', '73 Heavy vegetation no crops, inc. tre'),
    ('76 Cooking materials, including edible materials', '76 Cooking materials, inc. Edible materi'),
    ('00 Item first ignited, other', '00 Item First Ignited, Other'),
    ('14 Floor covering or rug/carpet/mat, surface', '14 Floor covering or rug/carpet/mat'),
    ('36 Curtain, blind, drapery, tapestry', '36 Curtains, blinds, drapery, tapestry'),
    ('11 Exterior roof covering, surface, finish', '11 Exterior roof covering or finish'),
    ('64 Flammable liquid/gas in container or pipe', '64 Flam liq/gas in container or pipe'),
    ('72 Light vegetation not crop, including grass', '72 Light vegetation no crops, inc. gra'),
    ('37 Goods not made up, including fabrics and yard goods', '37 Raw Goods, incl. fabrics and yarn'),
    ('66 Pipe, duct, conduit, hose', '66 Pipe, duct, conduit or hose'),
    ('61 Atomized liquid, vaporized liquid, aerosol.', '61 Atomized liq., vaporized liq.,aersol'),
    ('95 Film, residue, including paint & resi', '95 Film, residue, including paint and resin'),
    ('63 Flammable liquid/gas in/from final container', '63 Flam Liq/gas-in/from final container'),
    ('94 Dust, fiber, lint, including sawdust and excelsior', '94 Dust/fiber/lint. inc. sawdust, excels'),
    ('15 Interior wall covering excluding drapes, etc.', '15 Int. Wall cover  exclude drapes, etc.'),
    ('47 Tarpaulin, tent', '47 Tarpaulin or tent'),
    ('71 Agricultural crop, including fruits and vegetables', '71 Crop, incl. fruits and vegitables'),
    ('82 Transformer, including transformer fluids', '82 Transformer, including transformer fl'),
    ('18 Thermal, acoustical insulation within wall, partition or floor/ceiling space', '18 Insulation within structural area'),
    ('40 Adornment, recreational material, signs, other', '40 Adornment, recreational mat., signs,'),
    ('43 Sign, including outdoor signs such as billboards', '43 Sign, inc. outdoor sign/billboards'),
    ('74 Animal living or dead', '74 Animal, living or dead'),
    ('77 Feathers or fur, not on bird or anima', '77 Feathers or fur, not on bird or animal'),
    ('58 Palletized material, material stored on pallets.', '58 Palletized material'),
    ('54 Cord, rope, twine', '54 Cord, rope, twine, yarn'),
])

item_no_hyphen = F.regexp_replace('item_first_ignited', '-', '')
df = (
    df.withColumn('item_first_ignited', item_no_hyphen)
      .replace(item_first_ignited_mapping, subset=['item_first_ignited'])
)

In [17]:
# Remove the stray characters matched by the class below from the column.
# NOTE(review): the pattern looks like a mis-decoded section sign; confirm
# against the raw file's encoding.
target_column = 'human_factors_associated_with_ignition'
df = df.withColumn(target_column, F.regexp_replace(target_column, '[Â§]', ''))

In [18]:
# Unify structure_type labels.
# All hyphens are stripped from the column before the exact-match replace, so
# the keys must be written in their hyphen-free form. The original key
# '4 Air-supported structure' could never match after the strip.
structure_type_mapping = {
    '4 Airsupported structure': '4 Air supported structure',
    '7 Underground structure work area': '7 Underground structure work areas',
}
df = df.withColumn('structure_type', F.regexp_replace('structure_type', '-', ''))
df = df.replace(structure_type_mapping, subset=['structure_type'])

In [19]:
# Unify structure_status labels.
# The original cell applied both the hyphen strip and this mapping to
# 'structure_type' (a copy-paste slip), leaving structure_status untouched.
structure_status_mapping = {'0 Building status, other': '0 Other'}

df = df.withColumn('structure_status', F.regexp_replace('structure_status', '-', ''))
df = df.replace(structure_status_mapping, subset=['structure_status'])

In [20]:
# Exact-match relabelling for fire_spread, applied after hyphens are removed.
# NOTE(review): the labels here look like item-first-ignited values and some
# directions are the reverse of item_first_ignited_mapping; confirm intent.
mapping = dict([
    ('00 Item first ignited, other', '00 Item First Ignited, Other'),
    ('11 Exterior roof covering, surface, finish', '11 Exterior roof covering or finish'),
    ('66 Pipe, duct, conduit, hose', '66 Pipe, duct, conduit or hose'),
    ('15 Interior wall covering excluding drapes, etc.', '15 Int. Wall cover  exclude drapes, etc.'),
    ('76 Cooking materials, including edible materials', '76 Cooking materials, inc. Edible materia'),
    ('94 Dust/fiber/lint. inc. sawdust, excelsi', '94 Dust/fiber/lint. inc. sawdust, excelsi'),
    ('96 Rubbish, trash, or waste', '96 Rubbish, trash, waste'),
    ('61 Atomized liq., vaporized liq.,aersol', '61 Atomized liquid, vaporized liquid, aerosol.'),
])
# Both names refer to the same dict, as in the original chained assignment.
fire_spread_mapping = mapping

fire_spread_no_hyphen = F.regexp_replace('fire_spread', '-', '')
df = (
    df.withColumn('fire_spread', fire_spread_no_hyphen)
      .replace(fire_spread_mapping, subset=['fire_spread'])
)

In [21]:
# Collapse the textual yes/no variants of no_flame_spread to '1' / '0'.
no_flame_spread_mapping = {
    **dict.fromkeys(['NO', 'N', 'False'], '0'),
    **dict.fromkeys(['YES', 'Y', 'True'], '1'),
}
df = df.replace(no_flame_spread_mapping, subset=['no_flame_spread'])

In [22]:
# Strip hyphens from detectors_present, then unify the "none present" label.
detectors_present_mapping = {'N None present': 'N Not present'}
detectors_present_no_hyphen = F.regexp_replace('detectors_present', '-', '')
df = (
    df.withColumn('detectors_present', detectors_present_no_hyphen)
      .replace(detectors_present_mapping, subset=['detectors_present'])
)

In [23]:
# Strip hyphens from detector_type, then unify the combination-detector label.
detector_type_mapping = {
    '3 Combination smoke and heat in a single unit': '3 Combination smoke & heat in single unit',
}
detector_type_no_hyphen = F.regexp_replace('detector_type', '-', '')
df = (
    df.withColumn('detector_type', detector_type_no_hyphen)
      .replace(detector_type_mapping, subset=['detector_type'])
)

In [24]:
# Remove every hyphen from detector_operation.
target_column = 'detector_operation'
df = df.withColumn(target_column, F.regexp_replace(target_column, '-', ''))

In [25]:
# Unify detector_effectiveness labels.
# Hyphens are stripped from the column before the exact-match replace, so the
# first key is written in its hyphen-free form. The original key
# '2 Alerted occupants-occ. failed to resond' could never match after the strip.
detector_effectiveness_mapping = {
    '2 Alerted occupantsocc. failed to resond': '2 Detector alerted occupants, occupants failed to respond',
    '4 Failed to alert occupants': '4 Detector failed to alert occupants',
}
df = df.withColumn('detector_effectiveness', F.regexp_replace('detector_effectiveness', '-', ''))
df = df.replace(detector_effectiveness_mapping, subset=['detector_effectiveness'])

In [26]:
# Map the "<code> -<label>" variants of detector_failure_reason onto clean labels.
# No hyphen strip is applied to this column; the keys carry the hyphen themselves.
detector_failure_reason_mapping = dict([
    ('0 -Detector failure reason, other', '0 Detector failure reason, other'),
    ('1 -Power fail/shutoff or disconnected dete', '1 Power failure, hardwired det. shut off, disconnect'),
    ('2 -Improper installation or placement', '2 Improper installation or placement of detector'),
    ('3 -Defective', '3 Defective'),
    ('4 -Lack of maintenance, inc. not cleaning', '4 Lack of maintenance, includes not cleaning'),
    ('5 -Battery missing or disconnected', '5 Battery missing or disconnected'),
    ('6 -Battery discharged or dead', '6 Battery discharged or dead'),
])
df = df.replace(detector_failure_reason_mapping, subset=['detector_failure_reason'])

In [27]:
# Remove every hyphen from automatic_extinguishing_system_present.
target_column = 'automatic_extinguishing_system_present'
df = df.withColumn(target_column, F.regexp_replace(target_column, '-', ''))

In [28]:
# Unify automatic_extinguishing_system_type labels.
# Hyphens are stripped from the column before the exact-match replace, so the
# keys are written in their hyphen-free form. The original hyphenated keys
# ('1 Wet-pipe sprinkler', '6 Halogen-type system') could never match.
# The Halogen target also keeps its '6 ' code prefix, like every other label.
aes_type_mapping = {
    '1 Wetpipe sprinkler': '1 Wet-pipe sprinkler system',
    # NOTE(review): assumes the full label may already occur in the data; after
    # the hyphen strip it would otherwise remain a second variant.
    '1 Wetpipe sprinkler system': '1 Wet-pipe sprinkler system',
    '6 Halogentype system': '6 Halogen type system',
}
df = df.withColumn('automatic_extinguishing_system_type', F.regexp_replace('automatic_extinguishing_system_type', '-', ''))
df = df.replace(aes_type_mapping, subset=['automatic_extinguishing_system_type'])

In [29]:
# Remove every hyphen from automatic_extinguishing_system_performance.
target_column = 'automatic_extinguishing_system_performance'
df = df.withColumn(target_column, F.regexp_replace(target_column, '-', ''))

In [30]:
# Strip hyphens from the failure-reason column, then add the numeric code
# prefix to labels that lack it (and complete one truncated label).
aes_failure_reason_mapping = dict([
    ('Reason system not effective, other', '0 Reason system not effective, other'),
    ('System shut off', '1 System shut off'),
    ('Not enough agent discharged to control the fire', '2 Not enough agent discharged to control the fire'),
    ('Agent discharged, but did not reach the fire', '3 Agent discharged, did not reach the fire'),
    ('3 Agent discharged, did not reach the fir', '3 Agent discharged, did not reach the fire'),
    ('Inappropriate system for the type of fire', '4 Inappropriate system for the type of fire'),
    ('Fire not in area protected by the system', '5 Fire not in area protected by the system'),
])
failure_reason_no_hyphen = F.regexp_replace('automatic_extinguishing_system_failure_reason', '-', '')
df = (
    df.withColumn('automatic_extinguishing_system_failure_reason', failure_reason_no_hyphen)
      .replace(aes_failure_reason_mapping, subset=['automatic_extinguishing_system_failure_reason'])
)

## Duplicate Data

In [38]:
# Remove rows that are exact duplicates across all columns.
df = df.dropDuplicates()

In [39]:
# Drop the id column; the pipeline treats it as redundant with incident_number.
# NOTE(review): id is read as a string and incident_number as an integer, so
# confirm the two really carry the same information before relying on this.
df = df.drop('id') 

## Feature Engineering

In [32]:
# Split the textual `point` column into numeric latitude / longitude columns.
#
# The column holds two decimal numbers inside parentheses. This version treats
# the value as WKT "POINT (x y)", i.e. longitude first and latitude second.
# The original cell assigned the first number to latitude, which swaps the two
# coordinates under that ordering.
# NOTE(review): confirm the ordering against a few raw `point` values.
#
# Raw-string patterns avoid invalid Python escapes such as `\(` and `\d`, and
# the capture group returns just the number, so no bracket clean-up is needed.
first_number = F.regexp_extract('point', r'\((-?\d+\.\d+)', 1)
second_number = F.regexp_extract('point', r'(-?\d+\.\d+)\)', 1)

# Cast to double rather than float: 32-bit floats cannot hold a coordinate
# with six or more decimal places without rounding. Non-matching values
# (empty string) become null on cast.
df = df.withColumn('latitude', second_number.cast('double'))
df = df.withColumn('longitude', first_number.cast('double'))

# The raw point text is no longer needed once both coordinates are extracted.
df = df.drop('point')

In [34]:
# Derive two durations in minutes, rounded to two decimals:
#   travel_time      = arrival_dttm - alarm_dttm
#   suppression_time = close_dttm   - arrival_dttm
# unix_timestamp gives seconds, hence the division by 60.
alarm_seconds = F.unix_timestamp(F.col('alarm_dttm'))
arrival_seconds = F.unix_timestamp(F.col('arrival_dttm'))
close_seconds = F.unix_timestamp(F.col('close_dttm'))

df = df.withColumn('travel_time', F.round((arrival_seconds - alarm_seconds) / 60, 2))

# The original wrote the rounded value to a misspelled 'supression_time'
# column, leaving 'suppression_time' unrounded and adding a stray duplicate
# column to the output table. The rounded value now goes to the intended column.
df = df.withColumn('suppression_time', F.round((close_seconds - arrival_seconds) / 60, 2))

In [40]:
# Preview the transformed frame before it is written to the database.
df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+---------------+---------------+--------------------+-------------+-----------+-------------------+-------------------+-------------------+----+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+--------------------+----------+--------------------+----------------------+------------------+--------------------------+--------------------+--------------------+--------------------+-----------------------+-------------------------+----------------+--------------------+--------------------------------------+--------------+----------------+--------------------+-----------+---------------+------------------------------------+----------------------------------------+----------------------------------+------------------------------------+-----------------+-------------

                                                                                

## Load Data into PostgreSQL

In [41]:
# Write the cleaned frame to the `fire_incident` table over JDBC, replacing any
# existing table contents (mode 'overwrite').

# Resolve the database host name to an IP address for the JDBC URL.
postgres_ip = socket.gethostbyname('postgres-db')
mode = 'overwrite'

# Credentials are read from the environment so real secrets need not live in
# the notebook. The previous literal values remain as defaults so existing
# setups keep working unchanged.
properties = {
    'user': os.environ.get('POSTGRES_USER', 'user'),
    'password': os.environ.get('POSTGRES_PASSWORD', 'password'),
    'driver': 'org.postgresql.Driver',
}

url = f'jdbc:postgresql://{postgres_ip}:5432/fire-incident-db'
df.write.jdbc(url=url, table='fire_incident', mode=mode, properties=properties)

                                                                                