This notebook takes the Bronze-layer OpenAQ sensor data from the cumulative raw table and audits it to confirm that loading it into the permanent table is idempotent (the Audit step of the process). The audited data is then passed forward to analytics.

In the future, an additional state-change table will indicate whether to continue monitoring each sensor; that table will in turn determine which sensors are included in the next API call.

Once this data is saved to the volume, it will be sent to analytics. In the future, it will be ingested into a Snowflake data warehouse; for now, the job is set up in Databricks, and the analytics are also shown in Databricks as the final step of the process. The analytics queries run in the Gold layer, where the data is prepared for the analytics visualizations.

In [0]:
# Bronze layer: read the cumulative raw OpenAQ table and render it for inspection.
raw_df = spark.read.table("sensor_measurements_raw")
display(raw_df)

In [0]:
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

# --- Schema of one element of the `results` array -------------------------
_datetime_struct = StructType([
    StructField("utc", StringType()),
    StructField("local", StringType()),
])
_coordinates_struct = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
])
results_element_schema = StructType([
    StructField("datetime", _datetime_struct),
    StructField("value", DoubleType()),
    StructField("coordinates", _coordinates_struct),
    StructField("sensorsId", IntegerType()),
    StructField("locationsId", IntegerType()),
])
# `results` is parsed as a JSON array of the element struct above.
results_schema = ArrayType(results_element_schema)

# Re-read the raw table so this cell does not depend on the previous one.
raw_df = spark.table("sensor_measurements_raw")

# `results` is always cast to string before parsing, whatever its stored type.
df_parsed = raw_df.withColumn(
    "results_json", from_json(col("results").cast("string"), results_schema)
)

# One row per array element; explode drops rows whose array is null or empty.
df_exploded = df_parsed.withColumn("result", explode(col("results_json")))

# (key in `meta`, output column) -- presumably `meta` is a map column; verify.
_meta_fields = [
    ("website", "website"),
    ("name", "api_name"),
    ("page", "page"),
    ("found", "found"),
    ("limit", "limit"),
]
# (nested path inside `result`, output column)
_result_fields = [
    ("result.datetime.utc", "datetime"),
    ("result.datetime.local", "local_datetime"),
    ("result.value", "value"),
    ("result.coordinates.latitude", "lat"),
    ("result.coordinates.longitude", "lon"),
    ("result.sensorsId", "sensor_id"),
    ("result.locationsId", "result_location_id"),
]

# Flatten into top-level columns; column order matches the lists above.
df_flattened = df_exploded.select(
    col("location_id"),
    *[col("meta").getItem(key).alias(name) for key, name in _meta_fields],
    *[col(path).alias(name) for path, name in _result_fields],
)

In [0]:
# Render the flattened measurements (one row per element of `results`) for inspection.
display(df_flattened)

In [0]:
from pyspark.sql.functions import concat, col, lit
# Keep readings from 2025 onward for the sensors listed in `filtered_sensors`,
# attach each sensor's parameter name and units, and collapse repeated readings
# to one row per (location_id, sensor_id, datetime).
filtered_sensors = spark.table("filtered_sensors")

# `datetime` is a string column, so this is a lexical comparison
# (correct for ISO-8601 timestamps).
_since = "2025-01-01"
_output_columns = ["location_id", "sensor_id", "parameter", "units",
                   "value", "datetime", "lat", "lon"]
_dedupe_keys = ["location_id", "sensor_id", "datetime"]

df_filtered = (
    df_flattened
    .filter(col("datetime") >= _since)
    .join(filtered_sensors, ["location_id", "sensor_id"], "inner")
    .withColumnRenamed("parameter_name", "parameter")
    .withColumnRenamed("parameter_units", "units")
    .select(*_output_columns)
    .dropDuplicates(_dedupe_keys)
)

In [0]:
# Render the filtered, de-duplicated batch that will be merged into the permanent table.
display(df_filtered)

In [0]:
# Current contents of the permanent (cumulative) measurements Delta table.
permanent_df = spark.read.table("permanent_sensor_measurements")

In [0]:
# Render the permanent table as it stands before this run's merge.
display(permanent_df)

In [0]:
# Audit step: the merge below is keyed on (location_id, sensor_id, datetime), so
# it is only idempotent if no key occurs more than once.
# `df_filtered` was already passed through dropDuplicates on exactly these keys,
# so checking it alone can never fail. The permanent table is audited as well,
# and any duplicate aborts the run before data is published.
AUDIT_KEYS = ("location_id", "sensor_id", "datetime")


def _duplicate_keys(df, keys=AUDIT_KEYS):
    """Return each key combination occurring more than once in *df*, with its count."""
    return df.groupBy(*keys).count().filter("count > 1")


duplicate_check_df = _duplicate_keys(df_filtered)
target_duplicate_df = _duplicate_keys(spark.table("permanent_sensor_measurements"))

