In [1]:
# nb_nfip_silver (CSV-compatible)
# Purpose: Read FEMA NFIP CSV (strings), standardize schema, and write Delta Silver tables.
# Notes:
# - The CSV schema differs from Parquet; UUID types are avoided.
# - We coalesce date formats and numeric fields defensively.

from pyspark.sql.functions import col, trim, upper, to_date, year, coalesce, lit
from pyspark.sql.types import DoubleType

bronze_path = "Files/bronze/fema_nfip_claims/"
silver_tbl  = "fema_nfip_claims_silver"

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 3, Finished, Available, Finished)

In [2]:
# --- 1) Read CSV as strings (header present) ---
df = (spark.read.option("header", True).csv(bronze_path))

df.head(2)

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 4, Finished, Available, Finished)

[Row(agricultureStructureIndicator='0', asOfDate='2025-11-17T00:00:00.000Z', basementEnclosureCrawlspaceType=None, policyCount='1', crsClassificationCode=None, dateOfLoss='1985-03-27T00:00:00.000Z', elevatedBuildingIndicator='0', elevationCertificateIndicator=None, elevationDifference=None, baseFloodElevation=None, ratedFloodZone='A07', houseWorship='0', locationOfContents='4', lowestAdjacentGrade=None, lowestFloorElevation=None, numberOfFloorsInTheInsuredBuilding='3', nonProfitIndicator='0', obstructionType='10', occupancyType='1', originalConstructionDate='1492-10-12T00:00:00.000Z', originalNBDate='1982-08-02T00:00:00.000Z', amountPaidOnBuildingClaim=None, amountPaidOnContentsClaim=None, amountPaidOnIncreasedCostOfComplianceClaim=None, postFIRMConstructionIndicator='0', rateMethod='1', smallBusinessIndicatorBuilding='0', totalBuildingInsuranceCoverage='185000', totalContentsInsuranceCoverage='60000', yearOfLoss='1985', primaryResidenceIndicator='0', buildingDamageAmount=None, buildin

In [3]:
df.describe

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 5, Finished, Available, Finished)

<bound method DataFrame.describe of DataFrame[agricultureStructureIndicator: string, asOfDate: string, basementEnclosureCrawlspaceType: string, policyCount: string, crsClassificationCode: string, dateOfLoss: string, elevatedBuildingIndicator: string, elevationCertificateIndicator: string, elevationDifference: string, baseFloodElevation: string, ratedFloodZone: string, houseWorship: string, locationOfContents: string, lowestAdjacentGrade: string, lowestFloorElevation: string, numberOfFloorsInTheInsuredBuilding: string, nonProfitIndicator: string, obstructionType: string, occupancyType: string, originalConstructionDate: string, originalNBDate: string, amountPaidOnBuildingClaim: string, amountPaidOnContentsClaim: string, amountPaidOnIncreasedCostOfComplianceClaim: string, postFIRMConstructionIndicator: string, rateMethod: string, smallBusinessIndicatorBuilding: string, totalBuildingInsuranceCoverage: string, totalContentsInsuranceCoverage: string, yearOfLoss: string, primaryResidenceIndic

In [4]:
df.describe().show()

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 6, Finished, Available, Finished)

+-------+-----------------------------+--------------------+-------------------------------+------------------+---------------------+--------------------+-------------------------+-----------------------------+-------------------+------------------+--------------+--------------------+------------------+-------------------+--------------------+----------------------------------+--------------------+------------------+------------------+------------------------+--------------------+-------------------------+-------------------------+------------------------------------------+-----------------------------+------------------+------------------------------+------------------------------+------------------------------+------------------+-------------------------+--------------------+----------------------+------------------------+---------------------+------------------+---------------------------+--------------------+----------------------+------------------------+---------------------+----

In [5]:
print("Total rows in the FEMA NFIP Claims V2 dataset: ", df.count())
print("Number of columns: ", len(df.columns))

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 7, Finished, Available, Finished)

Total rows in the FEMA NFIP Claims V2 dataset:  2718200
Number of columns:  73


In [7]:
# Helper: safe cast to double
def to_double(c):
    return col(c).cast(DoubleType()) if c in df.columns else lit(None).cast(DoubleType())

# Helper: safe check if column exists in dataframe
def has(c):
    return c in df.columns

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 9, Finished, Available, Finished)

In [8]:
# --- Clean up ISO 8601 timestamps like 2017-08-29T00:00:00.000Z ---
from pyspark.sql.functions import regexp_replace

df = (df
    .withColumn("dateOfLoss", regexp_replace(col("dateOfLoss"), "T.*Z", ""))   # remove time part
    .withColumn("asOfDate", regexp_replace(col("asOfDate"), "T.*Z", ""))       # same for asOfDate
)

# --- 2) Standardize & derive fields ---
# Dates: try multiple formats (yyyy-MM-dd, M/d/yyyy, etc.); fall back gracefully
loss_date = coalesce(
    to_date(col("dateOfLoss"), "yyyy-MM-dd"),
    to_date(col("dateOfLoss"), "M/d/yyyy"),
    to_date(col("dateOfLoss"), "MM/dd/yyyy")
)

