In [0]:
# Set our working environment
# Catalog and schema names
catalog_name = "smart_claims_dev"

# Every layer this notebook touches. All of them are created up front so the
# notebook survives "Restart & Run All" on a fresh workspace: the cells below
# write to 01_bronze and 00_landing, which were never created when only the
# single active schema was set up.
schema_names = ["00_landing", "01_bronze", "02_silver", "03_gold"]

# Default working schema, kept for any cell that reads `schema_name`.
schema_name = "03_gold"

# Create the catalog if it does not exist
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")

# Create every schema in the catalog (idempotent thanks to IF NOT EXISTS)
for layer_schema in schema_names:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{layer_schema}")

In [0]:
import os
import pandas as pd


# Folder (relative path) that holds the SQL Server CSV exports
data_dir = "data/sql_server/"

# Bronze table name -> CSV file that feeds it
csv_files = dict(
    claims="claims.csv",
    customers="customers.csv",
    policies="policies.csv",
)

# Hybrid approach: pandas does the file read, Spark does the table write.
# (A pure-Spark read would need absolute paths and more file handling.)

for table_name, file_name in csv_files.items():
    file_path = os.path.join(data_dir, file_name)

    # Read with pandas, then hand the frame to Spark
    pandas_df = pd.read_csv(file_path)
    spark_df = spark.createDataFrame(pandas_df)

    # Replace the bronze table with the freshly loaded data
    target_table = f"{catalog_name}.01_bronze.{table_name}"
    spark_df.write.mode("overwrite").saveAsTable(target_table)

print("Tables created successfully")


In [0]:
# Define your source and destination
source_path = "data/training_imgs/"
landing_catalog = "smart_claims_dev"
landing_schema = "00_landing"
landing_volume = "training_imgs"
destination_path = f"/Volumes/{landing_catalog}/{landing_schema}/{landing_volume}"

# The destination is the root of a Unity Catalog volume. A volume is a catalog
# object, so it has to be created with SQL; a filesystem mkdirs can only create
# directories inside a volume that already exists.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {landing_catalog}.{landing_schema}")
spark.sql(
    f"CREATE VOLUME IF NOT EXISTS {landing_catalog}.{landing_schema}.{landing_volume}"
)

# Make sure the target directory exists (no-op for an existing volume root)
dbutils.fs.mkdirs(destination_path)

# Copy the files (cp leaves the source in place; this is not a move)
# NOTE(review): source_path has no scheme, so dbutils.fs resolves it against
# its default filesystem — confirm that is where the images live; workspace or
# driver-local files may need a `file:` prefixed absolute path.
dbutils.fs.cp(source_path, destination_path, recurse=True)

# Verify the files are there
display(dbutils.fs.ls(destination_path))

In [0]:
# metadata_ingest.py
import dlt

@dlt.table(
    name="claim_images_meta",
    comment="Raw accident claim images metadata ingested from S3",
    table_properties={"quality": "bronze"},
)
def raw_images():
    """Stream the claim-image metadata CSV files into the bronze table.

    Returns:
        A streaming DataFrame produced by the ``cloudFiles`` source reading
        CSV from the ``claims/metadata`` directory of the landing volume,
        with ``cloudFiles.schemaEvolutionMode`` set to ``rescue``.
    """
    metadata_dir = "/Volumes/smart_claims_dev/00_landing/claims/metadata"
    csv_stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
    )
    return csv_stream.load(metadata_dir)

In [0]:
# ingest_training_pcts
from pyspark.sql.functions import regexp_extract
import dlt


@dlt.table(
    name="smart_claims_dev.01_bronze.training_images",
    comment="Raw accident training image ingested from S3",
    table_properties={"quality": "bronze"},
)
def raw_images():
    """Stream the training image files into the bronze table.

    Returns:
        A streaming DataFrame produced by the ``cloudFiles`` source reading
        the ``training_imgs`` landing volume in ``BINARYFILE`` format.
    """
    images_dir = "/Volumes/smart_claims_dev/00_landing/training_imgs"
    binary_stream = spark.readStream.format("cloudFiles").option(
        "cloudFiles.format", "BINARYFILE"
    )
    return binary_stream.load(images_dir)

