In [0]:
# Expose an "api_key" notebook widget (empty by default) and read its current
# value; the key is sent as the X-API-Key header on every OpenAQ request.
API_KEY_WIDGET = "api_key"
dbutils.widgets.text(API_KEY_WIDGET, "")
x_api_key = dbutils.widgets.get(API_KEY_WIDGET)

In [0]:
%sql
-- Create the air_quality catalog; IF NOT EXISTS makes this cell safe to re-run.
CREATE CATALOG IF NOT EXISTS air_quality

In [0]:
%sql
-- Create the openaq schema inside the air_quality catalog; safe to re-run.
CREATE SCHEMA IF NOT EXISTS air_quality.openaq

In [0]:
%sql
-- Create the volume that the raw JSON payloads are written to
-- (/Volumes/air_quality/openaq/ingestion/...); safe to re-run.
CREATE VOLUME IF NOT EXISTS air_quality.openaq.ingestion

In [0]:
%sql
-- Watermark table for source-level ingestion: one row per source name.
CREATE TABLE IF NOT EXISTS air_quality.openaq.ingestion_meta_data(
  -- Name of the upstream source (this notebook uses 'openaq').
  source STRING,
  -- Timestamp recorded after the most recent locations ingestion for the source.
  last_processed_timestamp TIMESTAMP
)
USING DELTA;

In [0]:
%sql
-- Seed the watermark row for the 'openaq' source only when it is missing.
-- A plain INSERT would append another 'openaq' row every time the notebook is
-- re-run; MERGE with only a NOT MATCHED clause keeps the cell idempotent and
-- leaves an existing watermark untouched.
MERGE INTO air_quality.openaq.ingestion_meta_data AS tgt
USING (
  SELECT 'openaq' AS source,
         TIMESTAMP('2025-12-22 00:00:00') AS last_processed_timestamp
) AS src
ON tgt.source = src.source
WHEN NOT MATCHED THEN
  INSERT (source, last_processed_timestamp)
  VALUES (src.source, src.last_processed_timestamp)


In [0]:
# Preview the watermark row(s) currently stored for the 'openaq' source.
metadata_df = (
    spark.table("air_quality.openaq.ingestion_meta_data")
    .where("source = 'openaq'")
)
metadata_df.show()


Incremental Data Load for Locations using REST API

In [0]:
import requests, json
from datetime import datetime
from pyspark.sql.functions import *

In [0]:
# Request the OpenAQ v3 locations listing, authenticating with the API key
# read from the notebook widget.
headers = {
    "X-API-Key" : x_api_key
}

url = "https://api.openaq.org/v3/locations"
# A timeout stops the cell from hanging forever if the API does not respond.
response = requests.get(url , headers = headers, timeout = 30)
# Fail loudly on 4xx/5xx: otherwise an error body is parsed as if it were data,
# an empty payload is written and the watermark is still advanced.
response.raise_for_status()

# NOTE(review): no paging parameters are sent, so only the first page of
# locations returned by the API is processed — confirm this is intended.
locations_data = response.json()


In [0]:
# Number of location records returned. len(locations_data) would count the
# top-level keys of the response dict rather than the records themselves.
len(locations_data.get("results") or [])


In [0]:
# Keep only the locations whose country name is "India".
# `or []` / `or {}` guard against a null "results" or "country" value, which
# would otherwise raise TypeError when iterated or subscripted.
locations = locations_data.get("results") or []

filtered_locations = [
    loc for loc in locations
    if (loc.get("country") or {}).get("name") == "India"
]
filtered_locations

In [0]:
# Wrap the filtered locations in an envelope that records the source name and
# the UTC time (ISO-8601 string) at which the payload was assembled.
from datetime import datetime, timezone

current_ts = datetime.now(timezone.utc).isoformat()
raw_payload = dict(
    source="openaq",
    ingestiontime=current_ts,
    results=filtered_locations,
)

display(raw_payload)

In [0]:
# Persist the locations payload as one JSON file in the ingestion volume.
# The folder name carries a second-resolution UTC stamp and the file name a
# microsecond-resolution stamp taken from the same instant.
current_ts = datetime.now(timezone.utc)
folder_stamp = current_ts.strftime('%Y%m%d%H%M%S')
file_stamp = current_ts.strftime('%Y%m%d%H%M%S%f')

