### 🔹 Raw Sensor Data Ingestion

This DLT table ingests streaming CSV files of raw aircraft sensor data using Auto Loader.
The schema includes:
- `timestamp`, `aircraft_id`, `model`
- Sensor metrics: `engine_temp`, `fuel_efficiency`, `vibration`, `altitude`, `airspeed`, `anomaly_score`, `oil_pressure`, `engine_rpm` (integer), and `battery_voltage`.

Files are read from a managed volume at `/Volumes/arao/aerodemo/tmp/raw`.

✅ **DLT Benefits**:
- Built-in schema enforcement
- Automatic lineage
- Incremental ingestion: Auto Loader tracks processed files, so each update picks up only new CSVs

In [0]:
import dlt
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType

# Define the schema
sensor_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("aircraft_id", StringType(), True),
    StructField("model", StringType(), True),
    StructField("engine_temp", DoubleType(), True),
    StructField("fuel_efficiency", DoubleType(), True),
    StructField("vibration", DoubleType(), True),
    StructField("altitude", DoubleType(), True),
    StructField("airspeed", DoubleType(), True),
    StructField("anomaly_score", DoubleType(), True),
    StructField("oil_pressure", DoubleType(), True),
    StructField("engine_rpm", IntegerType(), True),
    StructField("battery_voltage", DoubleType(), True)
])

@dlt.table(
    name="raw_sensor_data",
    comment="Ingested raw sensor data from CSVs in a Unity Catalog volume"
)
def load_raw_sensor_data():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaLocation", "/Volumes/arao/aerodemo/tmp/raw/schema/raw_sensor_data_v2")
        .schema(sensor_schema)
        .load("/Volumes/arao/aerodemo/tmp/raw")
    )
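Because `sensor_schema` is supplied explicitly, each CSV file needs to carry the twelve columns in the schema's order. Spark's CSV reader defaults to `enforceSchema=true`, which applies a user-supplied schema by position and ignores the header names; we assume Auto Loader's CSV format behaves the same way, which is worth confirming on your runtime. The plain-Python sketch below (not Spark code) parses a hypothetical sample file positionally and casts each field to its schema type, treating empty strings as null as Spark's CSV reader does by default. The function name and sample values are illustrative only.

```python
import csv
import io
from datetime import datetime

# Column order and types copied from sensor_schema above
SENSOR_COLUMNS = [
    ("timestamp", "timestamp"),
    ("aircraft_id", "string"),
    ("model", "string"),
    ("engine_temp", "double"),
    ("fuel_efficiency", "double"),
    ("vibration", "double"),
    ("altitude", "double"),
    ("airspeed", "double"),
    ("anomaly_score", "double"),
    ("oil_pressure", "double"),
    ("engine_rpm", "int"),
    ("battery_voltage", "double"),
]

CASTS = {
    "timestamp": datetime.fromisoformat,
    "string": str,
    "double": float,
    "int": int,
}


def parse_sensor_csv(text):
    """Parse CSV text with a header row into typed dicts, mapping columns by position."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # the header is skipped; names are not used for mapping
    rows = []
    for values in reader:
        if len(values) != len(SENSOR_COLUMNS):
            raise ValueError(f"expected {len(SENSOR_COLUMNS)} columns, got {len(values)}")
        rows.append({
            # Empty fields become None, like Spark's default nullValue ""
            name: (CASTS[kind](raw) if raw != "" else None)
            for (name, kind), raw in zip(SENSOR_COLUMNS, values)
        })
    return rows


# Hypothetical sample file contents
SAMPLE = (
    "timestamp,aircraft_id,model,engine_temp,fuel_efficiency,vibration,altitude,"
    "airspeed,anomaly_score,oil_pressure,engine_rpm,battery_voltage\n"
    "2024-05-01T10:00:00,AC1001,A320,612.5,78.2,4.1,35000,480,0.12,55.0,8200,27.6\n"
)

rows = parse_sensor_csv(SAMPLE)
print(rows[0]["engine_rpm"], rows[0]["altitude"])  # 8200 35000.0
```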

### 🔹 Maintenance Events Ingestion

This DLT table loads structured maintenance event logs for aircraft.
Schema includes:
- `aircraft_id`, `event_date`, `event_type` (e.g., "Routine Check", "Engine Repair")

Source files are CSVs dropped in `/Volumes/arao/aerodemo/tmp/maintenance`.

✅ **DLT Benefits**:
- Ready for data quality rules via `@dlt.expect` (no expectations are declared on this table yet)
- Easier governance and visibility

In [0]:
import dlt
from pyspark.sql.types import StructType, StructField, StringType, DateType

@dlt.table(
    name="maintenance_events",
    comment="Streaming ingestion of maintenance events like routine checks and engine repairs"
)
def load_maintenance_events():
    maintenance_schema = StructType([
        StructField("aircraft_id", StringType(), True),
        StructField("event_date", DateType(), True),
        StructField("event_type", StringType(), True)
    ])

    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("pathGlobFilter", "*.csv")
        .option("cloudFiles.schemaLocation", "/Volumes/arao/aerodemo/tmp/maintenance/schema")
        .schema(maintenance_schema)
        .load("/Volumes/arao/aerodemo/tmp/maintenance")
    )
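DLT offers `@dlt.expect`, which records violations but keeps every row, and `@dlt.expect_or_drop`, which removes failing rows. The plain-Python sketch below imitates those two behaviors on in-memory maintenance records so the difference is easy to see. The rule names and predicates are hypothetical and are not part of this pipeline; in DLT they would be SQL strings such as `"aircraft_id IS NOT NULL"` passed to the decorator.

```python
from datetime import date

# Hypothetical rules for maintenance_events (none are declared in the pipeline).
# Each predicate mirrors a SQL constraint such as "aircraft_id IS NOT NULL".
MAINTENANCE_RULES = {
    "has_aircraft_id": lambda row: row["aircraft_id"] is not None,
    "has_event_date": lambda row: row["event_date"] is not None,
    "has_event_type": lambda row: row["event_type"] is not None,
}


def count_violations(rows, rules):
    """Count failing rows per rule while keeping every row (like @dlt.expect)."""
    counts = {name: 0 for name in rules}
    for row in rows:
        for name, check in rules.items():
            if not check(row):
                counts[name] += 1
    return counts


def drop_invalid(rows, rules):
    """Keep only rows that satisfy every rule (like @dlt.expect_or_drop)."""
    return [row for row in rows if all(check(row) for check in rules.values())]


# Hypothetical sample records
events = [
    {"aircraft_id": "AC1001", "event_date": date(2024, 4, 1), "event_type": "Routine Check"},
    {"aircraft_id": None, "event_date": date(2024, 4, 2), "event_type": "Engine Repair"},
    {"aircraft_id": "AC1002", "event_date": None, "event_type": "Routine Check"},
]

print(count_violations(events, MAINTENANCE_RULES))
# {'has_aircraft_id': 1, 'has_event_date': 1, 'has_event_type': 0}
print(len(drop_invalid(events, MAINTENANCE_RULES)))  # 1
```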

### 🔹 Cleaned Sensor Data

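This DLT table filters the incoming raw sensor stream to keep only readings within operational thresholds. A row is kept only if all of the following hold:

- `engine_temp` ≤ 700
- `fuel_efficiency` ≥ 50
- `vibration` ≤ 25
- `altitude` between 20,000 and 45,000
- `airspeed` between 300 and 600
- `oil_pressure` between 20 and 100
- `engine_rpm` between 1,000 and 10,000
- `battery_voltage` between 22 and 30

Rows with a null in any of these columns are dropped as well, because a comparison with null is never true.

Data expectations (`@dlt.expect`) are also declared:
- `engine_temp` between 0 and 1000
- `fuel_efficiency` > 0
- `vibration` ≥ 0
- `altitude`, `airspeed`, `oil_pressure`, `engine_rpm`, and `battery_voltage` within the same ranges as the filter

`@dlt.expect` only records violations in the pipeline's data quality metrics; it does not drop rows. Because expectations are evaluated on the table's output (after `.filter()`), the ones that mirror a filter condition will report no violations, and rows removed by the filter do not show up in the expectation metrics.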

✅ **DLT Benefits**:
- Streaming data validation
- Simplified enforcement of operational thresholds
- Automatic schema tracking and lineage

In [0]:
import dlt
from pyspark.sql.functions import col

@dlt.table(
    name="cleaned_sensor_data",
    comment="Cleaned sensor data after applying quality filters and validation on new metrics"
)
@dlt.expect("valid_engine_temp", "engine_temp BETWEEN 0 AND 1000")
@dlt.expect("valid_fuel_efficiency", "fuel_efficiency > 0")
@dlt.expect("valid_vibration", "vibration >= 0")
@dlt.expect("valid_altitude", "altitude BETWEEN 20000 AND 45000")
@dlt.expect("valid_airspeed", "airspeed BETWEEN 300 AND 600")
@dlt.expect("valid_oil_pressure", "oil_pressure BETWEEN 20 AND 100")
@dlt.expect("valid_engine_rpm", "engine_rpm BETWEEN 1000 AND 10000")
@dlt.expect("valid_battery_voltage", "battery_voltage BETWEEN 22 AND 30")
def cleaned_sensor_data():
    return (
        dlt.read_stream("raw_sensor_data")
        .filter(
            (col("engine_temp") <= 700) &
            (col("fuel_efficiency") >= 50) &
            (col("vibration") <= 25) &
            (col("altitude").between(20000, 45000)) &
            (col("airspeed").between(300, 600)) &
            (col("oil_pressure").between(20, 100)) &
            (col("engine_rpm").between(1000, 10000)) &
            (col("battery_voltage").between(22, 30))
        )
    )
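To see why the mirrored expectations stay silent, the plain-Python sketch below applies the `.filter()` thresholds first and then counts expectation failures only on the surviving rows, which is how we understand DLT evaluates expectations on a table's output. The helper names and sample readings are made up for illustration.

```python
# Thresholds from the .filter() in cleaned_sensor_data: (low, high), inclusive; None = unbounded
FILTER_BOUNDS = {
    "engine_temp": (None, 700),
    "fuel_efficiency": (50, None),
    "vibration": (None, 25),
    "altitude": (20000, 45000),
    "airspeed": (300, 600),
    "oil_pressure": (20, 100),
    "engine_rpm": (1000, 10000),
    "battery_voltage": (22, 30),
}

# Predicates mirroring the @dlt.expect constraints
EXPECTATIONS = {
    "valid_engine_temp": lambda r: 0 <= r["engine_temp"] <= 1000,
    "valid_fuel_efficiency": lambda r: r["fuel_efficiency"] > 0,
    "valid_vibration": lambda r: r["vibration"] >= 0,
    "valid_altitude": lambda r: 20000 <= r["altitude"] <= 45000,
    "valid_airspeed": lambda r: 300 <= r["airspeed"] <= 600,
    "valid_oil_pressure": lambda r: 20 <= r["oil_pressure"] <= 100,
    "valid_engine_rpm": lambda r: 1000 <= r["engine_rpm"] <= 10000,
    "valid_battery_voltage": lambda r: 22 <= r["battery_voltage"] <= 30,
}


def in_bounds(value, low, high):
    if value is None:  # a SQL comparison with null is not true, so the filter drops the row
        return False
    return (low is None or value >= low) and (high is None or value <= high)


def passes_filter(row):
    return all(in_bounds(row[name], low, high) for name, (low, high) in FILTER_BOUNDS.items())


def expectation_report(raw_rows):
    """Return (rows dropped by the filter, expectation violations on the kept rows)."""
    kept = [row for row in raw_rows if passes_filter(row)]
    violations = {name: sum(1 for row in kept if not check(row)) for name, check in EXPECTATIONS.items()}
    return len(raw_rows) - len(kept), violations


# Hypothetical readings: one clean, one too hot, one with negative vibration
BASE = {"engine_temp": 600.0, "fuel_efficiency": 80.0, "vibration": 5.0, "altitude": 35000.0,
        "airspeed": 450.0, "oil_pressure": 50.0, "engine_rpm": 8000, "battery_voltage": 27.0}
rows = [BASE, {**BASE, "engine_temp": 1200.0}, {**BASE, "vibration": -1.0}]

dropped, violations = expectation_report(rows)
print(dropped)  # 1
print(violations["valid_engine_temp"], violations["valid_vibration"])  # 0 1
```

The 1200-degree reading would fail `valid_engine_temp`, but the filter removes it first, so the metric never sees it.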

### 🔹 Prediction Results

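This DLT table aggregates cleaned sensor data into a daily risk score per aircraft. The logic includes:

- Grouping by `aircraft_id` and `timestamp` (converted to a date, `prediction_date`)
- Averaging `engine_temp`, `vibration`, `fuel_efficiency`, `oil_pressure`, `engine_rpm`, and `battery_voltage` within each group
- Combining the six daily averages (written with an overline) into a `risk_score`: the six normalized terms are summed, divided by 6, and scaled by 100

  \[
  \text{risk\_score} = \frac{100}{6}\left(\frac{\overline{\text{engine\_temp}}}{700} + \frac{\overline{\text{vibration}}}{25} + \left(1 - \frac{\overline{\text{fuel\_efficiency}}}{100}\right) + \left(1 - \frac{\overline{\text{oil\_pressure}}}{100}\right) + \frac{\overline{\text{engine\_rpm}}}{10000} + \left(1 - \frac{\overline{\text{battery\_voltage}}}{30}\right)\right)
  \]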

✅ **DLT Benefits**:
- Scores are recomputed on each pipeline update as new sensor data arrives
- Rule-based scoring expressed directly in the pipeline
- Enables downstream alerting and visualization

In [0]:
import dlt
from pyspark.sql.functions import col, avg, to_date

@dlt.table(
    name="prediction_results",
    comment="Predicted AOG risk scores using normalized sensor metrics"
)
def prediction_results():
    df = dlt.read("cleaned_sensor_data")
    
    return (
        df.groupBy("aircraft_id", to_date("timestamp").alias("prediction_date"))
        .agg(
            (
                # Parenthesize the whole sum: `*` binds tighter than `+`
                (
                    (avg("engine_temp") / 700) +
                    (avg("vibration") / 25) +
                    (1 - avg("fuel_efficiency") / 100) +
                    (1 - avg("oil_pressure") / 100) +
                    (avg("engine_rpm") / 10000) +
                    (1 - avg("battery_voltage") / 30)
                ) * 100 / 6  # Mean of the six terms, scaled by 100
            ).alias("risk_score")
        )
    )
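The score can be checked by hand. This plain-Python sketch (not Spark code) takes the six daily averages as ordinary numbers; the function names and sample inputs are hypothetical. It also shows why the outer parentheses around the sum matter: `*` binds tighter than `+`, so without them only the battery term is multiplied by `100 / 6`.

```python
def normalized_terms(avg_engine_temp, avg_vibration, avg_fuel_efficiency,
                     avg_oil_pressure, avg_engine_rpm, avg_battery_voltage):
    """The six normalized terms used in prediction_results, in the same order."""
    return [
        avg_engine_temp / 700,
        avg_vibration / 25,
        1 - avg_fuel_efficiency / 100,
        1 - avg_oil_pressure / 100,
        avg_engine_rpm / 10000,
        1 - avg_battery_voltage / 30,
    ]


def risk_score(*averages):
    """Sum of the six terms times 100 / 6, i.e. their mean scaled by 100."""
    return sum(normalized_terms(*averages)) * 100 / 6


def risk_score_unparenthesized(*averages):
    """What the expression yields if the outer parentheses are missing."""
    t = normalized_terms(*averages)
    return t[0] + t[1] + t[2] + t[3] + t[4] + t[5] * 100 / 6


print(risk_score(700, 25, 100, 100, 10000, 30))                  # 50.0
print(risk_score_unparenthesized(700, 25, 100, 100, 10000, 30))  # 3.0
```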

---

### 🔹 Enriched Sensor Data with Maintenance Events

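This table joins each cleaned sensor reading with the most recent maintenance event for the same `aircraft_id` whose `event_date` falls on or before the reading's date (`to_date(timestamp)`). The event date is carried through as `maint_date`.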

Purpose:
- Add maintenance context to sensor data
- Enable root cause analysis and post-maintenance tracking

✅ **DLT Benefits**:
- Built-in lineage from both sensor and maintenance streams
- Flexible expansion for ML or analytics

In [0]:
import dlt
from pyspark.sql.functions import col, to_date
from pyspark.sql.window import Window
import pyspark.sql.functions as F

@dlt.table(
    comment="Sensor data enriched with most recent maintenance event info"
)
def enriched_sensor_data():
    cleaned_df = dlt.read("cleaned_sensor_data").withColumn("reading_date", to_date("timestamp"))
    events_df = (
        dlt.read("maintenance_events")
        .withColumnRenamed("event_date", "maint_date")
        .withColumnRenamed("aircraft_id", "maint_aircraft_id")
    )

    # Put the date condition inside the join: filtering after a left join would
    # discard readings that have no earlier maintenance event (null maint_date).
    joined = cleaned_df.join(
        events_df,
        (col("aircraft_id") == col("maint_aircraft_id")) & (col("maint_date") <= col("reading_date")),
        "left",
    ).drop("maint_aircraft_id")

    # Keep only the most recent maintenance event on or before each reading
    window = Window.partitionBy("aircraft_id", "timestamp").orderBy(col("maint_date").desc())

    return (
        joined
        .withColumn("rank", F.row_number().over(window))
        .filter(col("rank") == 1)
        .drop("rank", "reading_date")
    )
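The enrichment is an "as-of" join: for each reading, take the latest maintenance event dated on or before the reading date, and keep readings that have no such event. A plain-Python sketch of the same idea, using `bisect` on per-aircraft sorted event dates instead of a window function; the function name and records are hypothetical.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import date, datetime


def attach_latest_maintenance(readings, events):
    """Attach maint_date/event_type of the latest event on or before each reading's date."""
    events_by_aircraft = defaultdict(list)
    for event in events:
        events_by_aircraft[event["aircraft_id"]].append(event)
    for evs in events_by_aircraft.values():
        evs.sort(key=lambda e: e["event_date"])
    dates_by_aircraft = {a: [e["event_date"] for e in evs] for a, evs in events_by_aircraft.items()}

    enriched = []
    for reading in readings:
        evs = events_by_aircraft.get(reading["aircraft_id"], [])
        dates = dates_by_aircraft.get(reading["aircraft_id"], [])
        # Index just past the last event dated on or before the reading date
        idx = bisect_right(dates, reading["timestamp"].date())
        match = evs[idx - 1] if idx > 0 else None
        enriched.append({
            **reading,  # readings without a match are kept (left-join behavior)
            "maint_date": match["event_date"] if match else None,
            "event_type": match["event_type"] if match else None,
        })
    return enriched


events = [
    {"aircraft_id": "AC1001", "event_date": date(2024, 4, 20), "event_type": "Engine Repair"},
    {"aircraft_id": "AC1001", "event_date": date(2024, 4, 1), "event_type": "Routine Check"},
]
readings = [
    {"aircraft_id": "AC1001", "timestamp": datetime(2024, 3, 15, 10, 0)},
    {"aircraft_id": "AC1001", "timestamp": datetime(2024, 4, 10, 8, 0)},
    {"aircraft_id": "AC1001", "timestamp": datetime(2024, 4, 20, 14, 30)},
    {"aircraft_id": "AC1002", "timestamp": datetime(2024, 4, 10, 8, 0)},
]

enriched = attach_latest_maintenance(readings, events)
print([r["event_type"] for r in enriched])  # [None, 'Routine Check', 'Engine Repair', None]
```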

### 📊 DLT Table: `sensor_features` – Feature Engineering for ML

This table generates engineered features from enriched sensor data, designed to support predictive maintenance models. Key aspects include:

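- **7-day rolling averages** for engine temperature, vibration, and RPM (`avg_engine_temp_7d`, `avg_vibration_7d`, `avg_rpm_7d`) to capture trends over time.
- **Previous anomaly score** (`prev_anomaly`): the `anomaly_score` of the aircraft's preceding reading, computed with `lag`, to give recent anomaly context.
- **Maintenance proximity** (`days_since_maint`): the number of days between the reading date and the last known maintenance event. Readings with no earlier maintenance event are measured from 1970-01-01, which yields a very large value.
- **Core telemetry** such as oil pressure, battery voltage, and altitude, passed through alongside the engineered features.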

These features form the foundation for risk scoring, anomaly classification, and machine learning model training or inference.

In [0]:
import dlt
from pyspark.sql.functions import col, avg, lag, to_date
from pyspark.sql.window import Window
import pyspark.sql.functions as F

SECONDS_PER_DAY = 86400

@dlt.table(
    comment="Engineered sensor features for ML model input"
)
def sensor_features():
    df = dlt.read("enriched_sensor_data")
    df = df.withColumn("date", to_date("timestamp"))

    # Trailing 7-day window per aircraft: a range frame over epoch seconds covers
    # [timestamp - 7 days, timestamp] however many readings fall inside it.
    # (rowsBetween(-6, 0) would average the last 7 readings instead.)
    rolling_window = (
        Window.partitionBy("aircraft_id")
        .orderBy(F.unix_timestamp(col("timestamp")))
        .rangeBetween(-7 * SECONDS_PER_DAY, 0)
    )

    # Order by the full timestamp so the "previous" reading is well defined within a day
    lag_window = Window.partitionBy("aircraft_id").orderBy("timestamp")

    # Sentinel date for readings without an earlier maintenance event
    no_maint_date = F.to_date(F.lit("1970-01-01"))

    return (
        df.withColumn("avg_engine_temp_7d", avg("engine_temp").over(rolling_window))
          .withColumn("avg_vibration_7d", avg("vibration").over(rolling_window))
          .withColumn("avg_rpm_7d", avg("engine_rpm").over(rolling_window))
          .withColumn("prev_anomaly", lag("anomaly_score", 1).over(lag_window))
          .withColumn("days_since_maint", F.datediff("date", F.coalesce(col("maint_date"), no_maint_date)))
          .select(
              "timestamp", "aircraft_id", "model", "engine_temp", "fuel_efficiency", "vibration",
              "altitude", "airspeed", "oil_pressure", "engine_rpm", "battery_voltage",
              "anomaly_score", "event_type", "avg_engine_temp_7d", "avg_vibration_7d",
              "avg_rpm_7d", "prev_anomaly", "days_since_maint"
          )
    )
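A trailing 7-day window and a "last 7 rows" window diverge as soon as readings are unevenly spaced. The plain-Python sketch below contrasts the two (mirroring a range frame over timestamps and `rowsBetween(-6, 0)` respectively) and reproduces the `days_since_maint` sentinel logic. It assumes readings are already sorted by timestamp; the helper names and sample values are hypothetical.

```python
from datetime import date, datetime, timedelta

SEVEN_DAYS = timedelta(days=7)
EPOCH = date(1970, 1, 1)  # sentinel used when maint_date is null


def time_window_avg(readings, field):
    """Mean of `field` over the same aircraft's readings with timestamp in [ts - 7 days, ts]."""
    result = []
    for r in readings:
        values = [
            o[field]
            for o in readings
            if o["aircraft_id"] == r["aircraft_id"]
            and r["timestamp"] - SEVEN_DAYS <= o["timestamp"] <= r["timestamp"]
        ]
        result.append(sum(values) / len(values))
    return result


def row_window_avg(readings, field, n_rows=7):
    """Mean of `field` over the same aircraft's last `n_rows` readings, like rowsBetween(-6, 0)."""
    result = []
    for i, r in enumerate(readings):
        history = [o[field] for o in readings[: i + 1] if o["aircraft_id"] == r["aircraft_id"]]
        window = history[-n_rows:]
        result.append(sum(window) / len(window))
    return result


def days_since_maint(reading_date, maint_date):
    """Days since the last maintenance, counting from 1970-01-01 when there is none."""
    return (reading_date - (maint_date or EPOCH)).days


readings = [
    {"aircraft_id": "AC1001", "timestamp": datetime(2024, 4, 1, 9, 0), "engine_temp": 600.0},
    {"aircraft_id": "AC1001", "timestamp": datetime(2024, 4, 11, 9, 0), "engine_temp": 700.0},
]
print(time_window_avg(readings, "engine_temp"))  # [600.0, 700.0]
print(row_window_avg(readings, "engine_temp"))   # [600.0, 650.0]
print(days_since_maint(date(2024, 4, 20), date(2024, 4, 1)))  # 19
```

The second reading is ten days after the first, so the time-based average excludes it while the row-based one still mixes both values.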