In [0]:
import dlt

from pyspark.sql.functions import (
    col, to_date, date_format, trim, initcap,
    split, size, when, concat, lit, abs, to_timestamp, regexp_extract
)

# Catalog and schema names used to build the fully qualified table names of
# the silver tables defined below.
catalog = "smart_claims_dev"
# Schema the silver tables read their input streams from.
bronze_schema = "01_bronze"
# Schema the silver tables are published to.
silver_schema = "02_silver"

# --- CLEAN TELEMATICS ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.telematics",
    comment="cleaned telematics events",
    table_properties={"quality": "silver"},
)
@dlt.expect(
    "valid_coordinates",
    "latitude BETWEEN -90 AND 90 AND longitude BETWEEN -180 AND 180",
)
def telematics():
    """Build the silver telematics table from the bronze telematics stream.

    Returns:
        The bronze stream with ``event_timestamp`` parsed as a timestamp using
        the ``yyyy-MM-dd HH:mm:ss`` pattern and ``_rescued_data`` dropped.
    """
    bronze_events = dlt.readStream(f"{catalog}.{bronze_schema}.telematics")
    parsed_timestamp = to_timestamp(col("event_timestamp"), "yyyy-MM-dd HH:mm:ss")
    with_timestamp = bronze_events.withColumn("event_timestamp", parsed_timestamp)
    return with_timestamp.drop("_rescued_data")

# --- CLEAN POLICY ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.policy",
    comment="Cleaned policies",
    table_properties={
        "quality": "silver"
    }
)
@dlt.expect("valid_policy_no", "policy_no IS NOT NULL")
def policy():
    """Build the silver policy table from the bronze policies stream.

    Only the two key columns are published, renamed to lower case:
    ``POLICY_NO`` -> ``policy_no`` and ``CUST_ID`` -> ``cust_id``.

    The previous version first computed ``abs(premium)`` and finally dropped
    ``_rescued_data``. Both steps had no effect on the result because the
    ``select`` keeps only the two key columns, so they were removed; the
    published schema is unchanged.

    Returns:
        A streaming DataFrame with the columns ``policy_no`` and ``cust_id``.
    """
    # NOTE(review): if the premium is meant to be part of the silver table, add
    # `abs(col("premium")).alias("premium")` to the select below — confirm intent.
    return (
        dlt.readStream(f"{catalog}.{bronze_schema}.policies")
        .select(
            col("POLICY_NO").alias("policy_no"),
            col("CUST_ID").alias("cust_id"),
        )
    )

# --- CLEAN CLAIM ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.claim",
    comment="Cleaned claims",
    table_properties={"quality": "silver"},
)
@dlt.expect_all_or_drop({
    "valid_claim_number": "claim_no IS NOT NULL",
    # Currently disabled:
    # "valid_incident_hour": "incident_hour BETWEEN 0 AND 23"
})
def claim():
    """Build the silver claim table from the bronze claims stream.

    Returns:
        The bronze stream with ``claim_date`` converted to a date (default
        parsing), ``license_issue_date`` converted to a date using the
        ``dd-MM-yyyy`` pattern, and ``_rescued_data`` dropped.
    """
    bronze_claims = dlt.readStream(f"{catalog}.{bronze_schema}.claims")

    claim_date = to_date(col("claim_date"))
    license_issue_date = to_date(col("license_issue_date"), "dd-MM-yyyy")
    # Currently disabled:
    # incident_date = to_date(col("incident_date"), "yyyy-MM-dd")

    return (
        bronze_claims
        .withColumn("claim_date", claim_date)
        .withColumn("license_issue_date", license_issue_date)
        .drop("_rescued_data")
    )