output_path = (
    "/Volumes/air_quality/openaq/ingestion/locations/"
    + f"ingestiontime = {folder_stamp}"
)
file_name = f"locations_{file_stamp}.json"
target_file = f"{output_path}/{file_name}"

# overwrite=False: refuse to replace a file that already exists at this path.
dbutils.fs.put(target_file, json.dumps(raw_payload), overwrite = False)

In [0]:
# Record the time of this locations ingestion as the new watermark for the
# 'openaq' source.
watermark_update_sql = f"""
UPDATE air_quality.openaq.ingestion_meta_data
SET last_processed_timestamp = TIMESTAMP('{current_ts}')
WHERE source = 'openaq'
"""
spark.sql(watermark_update_sql)

Incremental Data Load for Measurements using REST API

In [0]:
# Load every locations JSON file written to the ingestion volume so far.
locations_dir = "/Volumes/air_quality/openaq/ingestion/locations/"
loc_df = spark.read.json(locations_dir)

In [0]:
# Render the raw locations DataFrame for visual inspection.
display(loc_df)

In [0]:
# One row per location: explode the "results" array of each ingested payload.
locations_df = loc_df.selectExpr("explode(results) as loc")
locations_df.selectExpr("loc.id as location_id").show()

# One row per (location, sensor): explode each location's "sensors" array and
# keep the owning location id alongside it.
sensors_df = locations_df.selectExpr(
    "explode(loc.sensors) as sensor",
    "loc.id as locationid",
)

# Collect the distinct sensor ids to the driver as a plain Python list.
distinct_sensor_rows = sensors_df.select("sensor.id").distinct().collect()
sensor_ids = [sensor_row["id"] for sensor_row in distinct_sensor_rows]
sensor_ids

In [0]:
%sql
-- Per-sensor watermark table for incremental measurement loads.
CREATE TABLE IF NOT EXISTS air_quality.openaq.measurement_metadata(
  -- Sensor identifier, stored as a string.
  sensor_id STRING,
  -- Latest measurement period end already ingested for this sensor.
  last_processed_timestamp TIMESTAMP
)
USING DELTA

In [0]:
def get_last_processed_ts(sensor_id):
    """Return the watermark for ``sensor_id`` as an ISO-8601 UTC string.

    Looks up the sensor in ``air_quality.openaq.measurement_metadata``.

    Args:
        sensor_id: Sensor identifier; compared as a string because the
            ``sensor_id`` column is declared STRING.

    Returns:
        str: ``"2025-12-22T00:00:00Z"`` when the sensor has no stored
        (non-null) watermark, otherwise the stored timestamp formatted as
        ``YYYY-MM-DDTHH:MM:SSZ``. The value is always a string, so callers
        send the same format to the API in both cases.
    """
    from datetime import timezone
    from pyspark.sql import functions as F

    default_ts = "2025-12-22T00:00:00Z"

    # A single collect() replaces the previous count() + collect() pair (two
    # Spark jobs). Ordering makes the result deterministic if a sensor ever
    # has more than one row.
    rows = (
        spark.table("air_quality.openaq.measurement_metadata")
        .filter(F.col("sensor_id") == str(sensor_id))
        .filter(F.col("last_processed_timestamp").isNotNull())
        .orderBy(F.col("last_processed_timestamp").desc())
        .limit(1)
        .collect()
    )
    if not rows:
        return default_ts

    last_ts = rows[0]["last_processed_timestamp"]
    # NOTE(review): collected Spark timestamps are expected to be naive
    # datetimes in the driver's local zone; astimezone() treats a naive value
    # as local time, so this converts to UTC — confirm on this runtime.
    return last_ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


In [0]:
# API call for measurements

def fetch_measurements(sensor_id, date_from):
    """Fetch daily measurements for one sensor from the OpenAQ v3 API.

    Args:
        sensor_id: Sensor identifier interpolated into the request URL.
        date_from: Lower date bound, sent as the ``date_from`` query parameter.

    Returns:
        list: The ``results`` array of the JSON response, or ``[]`` when the
        response has no results or the API returned an HTTP error status.
        HTTP errors are printed instead of raised so one failing sensor does
        not abort the whole run.
    """
    url = f"https://api.openaq.org/v3/sensors/{sensor_id}/measurements/daily"

    headers = {
        "X-API-Key": x_api_key
    }

    params = {
        "date_from" : date_from,
        "limit" : 1000
    }

    # A timeout stops a stalled connection from blocking the whole loop.
    response = requests.get(url, headers = headers, params = params, timeout = 30)

    if not response.ok:
        # Previously an error body was silently treated as "no data"; keep the
        # best-effort empty result but make the failure visible.
        print(
            f"fetch_measurements: sensor {sensor_id} returned "
            f"HTTP {response.status_code}; skipping"
        )
        return []

    # `or []` also covers a response whose "results" value is null.
    return response.json().get("results") or []




