### Databricks DLT Pipeline Code

This notebook contains the Python DLT pipeline code that creates the bronze, silver, and gold tables for the gas emissions dataset.

Look for `<CHANGE_HERE: ...>` placeholders in the code and replace them with your values. Detailed instructions follow below.

#### Table Naming Instructions
Before running the code, you need to specify where your tables will be stored. You can use any of these three formats:

1. Three level catalog.schema.table format:
   - Replace `<CHANGE_HERE: catalog>.<CHANGE_HERE: schema>` with your Unity Catalog and schema names
   - Example: `unity_catalog.my_schema.table_name`

2. Two level schema.table format:
   - Replace `<CHANGE_HERE: schema>` with your schema name
   - The default catalog will be used
   - Example: `my_schema.table_name`

3. Simple table name format:
   - Use just the table name
   - Both default catalog and schema will be used
   - Example: `table_name`

#### Table Documentation and Configuration
For each table in the code:

1. Table Comments:
   - Replace `<CHANGE_HERE: enter_table_comment>` with a descriptive comment about the table's purpose and contents
   - Example: "Bronze table containing raw customer transaction data"

2. For Change Feed Tables:
   - Replace `<CHANGE_HERE: 1/2>` with either 1 or 2 to specify the SCD (Slowly Changing Dimension) type:
     - Type 1: Overwrites the old value with the new value
     - Type 2: Maintains history by creating new records for each change

#### Learn More
- [Streaming Tables Documentation](https://docs.databricks.com/aws/en/dlt/streaming-tables) - Learn about streaming tables and their use cases for data ingestion and low-latency streaming transformations.
- [Materialized Views Documentation](https://docs.databricks.com/aws/en/dlt/materialized-views) - Understand how materialized views work and their benefits for incremental data processing.

#### Workshop To Do - SCD Table: valve_compliance_changes

In [0]:
# `dlt` must be in scope before the first @dlt decorator runs. Previously the
# first `import dlt` in this notebook came in a later cell, so this cell failed.
import dlt


@dlt.table(name="bronze.valve_compliance_changes")
def source():
    """Bronze: incrementally ingest raw valve compliance change CSV files.

    Auto Loader (``cloudFiles``) picks up new files from the volume path below,
    inferring column types; ``multiLine`` lets a single CSV record span lines.
    """
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("multiLine", "true")
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/valve_compliance_changes/")
    )

# Declare the silver streaming table that apply_changes() below writes into.
dlt.create_streaming_table(
    name="silver.valve_compliance_changes",
    comment="testing 123 for dry run"  # TODO: replace with a descriptive table comment
)

# Merge the bronze change feed into silver as SCD type 2 (one row per version).
# Rows sharing a (valve_id, asset_id) key are ordered by change_timestamp. The
# gold valve_compliance_history table reads the resulting __START_AT/__END_AT columns.
dlt.apply_changes(
    target="silver.valve_compliance_changes",
    source="bronze.valve_compliance_changes",
    keys=['valve_id', 'asset_id'],
    sequence_by="change_timestamp",
    stored_as_scd_type="2"
)


#### Bronze & Silver Tables: asset_config_changes

In [0]:
@dlt.table(name="bronze.asset_config_changes")
def source():
    """Bronze: raw asset configuration change CSVs, loaded incrementally by Auto Loader."""
    landing_dir = "/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/asset_config_changes/"
    reader_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    reader = spark.readStream.format("cloudFiles").options(**reader_options)
    return reader.load(landing_dir)


# Silver target table; declared up front so apply_changes() can write into it.
dlt.create_streaming_table(
    name="silver.asset_config_changes",
    comment="fixing up a streaming table",
)

# CDC merge from bronze into silver, keeping full version history (SCD type 2).
# Versions of the same (config_id, asset_id) key are ordered by change_timestamp.
dlt.apply_changes(
    source="bronze.asset_config_changes",
    target="silver.asset_config_changes",
    keys=["config_id", "asset_id"],
    sequence_by="change_timestamp",
    stored_as_scd_type="2",
)


#### Bronze & Silver Tables: calibration_records

In [0]:
@dlt.table(name="bronze.calibration_records")
def calibration_records_bronze():
    """Bronze: raw calibration record CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/calibration_records/")
    )

@dlt.table(name="silver.calibration_records")
@dlt.expect("valid_drift_percentage", "drift_percentage BETWEEN 0 AND 100")
def calibration_records_silver():
    """Silver: bronze calibration records streamed through unchanged.

    ``dlt.expect`` only records violations in pipeline metrics; rows outside the
    range are kept (unlike ``expect_or_drop`` / ``expect_or_fail``).
    """
    bronze_stream = spark.readStream.table("bronze.calibration_records")
    return bronze_stream


#### Bronze & Silver Tables: inspectors

In [0]:
@dlt.table(name="bronze.inspectors")
def inspectors_bronze():
    """Bronze: raw inspector CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/inspectors/")
    )