# # --- CLEAN CUSTOMER ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.customer",
    comment="Cleaned customers (split names, minimal checks, drop name)",
    table_properties={"quality": "silver"},
)
@dlt.expect_all({"valid_customer_id": "customer_id IS NOT NULL"})
def customer():
    """Build the silver customer table from the bronze customers stream.

    The ``name`` column is normalised before being split: when it consists of
    exactly two comma-separated parts, the parts are swapped and joined with a
    single space; otherwise the trimmed name is used as is. Each part is
    passed through ``initcap``.

    Returns:
        The bronze stream with ``date_of_birth`` parsed using ``dd-MM-yyyy``,
        ``firstname`` / ``lastname`` taken from the first / second
        space-separated token of the normalised name, ``address`` built as
        ``BOROUGH, ZIP_CODE``, and ``name`` / ``_rescued_data`` dropped.
    """
    bronze_customers = dlt.readStream(f"{catalog}.{bronze_schema}.customers")

    raw_name = col("name")
    comma_parts = split(raw_name, ",")
    has_two_comma_parts = size(split(trim(raw_name), ",")) == 2

    # Second comma part first, then the first comma part
    swapped_name = concat(
        initcap(trim(comma_parts.getItem(1))),
        lit(" "),
        initcap(trim(comma_parts.getItem(0))),
    )
    name_normalized = when(has_two_comma_parts, swapped_name).otherwise(
        initcap(trim(raw_name))
    )
    name_tokens = split(name_normalized, " ")

    birth_date = to_date(col("date_of_birth"), "dd-MM-yyyy")
    full_address = concat(col("BOROUGH"), lit(", "), col("ZIP_CODE"))

    enriched = (
        bronze_customers
        .withColumn("date_of_birth", birth_date)
        .withColumn("firstname", name_tokens.getItem(0))
        .withColumn("lastname", name_tokens.getItem(1))
        .withColumn("address", full_address)
    )
    return enriched.drop("name", "_rescued_data")

# --- CLEAN TRAINING IMAGES ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.training_images",
    comment="Enriched accident training images",
    table_properties={"quality": "silver"},
)
def training_images():
    """Build the silver training-images table with a ``label`` column.

    Returns:
        The bronze training-images stream plus ``label``: the second capture
        group of the file-name pattern applied to ``path``, i.e. the letters
        that follow ``<digits>-`` in a ``.png`` file name.
    """
    # <digits>-<letters>, an optional " (<digits>)" suffix, then ".png"
    label_pattern = r"/(\d+)-([a-zA-Z]+)(?: \(\d+\))?\.png$"
    bronze_images = dlt.readStream(f"{catalog}.{bronze_schema}.training_images")
    label = regexp_extract("path", label_pattern, 2)
    return bronze_images.withColumn("label", label)

# --- CLEAN CLAIM IMAGES ---
@dlt.table(
    name=f"{catalog}.{silver_schema}.claim_images",
    comment="Enriched claim images",
    table_properties={
        "quality": "silver"
    }
)
def claim_images():
    """Build the silver claim-images table with an ``image_name`` column.

    The function used to be called ``training_images``, which redefined the
    function of the same name declared just above it. The published table
    name comes from the ``name=`` argument of the decorator and is unchanged.

    Returns:
        The bronze claim-images stream plus ``image_name``: the part of
        ``path`` after the last ``/`` up to and including ``.jpg``.
    """
    df = dlt.readStream(f"{catalog}.{bronze_schema}.claim_images")
    # The dot before "jpg" is escaped so it only matches a literal ".";
    # unescaped, a name such as "foo_jpg_1.jpg" was cut short to "foo_jpg".
    return df.withColumn("image_name", regexp_extract(col("path"), r".*/(.*?\.jpg)", 1))


In [0]:
import geopy
import pandas as pd
from pyspark.sql.functions import col, lit, concat, pandas_udf, avg
from typing import Iterator
import random
import dlt

# Catalog and schema names used to build the fully qualified table names of
# the gold tables defined below.
catalog = "smart_claims_dev"
silver_schema = "02_silver"  # input layer
gold_schema = "03_gold"  # output layer

