In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Bronze inputs: raw locations (each carrying a nested `sensors` array) and raw sensor records.
df_locations, df_sensors = (
    spark.table("air_quality.bronze.air_quality_locations_bronze"),
    spark.table("air_quality.bronze.air_quality_sensors_bronze"),
)


In [0]:
# Flatten bronze locations to one row per (location, sensor) pair so they can be joined
# with the sensor records. `explode` emits one row per element of `sensors`; a location
# whose array is null or empty produces no rows at all.
from pyspark.sql.functions import explode, col

df_sensors_flat = (
    df_locations
    .select(
        "id", "name", "locality", "timezone", "country", "coordinates",
        explode("sensors").alias("sensor"),
    )
    # Promote nested location / sensor fields to flat, descriptively named columns.
    .select(*[
        col(source).alias(target)
        for source, target in [
            ("id", "location_id"),
            ("name", "location_name"),
            ("locality", "locality"),
            ("timezone", "timezone"),
            ("country.code", "country_code"),
            ("coordinates.latitude", "latitude"),
            ("coordinates.longitude", "longitude"),
            ("sensor.id", "sensor_id"),
            ("sensor.name", "sensor_name"),
            ("sensor.parameter.id", "parameter_id"),
            ("sensor.parameter.name", "parameter_name"),
            ("sensor.parameter.units", "parameter_units"),
            ("sensor.parameter.displayName", "parameter_display_name"),
        ]
    ])
)


In [0]:
# Pull the coverage metrics of each bronze sensor out into flat double columns.
#
# NOTE(review): the silver join below is built from `df_sensors_parsed`, not from this
# frame; confirm `df_sensors_measured` is still needed before keeping this cell.
from pyspark.sql.functions import col
from pyspark.sql.types import StringType


def _coverage_metric(frame, field_name):
    """Return `coverage.<field_name>` of `frame` as a double column.

    Dotted access (`coverage.percentCoverage`) only resolves on a struct/map column,
    while the later parsing cell treats the same bronze `coverage` column as JSON text
    (from_json). Check the real type so this cell runs for either representation.
    """
    if isinstance(frame.schema["coverage"].dataType, StringType):
        # JSON text: extract the value by JSON path (null when the path is absent).
        return F.get_json_object(col("coverage"), f"$.{field_name}").cast("double")
    return col(f"coverage.{field_name}").cast("double")


df_sensors_measured = (
    df_sensors
    .select("id", "name", "parameter", "coverage", "datetimeFirst", "datetimeLast")
    .withColumn("percent_coverage", _coverage_metric(df_sensors, "percentCoverage"))
    .withColumn("observed_count", _coverage_metric(df_sensors, "observedCount"))
    .withColumn("expected_count", _coverage_metric(df_sensors, "expectedCount"))
)


In [0]:
# Convert the `utc` field of the first/last reading times into timestamp columns.
df_sensors_measured = (
    df_sensors_measured
    .withColumn("datetime_first", F.to_timestamp(col("datetimeFirst.utc")))
    .withColumn("datetime_last", F.to_timestamp(col("datetimeLast.utc")))
)


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Expected layout of the bronze `coverage` JSON, used with from_json below.
# Every field is nullable; the two datetime bounds are {utc, local} string pairs.
coverage_schema = (
    StructType()
    .add("expectedCount", DoubleType(), True)
    .add("expectedInterval", StringType(), True)
    .add("observedCount", DoubleType(), True)
    .add("observedInterval", StringType(), True)
    .add("percentComplete", DoubleType(), True)
    .add("percentCoverage", DoubleType(), True)
    .add(
        "datetimeFrom",
        StructType().add("utc", StringType(), True).add("local", StringType(), True),
        True,
    )
    .add(
        "datetimeTo",
        StructType().add("utc", StringType(), True).add("local", StringType(), True),
        True,
    )
)


In [0]:
# Build the per-sensor frame used by the silver join:
# - parse each bronze sensor's coverage into a struct,
# - promote its key metrics to flat columns,
# - add first/last reading timestamps.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType

# from_json only accepts a JSON string column, while the earlier coverage cell reads
# `coverage.<field>` as an already-nested column. Parse only when bronze stores text,
# otherwise use the nested column as-is, so this cell runs for either layout.
if isinstance(df_sensors.schema["coverage"].dataType, StringType):
    _coverage_struct_col = from_json(col("coverage"), coverage_schema)
else:
    _coverage_struct_col = col("coverage")

df_sensors_parsed = (
    df_sensors
    .withColumn("coverage_struct", _coverage_struct_col)
    # Cast so the metrics are doubles (as in coverage_schema) in both layouts.
    .withColumn("percent_coverage", col("coverage_struct.percentCoverage").cast("double"))
    .withColumn("observed_count", col("coverage_struct.observedCount").cast("double"))
    .withColumn("expected_count", col("coverage_struct.expectedCount").cast("double"))
    .withColumn("datetime_first", F.to_timestamp(col("datetimeFirst.utc")))
    .withColumn("datetime_last", F.to_timestamp(col("datetimeLast.utc")))
)

In [0]:
# Silver frame: every flattened (location, sensor) row, enriched with the parsed sensor
# record when one exists (left join). The sensors-side `id` duplicates `sensor_id`, so drop it.
df_silver = (
    df_sensors_flat
    .join(
        df_sensors_parsed,
        on=df_sensors_flat["sensor_id"] == df_sensors_parsed["id"],
        how="left",
    )
    .drop(df_sensors_parsed["id"])
)


In [0]:
# Render the silver frame with the notebook's rich table display.
display(df_silver)