@dlt.table(name="silver.inspectors")
def inspectors_silver():
    """Silver: bronze inspectors streamed through with no transformation."""
    bronze_stream = spark.readStream.table("bronze.inspectors")
    return bronze_stream


#### Bronze & Silver Tables: shift_schedule

In [0]:
@dlt.table(name="bronze.shift_schedule")
def shift_schedule_bronze():
    """Bronze: raw shift schedule CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/shift_schedule/")
    )

@dlt.table(name="silver.shift_schedule")
def shift_schedule_silver():
    """Silver: bronze shift schedule streamed through with no transformation."""
    bronze_stream = spark.readStream.table("bronze.shift_schedule")
    return bronze_stream


#### Bronze & Silver Tables: site_info

In [0]:
@dlt.table(name="bronze.site_info")
def site_info_bronze():
    """Bronze: raw site information CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/site_info/")
    )

@dlt.table(name="silver.site_info")
def site_info_silver():
    """Silver: bronze site info streamed through with no transformation."""
    bronze_stream = spark.readStream.table("bronze.site_info")
    return bronze_stream


#### Bronze & Silver Tables: daily_weather

#### Workshop To Do: Silver Table - Add the Watermark!

In [0]:
from pyspark.sql.functions import col, current_timestamp

@dlt.table(name="bronze.daily_weather")
def daily_weather_bronze():
    """Bronze: raw daily weather CSVs via Auto Loader, stamped with the ingestion time."""
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("multiLine", "true")
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/daily_weather/")
        .withColumn("_ingestion_timestamp", current_timestamp())
    )

@dlt.table(
    name="silver.daily_weather",
    comment="Silver table for daily weather data with watermark handling for late-arriving data"
)
@dlt.expect("valid_temperature_celsius", "temperature_celsius BETWEEN -40 AND 50")
@dlt.expect("valid_humidity_percentage", "humidity_percentage BETWEEN 0 AND 100")
@dlt.expect("valid_wind_speed", "wind_speed_kmh BETWEEN 0 AND 200")
@dlt.expect("valid_precipitation", "precipitation_mm BETWEEN 0 AND 500")
@dlt.expect("valid_pressure", "atmospheric_pressure BETWEEN 980 AND 1030")
@dlt.expect("not_null_site_id", "site_id IS NOT NULL")
@dlt.expect("not_null_date", "date IS NOT NULL")
def daily_weather_silver():
    """Silver: one weather row per (site_id, date), with a 7-day watermark.

    Expectations are recorded as metrics only; violating rows are kept. The
    output keeps the original ``date`` column; the helper ``date_timestamp``
    column used for the watermark is not selected.
    """
    return (
        spark.readStream.table("bronze.daily_weather")
        # A watermark needs a TIMESTAMP column, so derive one from `date`.
        .withColumn("date_timestamp", col("date").cast("timestamp"))
        .withWatermark("date_timestamp", "7 days")
        # The watermark column must be part of the dedup key for Spark to evict
        # old dedup state. With only (site_id, date), state for every key ever
        # seen is kept forever. date_timestamp is derived from date, so this
        # key produces exactly the same duplicate groups as (site_id, date).
        .dropDuplicates(["site_id", "date", "date_timestamp"])
        .select(
            col("site_id"),
            col("date"),  # Keep original DATE column
            col("temperature_celsius"),
            col("humidity_percentage"),
            col("wind_speed_kmh"),
            col("precipitation_mm"),
            col("atmospheric_pressure"),
            col("weather_condition"),
            col("_ingestion_timestamp"),
            current_timestamp().alias("_processed_timestamp")
        )
    )


#### Bronze Table: alert_history