def geocode(geolocator, address, use_mock=True):
    """Resolve an address to a latitude/longitude pair.

    Args:
        geolocator: object exposing ``geocode(address)``; the returned value
            must provide ``latitude`` and ``longitude`` attributes.
        address: value handed to ``geolocator.geocode``.
        use_mock: when True (the default, which keeps the previous demo
            behaviour) the API call is skipped and random coordinates are
            returned. Pass False for real lookups. Previously the real lookup
            sat behind an unconditional ``return`` and could never run.

    Returns:
        A pandas Series with the keys ``latitude`` and ``longitude``. Both are
        None when the lookup returns nothing or raises.
    """
    if use_mock:
        # Skip the API call for a faster demo (pass use_mock=False for real lookups)
        return pd.Series({'latitude': random.uniform(-90, 90), 'longitude': random.uniform(-180, 180)})
    try:
        location = geolocator.geocode(address)
        if location:
            return pd.Series({'latitude': location.latitude, 'longitude': location.longitude})
    except Exception as e:
        # Best effort: one failing address must not abort the whole batch
        print(f"error getting lat/long: {e}")
    return pd.Series({'latitude': None, 'longitude': None})
      
@pandas_udf("latitude float, longitude float")
def get_lat_long(batch_iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
    """Geocode batches of addresses.

    Args:
        batch_iter: iterator of pandas Series holding the values to geocode.

    Yields:
        For each input batch, the result of applying ``geocode`` to every
        element of that batch.
    """
    # Disabled SSL-context override:
    # ctx = ssl.create_default_context(cafile=certifi.where())
    # geopy.geocoders.options.default_ssl_context = ctx

    # One geocoder is created per call and reused for all batches
    geolocator = geopy.Nominatim(user_agent="claim_lat_long", timeout=5, scheme='https')

    def lookup(single_address):
        return geocode(geolocator, single_address)

    for address_batch in batch_iter:
        yield address_batch.apply(lookup)

@dlt.table(
    name=f"{catalog}.{gold_schema}.aggregated_telematics",
    comment="Average telematics",
    table_properties={"quality": "gold"},
)
def telematics():
    """Average the silver telematics readings per ``chassis_no``.

    Returns:
        One row per ``chassis_no`` with the averages of ``speed``,
        ``latitude`` and ``longitude`` as ``telematics_speed``,
        ``telematics_latitude`` and ``telematics_longitude``.
    """
    silver_events = dlt.read(f"{catalog}.{silver_schema}.telematics")
    averages = [
        avg("speed").alias("telematics_speed"),
        avg("latitude").alias("telematics_latitude"),
        avg("longitude").alias("telematics_longitude"),
    ]
    return silver_events.groupBy("chassis_no").agg(*averages)

# --- CLAIM-POLICY ---
@dlt.table(
    name=f"{catalog}.{gold_schema}.customer_claim_policy",
    comment="Curated claim joined with policy records",
    table_properties={"quality": "gold"},
)
def customer_claim_policy():
    """Join the silver claim, policy and customer streams.

    Returns:
        Claims joined to policies on ``policy_no``, then joined to customers
        where the policy ``CUST_ID`` equals the customer ``customer_id``.
    """

    def silver_stream(table):
        # Streaming read of one cleaned silver table
        return dlt.readStream(f"{catalog}.{silver_schema}.{table}")

    policies = silver_stream("policy")
    claims = silver_stream("claim")
    customers = silver_stream("customer")

    claims_with_policy = claims.join(policies, "policy_no")
    same_customer = policies.CUST_ID == customers.customer_id
    return claims_with_policy.join(customers, same_customer)

# --- CLAIM-POLICY-TELEMATICS ---
@dlt.table(
    name=f"{catalog}.{gold_schema}.customer_claim_policy_telematics",
    comment="claims with geolocation latitude/longitude",
    table_properties={"quality": "gold"},
)
def customer_claim_policy_telematics():
    """Add geocoded coordinates and averaged telematics to the joined claims.

    Returns:
        Rows of ``customer_claim_policy`` whose ``BOROUGH`` is not null, with
        a ``lat_long`` column computed by ``get_lat_long`` from ``address``,
        joined to ``aggregated_telematics`` on ``chassis_no``.
    """
    avg_telematics = dlt.read(f"{catalog}.{gold_schema}.aggregated_telematics")
    claims_with_borough = dlt.read(
        f"{catalog}.{gold_schema}.customer_claim_policy"
    ).where("BOROUGH is not null")

    geocoded = claims_with_borough.withColumn("lat_long", get_lat_long(col("address")))
    return geocoded.join(avg_telematics, on="chassis_no")
  


