In [0]:
import requests, json, time
from datetime import datetime, timezone
from pyspark.sql.functions import col

In [0]:

# The Databricks Asset Bundle (DAB) job supplies the target catalog as a widget
# parameter; "main_uc_dev" is the fallback when the notebook is run by hand.
dbutils.widgets.text("catalog", "main_uc_dev")
catalog = dbutils.widgets.get("catalog")

# Station metadata is read as JSON from the bronze "met_airquality_stations" volume.
stations_df = (
    spark.read
    .format("json")
    .load(f"/Volumes/{catalog}/bronze/met_airquality_stations")
)

In [0]:
# --- API setup ---
# MET Norway air-quality forecast endpoint (v0.1). The User-Agent identifies
# this client and a contact address; JSON responses are requested explicitly.
BASE = "https://api.met.no/weatherapi/airqualityforecast/0.1/"
HEADERS = {
    "User-Agent": "herman.lillejord@gmail.com Airqualityforecast_DatabricksFreeEdition",
    "Accept": "application/json",
}

# Forecast variables to request for each station.
vars_to_pull = [
    "AQI",
    "pm25_concentration",
    "pm10_concentration",
    "no2_concentration",
    "o3_concentration",
    "so2_concentration",
]

In [0]:
# --- Target Volume path ---
# Unity Catalog location for the raw API dumps. The volume path is derived from
# the same schema/volume names used in the DDL below, so the two cannot drift apart.
schema_name = "bronze"
volume_name = "met_bergen_airquality_jsondumps"
volume_root = f"/Volumes/{catalog}/{schema_name}/{volume_name}"

# One folder per run, laid out as YYYY/MM/DD/HHMMSS (UTC).
ts_part = datetime.now(timezone.utc).strftime("%Y/%m/%d/%H%M%S")


def _quote_ident(name: str) -> str:
    """Return ``name`` as a backtick-quoted SQL identifier.

    Embedded backticks are doubled. ``catalog`` comes from a widget/job
    parameter, so quoting keeps it from being read as SQL syntax when it is
    interpolated into the DDL statements below.
    """
    return "`" + name.replace("`", "``") + "`"


q_catalog = _quote_ident(catalog)
q_schema = f"{q_catalog}.{_quote_ident(schema_name)}"
q_volume = f"{q_schema}.{_quote_ident(volume_name)}"

spark.sql(f"USE CATALOG {q_catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {q_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {q_volume}")

target_dir  = f"{volume_root}/{ts_part}"
target_file = f"{target_dir}/airquality.jsonl"


In [0]:

# --- 4) Fetch forecasts for the stations ---
# By default every station in stations_df is queried. Set ONLY_BERGEN = True for a
# quick run limited to stations whose `kommune.name` contains "Bergen".
ONLY_BERGEN = False
# Pause (seconds) after each API call so the loop does not hammer api.met.no.
REQUEST_PAUSE_S = 0.2

run_ts_iso = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
records = []

# Filter into a new name instead of reassigning stations_df, so re-running this
# cell never depends on a frame that an earlier run already filtered.
stations_to_fetch = (
    stations_df.where(col("kommune.name").contains("Bergen")) if ONLY_BERGEN else stations_df
)

# One Session reuses the HTTP connection across all requests and is closed on exit.
with requests.Session() as session:
    for st in stations_to_fetch.collect():
        lat = st["latitude"]
        lon = st["longitude"]
        if lat is None or lon is None:
            # requests silently drops None-valued params, so no usable query can be built.
            print(f"Skipping station {st['name']!r}: missing coordinates")
            continue

        # `kommune` is a nested struct that may be null; keep the station instead of crashing.
        kommune_struct = st["kommune"]
        kommune = kommune_struct["name"] if kommune_struct is not None else None

        for var in vars_to_pull:
            params = {"lat": lat, "lon": lon, "filter_vars": var}
            r = session.get(BASE, params=params, headers=HEADERS, timeout=20)
            r.raise_for_status()  # abort the run on any HTTP error status
            js = r.json()

            # Tolerate missing or null "data"/"time" entries in the payload.
            for t in (js.get("data") or {}).get("time") or []:
                var_obj = (t.get("variables") or {}).get(var) or {}

                records.append({
                    "time_from": t.get("from"),
                    "time_to": t.get("to"),
                    "variable": var,
                    "value": var_obj.get("value"),

                    # station info
                    "station_name": st["name"],
                    "station_eoi": st["eoi"],
                    "latitude": lat,
                    "longitude": lon,
                    "kommune": kommune,
                    "ingested_at_utc": run_ts_iso
                })

            time.sleep(REQUEST_PAUSE_S)


In [0]:

# Serialise the records as JSON Lines: one JSON object per line, with
# non-ASCII characters written as-is instead of \u-escaped.
jsonl_str = "\n".join(
    json.dumps(record, ensure_ascii=False)
    for record in records
)

# Create the run folder, then write the whole payload as a single file,
# replacing any file already at that path.
dbutils.fs.mkdirs(target_dir)
dbutils.fs.put(target_file, jsonl_str, overwrite=True)



In [0]:
# Sanity check: load the file just written back through Spark and render it.
df = (
    spark.read
    .format("json")
    .load(target_file)
)
display(df)