In [2]:
import fsspec
from pyspark.sql import SparkSession
import json
from pyspark.sql.functions import col, input_file_name, regexp_extract,first,to_timestamp,when,least,lit,round,year, month,day, hour, dayofweek, to_date,udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType,IntegerType
from shapely.geometry import Point
from shapely import wkt

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 5, Finished, Available, Finished)

In [3]:
# --- OneLake connection details -------------------------------------------
# The workspace GUID plays the role of the storage "account"; fsspec is pointed
# at the OneLake DFS endpoint. No credential arguments are passed here.
account_name = "4906b11e-1e59-4869-9321-062a4696a2db"
account_host = "onelake.dfs.fabric.microsoft.com"

# --- Storage locations ----------------------------------------------------
# Root folder of the air-quality files (sensor metadata + per-sensor readings).
abfss_path = "abfss://4906b11e-1e59-4869-9321-062a4696a2db@onelake.dfs.fabric.microsoft.com/62794233-3c68-4109-ab1e-7666b1963827/Files/aq"
# Checkpoint folder; not referenced by the other cells visible in this notebook.
silver="abfss://4906b11e-1e59-4869-9321-062a4696a2db@onelake.dfs.fabric.microsoft.com/8d633764-c954-4f64-a6ff-b52dde76bd20/Files/checkpoint"
# Delta table holding the zone polygons used for the point-in-polygon lookup.
zones_path="abfss://4906b11e-1e59-4869-9321-062a4696a2db@onelake.dfs.fabric.microsoft.com/8d633764-c954-4f64-a6ff-b52dde76bd20/Tables/silver/taxi_zones"

# --- Handles ----------------------------------------------------------------
# Spark session for the distributed work, fsspec filesystem for small
# driver-side reads.
spark = SparkSession.builder.appName('air_backfill').getOrCreate()
fs = fsspec.filesystem("abfss", account_name=account_name, account_host=account_host)

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 6, Finished, Available, Finished)

In [4]:
# Sensor metadata is a JSON object keyed by sensor id; each value carries at
# least "latitude" and "longitude". It is read on the driver and turned into a
# small Spark DataFrame so it can be joined to the readings.
with fs.open(f"{abfss_path}/sensors/sensors.json") as f:
    metadata = json.load(f)
metadata_rows = [(k, v['latitude'], v['longitude']) for k, v in metadata.items()]
df_metadata = spark.createDataFrame(metadata_rows, ["sensor_id", "latitude", "longitude"])

# Zone polygons are materialised on the driver as (BoroughID, shapely geometry)
# pairs for the point-in-polygon lookup. Only the two columns actually used are
# projected before collect(), so the remaining columns of the table are never
# shipped to the driver.
zones = spark.read.format('delta').load(zones_path)
zones_list = [
    (row.BoroughID, wkt.loads(row.geometry))
    for row in zones.select("BoroughID", "geometry").collect()
]

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 7, Finished, Available, Finished)

In [5]:
# Explicit read schema: only the measurement value, the parameter name and the
# local start time of the measurement period are declared.
schema = (
    StructType()
    .add("value", DoubleType(), True)
    .add("parameter", StructType().add("name", StringType(), True), True)
    .add(
        "period",
        StructType().add(
            "datetime_from",
            StructType().add("local", StringType(), True),
        ),
        True,
    )
)

# Read every sensor's JSON files; the sensor id is the folder name directly
# under /data/ in each file's path.
raw_readings = (
    spark.read
    .option("multiLine", True)
    .schema(schema)
    .json(f"{abfss_path}/data/*/*.json")
    .withColumn("sensor_id", regexp_extract(input_file_name(), r"/data/([^/]+)/", 1))
)

# Flatten the nested fields into one row per (sensor, time, parameter).
# NOTE(review): to_timestamp is called without a format; if the "local" strings
# carry a UTC offset the parsed value follows the Spark session time zone --
# confirm that "timestamp_local" really is local time.
long_readings = raw_readings.select(
    col("sensor_id"),
    to_timestamp(col("period.datetime_from.local")).alias("timestamp_local"),
    col("parameter.name").alias("parameter_name"),
    col("value").alias("measurement_value"),
)

