In [None]:

-- Bronze layer: expose the raw lead CSV file as a temporary view.
-- Reader options used below:
--   header      -> first line of the file supplies the column names
--   inferSchema -> column types are inferred from the data instead of declared
--   multiLine   -> a quoted field may span several physical lines
--   mode        -> PERMISSIVE: malformed records are kept rather than failing the read
CREATE OR REPLACE TEMPORARY VIEW bronze_lead_raw
USING csv
OPTIONS (
  path        'Files/Bronze/Lead/lead_michigan.csv',
  header      'true',
  inferSchema 'true',
  multiLine   'true',
  mode        'PERMISSIVE'
);



StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 2, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [2]:
-- Preview: look at the first five rows of the raw Bronze view.
SELECT *
FROM bronze_lead_raw
LIMIT 5;


StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 3, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 63 fields>

In [3]:
-- Silver layer: cleaned and enriched lead measurements built from bronze_lead_raw.
--
-- Output columns (22): trimmed identifiers and descriptive text, parsed sample and
-- analysis dates, the numeric lead value (lead_ppb), lineage columns
-- (ingestion_date, source_file) and a derived lead_risk_level.
--
-- Row filter:
--   * CharacteristicName must be 'Lead' (after trimming).
--   * The unit must be 'ug/l'. The comparison trims and lower-cases the unit so that
--     padded or differently cased spellings (e.g. ' ug/L') are not silently dropped.
--   * The measurement must be a usable number. The check is applied to the CAST
--     result, not to the raw column, so values the CAST turns into NULL (with ANSI
--     mode off) are excluded instead of falling into the ELSE branch and being
--     labelled 'Safe'.
CREATE OR REPLACE TEMP VIEW silver_lead_cleaned AS
WITH lead_samples AS (
  -- Clean each column once; lead_ppb is computed here so the risk classification
  -- below reuses the same value instead of repeating the CAST.
  SELECT
    TRIM(`OrganizationIdentifier`) AS organization_id,
    TRIM(`OrganizationFormalName`) AS organization_name,
    TRIM(`ActivityIdentifier`) AS activity_id,
    TRIM(`ActivityTypeCode`) AS activity_type,
    TRIM(`ActivityMediaName`) AS media_name,
    TRIM(`ActivityMediaSubdivisionName`) AS media_subdivision,
    -- NOTE(review): assumes source dates are written as MM-dd-yyyy — confirm against the CSV.
    TO_DATE(`ActivityStartDate`, 'MM-dd-yyyy') AS sample_date,
    CAST(`ResultMeasureValue` AS FLOAT) AS lead_ppb,
    TRIM(`ResultMeasure/MeasureUnitCode`) AS lead_unit,
    TRIM(`ResultStatusIdentifier`) AS result_status,
    TRIM(`ResultDetectionConditionText`) AS detection_condition,
    TRIM(`CharacteristicName`) AS chemical,
    TRIM(`ResultSampleFractionText`) AS fraction_type,
    TRIM(`MonitoringLocationIdentifier`) AS location_id,
    TRIM(`HydrologicCondition`) AS hydrologic_condition,
    TRIM(`SampleAquifer`) AS aquifer,
    TRIM(`SampleCollectionMethod/MethodName`) AS collection_method,
    TRIM(`SampleCollectionEquipmentName`) AS collection_equipment,
    TO_DATE(`AnalysisStartDate`, 'MM-dd-yyyy') AS analysis_date,
    -- Lineage columns: load date and originating file name.
    CURRENT_DATE() AS ingestion_date,
    'lead_michigan.csv' AS source_file
  FROM bronze_lead_raw
  WHERE
    TRIM(`CharacteristicName`) = 'Lead'
    AND LOWER(TRIM(`ResultMeasure/MeasureUnitCode`)) = 'ug/l'
)
SELECT
  lead_samples.*,
  -- Enrichment: safety flag derived from lead_ppb.
  --   > 15          -> 'Unsafe'
  --   1 .. 15       -> 'Moderate'
  --   anything else -> 'Safe' (values below 1)
  -- NOTE(review): the 1 and 15 cut-offs are hard-coded; confirm they match the
  -- intended regulatory thresholds.
  CASE
    WHEN lead_ppb > 15 THEN 'Unsafe'
    WHEN lead_ppb BETWEEN 1 AND 15 THEN 'Moderate'
    ELSE 'Safe'
  END AS lead_risk_level
FROM lead_samples
WHERE lead_ppb IS NOT NULL;


StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 4, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [4]:
-- Preview: look at the first five rows of the cleaned Silver view.
SELECT *
FROM silver_lead_cleaned
LIMIT 5;

StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 5, Finished, Available, Finished)

<Spark SQL result set with 5 rows and 22 fields>

Remove Duplicates

In [5]:
-- Deduplicate the cleaned Silver rows: keep exactly one row per
-- (activity_id, location_id), preferring the most recent analysis_date.
--
-- Ordering by analysis_date alone leaves the surviving row arbitrary whenever
-- several rows share the same (or a NULL) analysis_date, so the result could
-- differ between runs. The extra sort keys make the choice repeatable: on a tie
-- the latest sample_date wins, then the highest lead_ppb.
--
-- NOTE(review): rows that differ only in other attributes (e.g. fraction_type) for
-- the same activity and location are collapsed into one — confirm this is intended.
--
-- The helper column row_num (always 1 in the output) is kept so the view's
-- column list stays the same for downstream consumers.
CREATE OR REPLACE TEMP VIEW silver_lead_deduped AS
SELECT *
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY activity_id, location_id
           ORDER BY
             analysis_date DESC NULLS LAST,
             sample_date DESC NULLS LAST,
             lead_ppb DESC NULLS LAST
         ) AS row_num
  FROM silver_lead_cleaned
)
WHERE row_num = 1;


StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 6, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

Save to Delta Table

In [7]:
-- Preview: look at the first ten rows of the deduplicated Silver view.
SELECT *
FROM silver_lead_deduped
LIMIT 10;

StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 8, Finished, Available, Finished)

<Spark SQL result set with 10 rows and 23 fields>

In [9]:
-- Persist the cleaned, deduplicated Silver view as the Delta table Silver_Lead,
-- replacing any existing table of that name.
-- NOTE(review): SELECT * also carries over the row_num helper column produced by
-- the deduplication view — confirm it is wanted in the stored table.
CREATE OR REPLACE TABLE Silver_Lead
USING DELTA
AS
SELECT *
FROM silver_lead_deduped;


StatementMeta(, 5610aeb3-97d0-4493-9155-56a53ec2a068, 10, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>