In [0]:
import dlt
from pyspark.sql.functions import col, current_timestamp, when, lit, expr

@dlt.table(name="bronze.alert_history")
def alert_history_bronze():
    """Bronze: raw alert history CSVs via Auto Loader, stamped with the ingestion time."""
    source_path = "/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/alert_history/"
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    raw_alerts = (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load(source_path)
    )
    return raw_alerts.withColumn("_ingestion_timestamp", current_timestamp())

#### Workshop To Do - Silver Layer Data Quality Expectations

In [0]:
@dlt.table(
    name="silver.alert_history",
    comment="Silver table for alert history with data quality and active alert tracking",
)
#<To Do>: Write your own expectation! Try expect_or_drop() or expect_or_fail()
@dlt.expect("valid_threshold_value", "threshold_value BETWEEN 0 AND 1000")
@dlt.expect("valid_measured_value", "measured_value BETWEEN 0 AND 1000")
@dlt.expect("valid_alert_timestamp", "alert_timestamp IS NOT NULL")
@dlt.expect("valid_alert_type", "alert_type IN ('THRESHOLD', 'MALFUNCTION', 'CALIBRATION', 'COMPLIANCE')")
@dlt.expect("valid_severity", "severity IN ('LOW', 'MEDIUM', 'HIGH', 'CRITICAL')")
@dlt.expect("not_null_alert_id", "alert_id IS NOT NULL")
@dlt.expect("not_null_asset_id", "asset_id IS NOT NULL")
@dlt.expect("not_null_site_id", "site_id IS NOT NULL")
def alert_history_silver():
    """Silver: projected alert history stream plus a processing timestamp.

    All expectations use ``dlt.expect``, so violations are only recorded as
    metrics; no rows are dropped.
    """
    passthrough_columns = [
        "alert_id",
        "asset_id",
        "site_id",
        "emission_id",
        "alert_timestamp",
        "alert_type",
        "severity",
        "threshold_value",
        "measured_value",
        "resolution_timestamp",
        "resolution_action",
        "inspector_id",
        "notes",
        "_ingestion_timestamp",
    ]
    # NOTE(review): no stateful operator follows in this query, so this watermark
    # appears to have no effect here — confirm intent.
    alerts = spark.readStream.table("bronze.alert_history").withWatermark("alert_timestamp", "30 days")
    projected = [col(name) for name in passthrough_columns]
    return alerts.select(*projected, current_timestamp().alias("_processed_timestamp"))

#### Workshop To Do - Gold Layer Streaming Table: Active Alerts

In [0]:
@dlt.table(name="gold.active_alerts")
def active_alerts_gold():
    """Gold: stream of unresolved alerts (``resolution_timestamp IS NULL``).

    NOTE(review): this is an append-only streaming table. If an alert is later
    resolved via a new row in silver, that row is filtered out here, but the
    earlier "active" row already written is never removed. If the table must
    reflect the *current* active set, a materialized view (batch read) is needed
    — confirm intent.
    """
    return (spark.readStream.table("silver.alert_history")
            # Active alerts only: no resolution timestamp yet.
            .filter("resolution_timestamp IS NULL")
            # Event-time watermark used by the deduplication below.
            .withWatermark("alert_timestamp", "1 hour")
            # Keep only essential columns.
            .select(
                "alert_id",
                "asset_id",
                "site_id",
                "emission_id",
                "alert_timestamp",
                "alert_type",
                "severity",
                "threshold_value",
                "measured_value",
                "inspector_id",
                "notes"
            )
            # Deduplicate per alert. The watermark column is part of the key so
            # Spark can evict dedup state older than the watermark; keying on
            # alert_id alone lets that state grow without bound. dropDuplicates
            # keeps the FIRST row seen per key, not the latest. Rows sharing an
            # alert_id but with different alert_timestamp values are now distinct.
            .dropDuplicates(["alert_id", "alert_timestamp"])
    )

#### Workshop To Do: Gold Materialized View: Alert Summary