# Pivot to one row per (sensor, time). Only "pm25" is listed as a pivot value,
# so other parameters are not carried forward. `first` picks one value when a
# sensor reports the same parameter more than once for a timestamp.
df_raw = (
    long_readings
    .groupBy("sensor_id", "timestamp_local")
    .pivot("parameter_name", ["pm25"])
    .agg(first("measurement_value"))
)


StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 8, Finished, Available, Finished)

In [6]:
# Attach each sensor's coordinates; a left join keeps readings from sensors
# that have no entry in the metadata (their latitude/longitude stay null).
df = df_raw.join(df_metadata, on='sensor_id', how='left')

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 9, Finished, Available, Finished)

In [7]:
def point_to_borough(lat, lon):
    """Return the id of the first zone polygon that contains the given point.

    Args:
        lat: Latitude of the point, or None.
        lon: Longitude of the point, or None.

    Returns:
        The id paired with the first polygon in ``zones_list`` that contains
        the point, or None when either coordinate is missing or no polygon
        contains the point.
    """
    # A null coordinate cannot be turned into a shapely Point; treat it as
    # "no zone" instead of letting the lookup raise.
    if lat is None or lon is None:
        return None

    # shapely uses (x, y) order, i.e. (longitude, latitude).
    # NOTE(review): assumes the zone polygons are expressed in lon/lat degrees
    # as well -- confirm the CRS of the geometry column.
    pt = Point(lon, lat)
    for borough_id, poly in zones_list:
        # `contains` is False for points lying exactly on a polygon boundary.
        if poly.contains(pt):
            return borough_id
    return None

# Spark UDF wrapper around the point-in-polygon lookup; the result column is
# declared as an integer.
point_to_borough_udf = udf(f=point_to_borough, returnType=IntegerType())

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 10, Finished, Available, Finished)

In [8]:
def process(df):
    """Clean the joined readings and tag each one with a zone id.

    Steps, in order:
      1. Replace nulls with 0 (``fillna(0)`` is applied to the whole frame).
      2. Clamp negative ``pm25`` values to 0.
      3. Keep one ``pm25`` value per (latitude, longitude, timestamp_local).
      4. Add ``LocationID`` via ``point_to_borough_udf`` and drop the
         coordinate columns.

    Args:
        df: DataFrame with ``pm25``, ``latitude``, ``longitude`` and
            ``timestamp_local`` columns.

    Returns:
        DataFrame with ``timestamp_local``, ``pm25`` and ``LocationID``.
    """
    pollutant_cols = ["pm25"]

    # NOTE(review): this also turns missing readings and missing coordinates
    # into 0 -- confirm that is intended rather than dropping those rows.
    cleaned = df.fillna(0)
    for name in pollutant_cols:
        non_negative = when(col(name) < 0, 0).otherwise(col(name))
        cleaned = cleaned.withColumn(name, non_negative)

    # Collapse to one row per location and timestamp.
    per_location = cleaned.groupby("latitude", "longitude", "timestamp_local").agg(
        first("pm25").alias("pm25")
    )

    # Resolve the coordinates to a zone id, then discard the coordinates.
    with_zone = per_location.withColumn(
        "LocationID",
        point_to_borough_udf(col('latitude'), col('longitude')),
    )
    return with_zone.drop("latitude", "longitude")


# Rebinds `df` to the processed frame. process() drops latitude/longitude, so
# re-running this cell requires re-running the join cell first.
df = process(df)

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 11, Finished, Available, Finished)

In [9]:

# Make sure the target schema exists before writing the table.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

# Derive the partition columns from the measurement timestamp.
df = (
    df
    .withColumn("year", year("timestamp_local"))
    .withColumn("month", month("timestamp_local"))
)

# Full refresh: the existing table contents are replaced on every run.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year", "month")
    .saveAsTable("silver.air_quality")
)

StatementMeta(, f2968efe-3117-4d08-82eb-882bcbb916fb, 12, Finished, Available, Finished)