report_date = coalesce(
    to_date(col("asOfDate"), "yyyy-MM-dd"),
    to_date(col("asOfDate"), "M/d/yyyy"),
    to_date(col("asOfDate"), "MM/dd/yyyy")
)

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 10, Finished, Available, Finished)

In [9]:
# Payments: prefer "amountPaidOn*" then fall back to "net*PaymentAmount"
paid_building = coalesce(
    to_double("amountPaidOnBuildingClaim"),
    to_double("netBuildingPaymentAmount"),
    lit(0.0).cast("double")
)

paid_contents = coalesce(
    to_double("amountPaidOnContentsClaim"),
    to_double("netContentsPaymentAmount"),
    lit(0.0).cast("double")
)

paid_icc = coalesce(
    to_double("netIccPaymentAmount"),
    to_double("amountPaidOnIncreasedCostOfComplianceClaim"),
    lit(0.0).cast("double")
)

total_paid = (paid_building + paid_contents + paid_icc).cast("double")

loss_year = coalesce(col("yearOfLoss").cast("int"), year(loss_date))

# Geo / identifiers
state = upper(trim(col("state"))) if has("state") else lit(None)
zip_ = trim(col("reportedZipCode")) if has("reportedZipCode") else trim(col("zipCode")) if has("zipCode") else lit(None)
county_code = trim(col("countyCode")) if has("countyCode") else lit(None)
census_tract = trim(col("censusTract")) if has("censusTract") else lit(None)
latitude = col("latitude").cast("double") if has("latitude") else lit(None).cast("double")
longitude = col("longitude").cast("double") if has("longitude") else lit(None).cast("double")
claim_id = trim(col("id")) if has("id") else lit(None)

standard = (df
    .withColumn("loss_date", loss_date)
    .withColumn("report_date", report_date)
    .withColumn("loss_year", loss_year)
    .withColumn("state", state)
    .withColumn("zip", zip_)
    .withColumn("county_code", county_code)
    .withColumn("censusTract", census_tract)
    .withColumn("latitude", latitude)
    .withColumn("longitude", longitude)
    .withColumn("paid_building", paid_building)
    .withColumn("paid_contents", paid_contents)
    .withColumn("paid_icc", paid_icc)
    .withColumn("total_paid", total_paid)
    .withColumn("claim_id", claim_id)
)

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 11, Finished, Available, Finished)

In [10]:
# --- 3) Trim to clean column set ---
cols = [
  "claim_id",
  "loss_date","report_date","loss_year",
  "state","zip","county_code","censusTract",
  "latitude","longitude",
  "paid_building","paid_contents","paid_icc","total_paid",
  # keep some useful raw columns if present
  "ratedFloodZone","floodEvent","floodZoneCurrent","eventDesignationNumber","occupancyType"
]
clean = standard.select([c for c in cols if c in standard.columns])

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 12, Finished, Available, Finished)

In [11]:
# --- 4) Write Delta Silver table (partitioned) ---
spark.sql(f"DROP TABLE IF EXISTS {silver_tbl}")
(clean
 .repartition("loss_year","state")
 .write
 .format("delta")
 .mode("overwrite")
 .partitionBy("loss_year","state")
 .saveAsTable(silver_tbl))

print("Skipping OPTIMIZE since table is partitioned.")

print("Silver table created:", silver_tbl)
print("Row count:", spark.table(silver_tbl).count())


StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 13, Finished, Available, Finished)

Skipping OPTIMIZE since table is partitioned.
Silver table created: fema_nfip_claims_silver
Row count: 2718200


In [12]:
print("Skipping OPTIMIZE since table is partitioned.")

print("Silver table created:", silver_tbl)
print("Row count:", spark.table(silver_tbl).count())

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 14, Finished, Available, Finished)

Skipping OPTIMIZE since table is partitioned.
Silver table created: fema_nfip_claims_silver
Row count: 2718200


In [13]:
# Read the table
df_silver = spark.table(silver_tbl)

# Display first 10 records
df_silver.show(10, truncate=False)

StatementMeta(, c0295cb6-5a21-48b2-abd8-e41602fd0376, 15, Finished, Available, Finished)

+------------------------------------+----------+-----------+---------+-----+-----+-----------+-----------+--------+---------+-------------+-------------+--------+----------+--------------+----------+----------------+----------------------+-------------+
|claim_id                            |loss_date |report_date|loss_year|state|zip  |county_code|censusTract|latitude|longitude|paid_building|paid_contents|paid_icc|total_paid|ratedFloodZone|floodEvent|floodZoneCurrent|eventDesignationNumber|occupancyType|
+------------------------------------+----------+-----------+---------+-----+-----+-----------+-----------+--------+---------+-------------+-------------+--------+----------+--------------+----------+----------------+----------------------+-------------+
|8abe3d49-1dbb-4695-9b8d-8ff80dc40682|1978-10-27|2025-11-17 |1978     |VI   |00804|78030      |NULL       |18.3    |-64.9    |1354.85      |0.0          |0.0     |1354.85   |NULL          |NULL      |NULL            |NULL              