In [0]:
@dlt.table(
    name="gold.alert_summary",
    comment="Aggregated alert statistics and trends",
)
def alert_summary():
    """Gold (batch read via ``dlt.read``): alert statistics per (site_id, alert_type, severity).

    Produces total and still-open alert counts, the average resolution time in
    seconds over resolved alerts, the first/last alert timestamps, and the
    percentage of alerts resolved.
    """
    history = dlt.read("silver.alert_history")

    metrics = [
        expr("count(*)").alias("total_alerts"),
        expr("sum(case when resolution_timestamp is null then 1 else 0 end)").alias("active_alerts"),
        expr("avg(case when resolution_timestamp is not null then unix_timestamp(resolution_timestamp) - unix_timestamp(alert_timestamp) else null end)").alias("avg_resolution_time_seconds"),
        expr("max(alert_timestamp)").alias("latest_alert_timestamp"),
        expr("min(alert_timestamp)").alias("earliest_alert_timestamp"),
    ]
    summary = history.groupBy("site_id", "alert_type", "severity").agg(*metrics)

    # total_alerts is count(*) of a non-empty group, so the division is safe.
    summary = summary.withColumn("resolution_rate", expr("(total_alerts - active_alerts) / total_alerts * 100"))
    return summary.withColumn("_processed_timestamp", current_timestamp())

#### Bronze & Silver Tables: asset

In [0]:
@dlt.table(name="bronze.asset")
def asset_bronze():
    """Bronze: raw asset CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/asset/")
    )

@dlt.table(name="silver.asset")
def asset_silver():
    """Silver: bronze assets streamed through with no transformation."""
    bronze_stream = spark.readStream.table("bronze.asset")
    return bronze_stream


#### Bronze & Silver Tables: compliance_regulations

In [0]:
@dlt.table(name="bronze.compliance_regulations")
def compliance_regulations_bronze():
    """Bronze: raw compliance regulation CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/compliance_regulations/")
    )

@dlt.table(name="silver.compliance_regulations")
def compliance_regulations_silver():
    """Silver: bronze compliance regulations streamed through with no transformation."""
    bronze_stream = spark.readStream.table("bronze.compliance_regulations")
    return bronze_stream


#### Bronze & Silver Tables: gas_production

In [0]:
@dlt.table(name="bronze.gas_production")
def gas_production_bronze():
    """Bronze: raw gas production CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/gas_production/")
    )

@dlt.table(name="silver.gas_production")
@dlt.expect("valid_gas_volume_m3", "gas_volume_m3 BETWEEN 0 AND 10000")
@dlt.expect("valid_gas_pressure", "gas_pressure BETWEEN 0 AND 500")
def gas_production_silver():
    """Silver: bronze gas production streamed through; range checks are metrics-only (``expect``)."""
    bronze_stream = spark.readStream.table("bronze.gas_production")
    return bronze_stream


#### Bronze & Silver Tables: maintenance_record

In [0]:
@dlt.table(name="bronze.maintenance_record")
def maintenance_record_bronze():
    """Bronze: raw maintenance record CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/maintenance_record/")
    )

@dlt.table(name="silver.maintenance_record")
@dlt.expect("valid_cost", "cost BETWEEN 0 AND 100000")
def maintenance_record_silver():
    """Silver: bronze maintenance records streamed through; the cost check is metrics-only (``expect``)."""
    bronze_stream = spark.readStream.table("bronze.maintenance_record")
    return bronze_stream


#### Bronze & Silver Tables: sensor_emissions

In [0]:
@dlt.table(name="bronze.sensor_emissions")
def sensor_emissions_bronze():
    """Bronze: raw sensor emission CSVs, loaded incrementally by Auto Loader."""
    csv_options = {
        "cloudFiles.format": "csv",
        "cloudFiles.inferColumnTypes": "true",
        "multiLine": "true",
    }
    return (
        spark.readStream.format("cloudFiles")
        .options(**csv_options)
        .load("/Volumes/harrison_chen_catalog/synthetic_energy/energy_volume/Gas_Emissions/sensor_emissions/")
    )

@dlt.table(name="silver.sensor_emissions")
@dlt.expect("valid_methane_level", "methane_level BETWEEN 0 AND 1000")
@dlt.expect("valid_co2_level", "co2_level BETWEEN 0 AND 1000")
@dlt.expect("valid_nox_level", "nox_level BETWEEN 0 AND 1000")
@dlt.expect("valid_temperature", "temperature BETWEEN -20 AND 100")
@dlt.expect("valid_pressure", "pressure BETWEEN 0 AND 1000")
@dlt.expect("valid_flow_rate", "flow_rate BETWEEN 0 AND 10000")
def sensor_emissions_silver():
    """Silver: bronze sensor emissions streamed through; range checks are metrics-only (``expect``)."""
    bronze_stream = spark.readStream.table("bronze.sensor_emissions")
    return bronze_stream