_audit_failures = []
for _label, _dups in (("enriched data", duplicate_check_df),
                      ("permanent table", target_duplicate_df)):
    # take(1) stops after the first hit instead of counting every duplicate group.
    if _dups.take(1):
        print(f"Duplicates detected in the {_label}:")
        _dups.show()
        _audit_failures.append(_label)

if _audit_failures:
    raise ValueError(
        "Audit failed: duplicate (location_id, sensor_id, datetime) keys in "
        + ", ".join(_audit_failures)
    )

print("No duplicates found. Data is idempotent based on the common key and timestamp.")

In [0]:
from delta.tables import DeltaTable

# Merge the enriched batch into the permanent table. Rows whose
# (location_id, sensor_id, datetime) key already exists are left untouched;
# only keys not yet present are inserted. Existing rows are never updated.
_merge_keys = ("location_id", "sensor_id", "datetime")
_merge_condition = " AND ".join(f"target.{k} = source.{k}" for k in _merge_keys)

deltaTable = DeltaTable.forName(spark, "permanent_sensor_measurements")
(
    deltaTable.alias("target")
    .merge(df_filtered.alias("source"), _merge_condition)
    .whenNotMatchedInsertAll()
    .execute()
)

print("Merge complete. Permanent table updated idempotently.")

In [0]:
%sql
-- Spot check: rows in the permanent table whose measurement date is today.
-- NOTE(review): to_date takes the date part of the stored `datetime` string, while
-- current_date() uses the session time zone; these agree only if both are UTC -- confirm.
SELECT pm.*
FROM permanent_sensor_measurements AS pm
WHERE to_date(pm.datetime) = current_date()

In [0]:

%sql
-- Gold layer: append today's Moderate / Severe / Hazardous readings, joined to the
-- income attributes in sensors_with_income_levels, into daily_air_quality_monitor.
--
-- Reference only (not executed): the target was set up with a header of the form
--   CREATE TABLE daily_air_quality_monitor USING DELTA PARTITIONED BY (result_date) AS <SELECT>
--
-- The MERGE inserts a row only when its (GEOID, parameter, value, result_date)
-- key is not already in the target. An insert-only MERGE does NOT de-duplicate
-- its source, so the source uses SELECT DISTINCT. Without it, identical rows
-- arriving in one run would all be inserted, while the same rows arriving in a
-- later run would be skipped. That would make the table depend on run frequency.
-- NOTE(review): rows sharing the key but differing in other columns are still
-- all inserted, and a NULL GEOID never matches, so such rows are re-inserted on
-- every run. Confirm GEOID is always populated.
MERGE INTO daily_air_quality_monitor target
USING (
    WITH evaluated_measurements AS (
      SELECT
        m.location_id,
        m.sensor_id,
        m.parameter,
        m.units,
        m.value,
        -- NOTE(review): assumes `datetime` is UTC and the session time zone is UTC,
        -- so that this lines up with current_date() below -- confirm.
        to_date(m.datetime) AS result_date,
        m.lat,
        m.lon,
        -- Checked from the highest band down (assumes Hazardous >= Severe >= Moderate).
        -- With no matching reference row every comparison is NULL, so the reading
        -- falls through to 'Good' and is excluded below.
        CASE
          WHEN m.value >= r.Hazardous THEN 'Hazardous'
          WHEN m.value >= r.Severe THEN 'Severe'
          WHEN m.value >= r.Moderate THEN 'Moderate'
          ELSE 'Good'
        END AS hazard_level
      FROM permanent_sensor_measurements m
      LEFT JOIN air_quality_reference_parameters r
        ON m.parameter = r.Parameter
    ),
    health_hazard_levels AS (
      SELECT *
      FROM evaluated_measurements
      WHERE hazard_level IN ('Hazardous', 'Severe', 'Moderate')
        AND result_date = current_date()
        AND parameter NOT IN ('temperature', 'um003', 'pm1', 'relativehumidity')
    )
    SELECT DISTINCT
      s.GEOID
      , s.state
      , h.parameter
      , h.units
      , h.value
      , h.hazard_level
      , s.median_income
      , s.median_income_margin
      , s.income_bracket
      , h.result_date
    FROM health_hazard_levels h
    INNER JOIN sensors_with_income_levels s
      ON h.location_id = s.nn_location_id AND h.sensor_id = s.sensor_id
) source
ON target.GEOID = source.GEOID
  AND target.parameter = source.parameter
  AND target.value = source.value
  AND target.result_date = source.result_date
WHEN NOT MATCHED THEN
  INSERT (GEOID, state, parameter, units, value, hazard_level,
          median_income, median_income_margin, income_bracket, result_date)
  VALUES (source.GEOID, source.state, source.parameter, source.units, source.value,
          source.hazard_level, source.median_income, source.median_income_margin,
          source.income_bracket, source.result_date);