In [0]:
from datetime import datetime, timezone
import json
import builtins


# Fetch new measurements for every sensor, starting from that sensor's stored
# watermark, and remember the newest period end seen per sensor.
all_measurements = []
metadata_updates = []

# `sensor_id` rather than `id`, so the builtin id() is not shadowed.
for sensor_id in sensor_ids:
    last_ts = get_last_processed_ts(sensor_id)

    measurements = fetch_measurements(sensor_id, last_ts)

    if not measurements:
        continue

    all_measurements.extend(measurements)

    # Period-end UTC strings of the measurements that actually carry one.
    period_ends = [
        end["utc"]
        for end in (
            (m.get("period") or {}).get("datetimeTo") for m in measurements
        )
        if end and end.get("utc")
    ]

    # builtins.max is used explicitly because `from pyspark.sql.functions
    # import *` shadows max with the Spark column function. default=None avoids
    # a ValueError when no measurement has a usable period end.
    max_ts = builtins.max(period_ends, default=None)
    if max_ts is not None:
        metadata_updates.append((sensor_id, max_ts))


In [0]:
# Inspect the (sensor id, newest period end) pairs gathered by the fetch loop.
metadata_updates

In [0]:
# Inspect the raw measurement records gathered by the fetch loop.
all_measurements

In [0]:
# Writing measurements data to Volume.
current_ts = datetime.now(timezone.utc)

# The whole write is guarded. Previously only the payload and path were built
# inside the `if`, so a run with no new measurements wrote the stale locations
# payload to the stale locations path under a measurements_ file name.
if all_measurements:
    raw_payload = {
        "source":"openaq_v3",
        "ingestiontime": current_ts.isoformat(),
        "results" : all_measurements
    }

    output_path = (
        f"/Volumes/air_quality/openaq/ingestion/measurements/"
        f"ingestiontime = {current_ts.strftime('%Y%m%d%H%M%S')}"
    )

    file_name = f"measurements_{current_ts.strftime('%Y%m%d%H%M%S%f')}.json"

    # overwrite=False: refuse to replace a file that already exists.
    dbutils.fs.put(
        f"{output_path}/{file_name}",
        json.dumps(raw_payload),
        overwrite = False
    )
else:
    print("No new measurements fetched; nothing written to the volume.")


In [0]:
# Load every measurements JSON file written to the ingestion volume so far and
# preview the first five rows.
measurements_dir = "/Volumes/air_quality/openaq/ingestion/measurements/"
measures_df = spark.read.json(measurements_dir)

measures_df.show(5)

In [0]:
# Updating MetaData Table for Measurements.
# Skip when nothing was fetched: createDataFrame cannot infer a schema from an
# empty list when only column names are supplied.
if metadata_updates:
    metadata_df = (
        spark.createDataFrame(
            metadata_updates,
            ["sensor_id", "last_processed_timestamp"],
        )
        # The target column is STRING; cast so the join key types match.
        .withColumn("sensor_id", col("sensor_id").cast("string"))
        # The watermark arrives as a timestamp string from the API.
        .withColumn(
            "last_processed_timestamp",
            col("last_processed_timestamp").cast("timestamp"),
        )
    )

    metadata_df.createOrReplaceTempView("updates")

    # Upsert: advance the watermark of known sensors, insert new sensors.
    spark.sql("""
            MERGE INTO air_quality.openaq.measurement_metadata tgt
            USING updates src
            ON tgt.sensor_id = src.sensor_id
            WHEN MATCHED THEN UPDATE SET tgt.last_processed_timestamp = src.last_processed_timestamp
            WHEN NOT MATCHED THEN INSERT *
          """)
else:
    print("No sensor watermarks to update.")



In [0]:
%sql
-- Inspect the per-sensor watermarks after the merge.
SELECT * FROM air_quality.openaq.measurement_metadata

In [0]:
%sql
-- Inspect the source-level watermark row(s).
SELECT * FROM air_quality.openaq.ingestion_meta_data