#### Gold: Emissions Analytics Table

In [0]:
from pyspark.sql.functions import avg, approx_count_distinct, date_trunc, col, to_date

@dlt.table(
    name="gold.emissions_analytics",
    comment="Gold materialized view of daily emissions analytics per site",
    temporary=False
)
def emissions_analytics():
    """Gold: daily per-site emission averages alongside that day's weather.

    Every input is a batch read (``dlt.read``), so DLT maintains this dataset as
    a materialized view, not a streaming table.

    Output: one row per (emission_date, site_id, site_name, temperature_celsius,
    humidity_percentage) with average methane/CO2/NOx levels and an approximate
    count of distinct reporting assets (``asset_id``).
    """
    # Derive a DATE from the reading's timestamp so it can be matched to daily weather.
    sensor_emissions = (
        dlt.read("silver.sensor_emissions")
        .withColumn("emission_date", to_date(col("timestamp")))
    )
    site_info = dlt.read("silver.site_info")
    daily_weather = dlt.read("silver.daily_weather")

    # Join emissions to site info (USING site_id), then to same-site, same-day
    # weather. Both are inner joins: readings with no matching site row or no
    # weather row for that site and day are excluded. Columns are selected via
    # their source DataFrames because names like site_id exist on several inputs.
    base_join = (
        sensor_emissions
        .join(site_info, ["site_id"])
        .join(
            daily_weather,
            (sensor_emissions.site_id == daily_weather.site_id)
            & (sensor_emissions.emission_date == daily_weather.date),
        )
        .select(
            sensor_emissions.emission_date,
            sensor_emissions.site_id,
            site_info.site_name,
            daily_weather.temperature_celsius,
            daily_weather.humidity_percentage,
            sensor_emissions.methane_level,
            sensor_emissions.co2_level,
            sensor_emissions.nox_level,
            sensor_emissions.asset_id,
        )
    )

    # Daily aggregation per site (weather columns are carried as grouping keys).
    return (
        base_join
        .groupBy(
            "emission_date",
            "site_id",
            "site_name",
            "temperature_celsius",
            "humidity_percentage",
        )
        .agg(
            avg("methane_level").alias("avg_methane_level"),
            avg("co2_level").alias("avg_co2_level"),
            avg("nox_level").alias("avg_nox_level"),
            approx_count_distinct("asset_id").alias("approx_reporting_sensors"),
        )
    )

#### Workshop To Do - Gold: Valve Compliance History Table

In [0]:
from pyspark.sql.functions import expr
@dlt.table(name="gold.valve_compliance_history")
def valve_compliance_history_gold():
    """Gold: valve compliance history with validity windows.

    Reads ``silver.valve_compliance_changes`` (built by ``apply_changes`` as SCD
    type 2) and drops rows missing a key field or a compliance status. It exposes
    the ``__START_AT``/``__END_AT`` columns as ``valid_from``/``valid_to`` and adds
    a duration in days plus a current-record flag.
    """
    history = spark.read.table("silver.valve_compliance_changes")

    # Drop rows lacking either key field or a compliance status.
    populated = (
        history
        .where("valve_id IS NOT NULL AND asset_id IS NOT NULL")
        .where("compliance_status IS NOT NULL")
    )

    projected = populated.select(
        "valve_id",
        "asset_id",
        "compliance_status",
        "inspector_id",
        "inspection_notes",
        "change_timestamp",
        "__START_AT",
        "__END_AT",
    )
    windowed = (
        projected
        .withColumnRenamed("__START_AT", "valid_from")
        .withColumnRenamed("__END_AT", "valid_to")
    )

    # datediff() yields NULL while valid_to is NULL (the still-open version).
    # NOTE(review): is_current_record is the STRING 'True'/'False', not a BOOLEAN.
    enriched = (
        windowed
        .withColumn("compliance_duration_days", expr("datediff(valid_to, valid_from)"))
        .withColumn("is_current_record", expr("CASE WHEN valid_to IS NULL THEN 'True' ELSE 'False' END"))
    )
    return enriched.dropDuplicates(["valve_id", "asset_id", "valid_from"])