In [1]:
import requests
import json
import polars as pl
import re
from datetime import datetime
from typing import Optional, Dict, Any, List

import os
from dotenv import load_dotenv
# load_dotenv() comes from python-dotenv and is expected to copy the entries of
# a local .env file into the process environment before the lookup below.
load_dotenv()
# None when DB_PASSWORD is not set in the environment.
db_password = os.environ.get("DB_PASSWORD")

In [2]:
# TfL Air Quality API URL; passed to fetch_forecast_data to download the forecast.
API_URL = "https://api.tfl.gov.uk/AirQuality/"

def fetch_forecast_data(url: str) -> Optional[List[Dict[str, Any]]]:
    """Fetch the forecast entries published at ``url``.

    Sends a GET request with a 5-second timeout, decodes the JSON body and
    returns the value stored under its ``"currentForecast"`` key.

    Args:
        url: Endpoint to query.

    Returns:
        The ``"currentForecast"`` value (presumably a list of forecast
        dicts; an empty list when the key is absent), or ``None`` when the
        request fails, the body is not valid JSON, or the decoded payload is
        not a JSON object. Failures are reported with a printed message
        rather than an exception.
    """
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
    except requests.exceptions.Timeout:
        print("❌ Error: The API request timed out.")
        return None
    except requests.exceptions.HTTPError as http_err:
        print(f"❌ HTTP error occurred: {http_err}")
        return None
    except requests.exceptions.RequestException as req_err:
        print(f"❌ Request error occurred: {req_err}")
        return None

    # Decode in its own try block. The JSON error raised by requests is a
    # ValueError but (in current releases) also a RequestException, so sharing
    # the handler chain above would report it as a generic request error.
    try:
        data = response.json()
    except ValueError:
        print("❌ Error: Failed to parse JSON response.")
        return None

    # A valid JSON body can still be a list/string/number, which has no .get().
    if not isinstance(data, dict):
        print("❌ Error: Unexpected JSON structure in API response.")
        return None

    return data.get("currentForecast", [])

forecast_data = fetch_forecast_data(API_URL)

# A missing (None) or empty result is falsy, so both are reported as a failure.
print("✅ API data successfully retrieved!" if forecast_data else "❌ Failed to retrieve data.")

✅ API data successfully retrieved!


In [3]:
# English month names mapped to month numbers. Parsing against this table
# instead of strptime("%B") keeps the result independent of the process locale.
_MONTH_NUMBERS = {
    name: number
    for number, name in enumerate(
        (
            "january", "february", "march", "april", "may", "june",
            "july", "august", "september", "october", "november", "december",
        ),
        start=1,
    )
}

# "<day> <month name>" optionally followed by a four-digit year.
_DAY_MONTH_PATTERN = re.compile(
    r"\b(\d{1,2})\s+(" + "|".join(_MONTH_NUMBERS) + r")\b(?:\s+(\d{4})\b)?",
    re.IGNORECASE,
)


def extract_date(forecast_summary: str) -> Optional[datetime]:
    """Extract the first "<day> <month name>" date from a forecast summary.

    Only a number followed by a real month name is accepted, so unrelated
    text such as "10 ug" is skipped instead of being handed to the parser.

    Args:
        forecast_summary: Free-text summary; ``None`` or an empty string is
            tolerated.

    Returns:
        A ``datetime`` at midnight for the first date found. The year comes
        from a four-digit year directly following the month when present,
        otherwise the current year is assumed. Returns ``None`` when no date
        is found or the date does not exist (for example 31 February).
    """
    if not forecast_summary:
        return None

    match = _DAY_MONTH_PATTERN.search(forecast_summary)
    if match is None:
        return None

    day_text, month_name, year_text = match.groups()
    year = int(year_text) if year_text else datetime.today().year
    try:
        return datetime(year, _MONTH_NUMBERS[month_name.lower()], int(day_text))
    except ValueError:
        # Impossible calendar date (day 0, 31 February, 29 February in a
        # non-leap year, year 0000, ...).
        return None

def transform_forecast(forecast: Optional[List[Dict]]) -> pl.DataFrame:
    """Flatten raw forecast entries into a Polars DataFrame.

    Each entry contributes one row with the columns ID, Type, Band, Summary,
    Date, nO2Band, o3Band, pM10Band, pM25Band, sO2Band and Text. Keys missing
    from an entry become ``None``; Date is derived from the summary text via
    ``extract_date``.

    Args:
        forecast: Forecast dicts as returned by the API. ``None`` (which
            ``fetch_forecast_data`` returns on failure) is treated like an
            empty list instead of raising ``TypeError``.

    Returns:
        A DataFrame built from the processed rows; it has no rows when there
        is nothing to process.
    """
    processed_forecasts = []

    for day in forecast or []:
        summary = day.get("forecastSummary")
        processed_forecasts.append({
            "ID": day.get("forecastID"),
            "Type": day.get("forecastType"),
            "Band": day.get("forecastBand"),
            "Summary": summary,
            # The key may be present with a null value, so fall back to ""
            # rather than passing None into the date parser.
            "Date": extract_date(summary or ""),
            "nO2Band": day.get("nO2Band"),
            "o3Band": day.get("o3Band"),
            "pM10Band": day.get("pM10Band"),
            "pM25Band": day.get("pM25Band"),
            "sO2Band": day.get("sO2Band"),
            "Text": day.get("forecastText"),
        })

    return pl.DataFrame(processed_forecasts)

# Build the forecast table from the fetched API data; `df` is what insert_data later writes to the database.
df = transform_forecast(forecast_data)


In [4]:
import psycopg2

# Open a session on the local PostgreSQL server. Only the password comes from
# the environment (db_password); the remaining settings are fixed here.
conn = psycopg2.connect(
    host="localhost",
    port="5432",
    dbname="air_quality_db",
    user="postgres",
    password=db_password,
)

# Module-level cursor shared by the query and insert cells that follow.
cursor = conn.cursor()

In [5]:
# Sanity check: list the IDs already stored in air_quality before new rows are inserted.
cursor.execute("SELECT ID FROM air_quality;")
print(cursor.fetchall())

[(47943,), (47947,)]


In [6]:
def insert_data(df):
    """Insert every row of ``df`` into the ``air_quality`` table.

    Uses the module-level ``cursor`` and ``conn``. Rows whose ID already
    exists are skipped by ``ON CONFLICT (ID) DO NOTHING``. All rows are
    written in one transaction: it is committed after the last row, and
    rolled back if anything fails so the shared connection is not left in an
    aborted transaction.

    Args:
        df: Frame exposing ``iter_rows(named=True)`` whose rows carry the
            keys ID, Type, Band, Summary, Date, nO2Band, o3Band, pM10Band,
            pM25Band, sO2Band and Text.

    Raises:
        Exception: Whatever the row lookup, ``cursor.execute`` or
            ``conn.commit`` raised, re-raised after the rollback.
    """
    columns = (
        "ID", "Type", "Band", "Summary", "Date", "nO2Band",
        "o3Band", "pM10Band", "pM25Band", "sO2Band", "Text",
    )
    # Column list and placeholders are derived from the same constant tuple so
    # they cannot drift apart; the row values are still bound as parameters.
    insert_sql = (
        f"INSERT INTO air_quality ({', '.join(columns)}) "
        f"VALUES ({', '.join(['%s'] * len(columns))}) "
        "ON CONFLICT (ID) DO NOTHING;"
    )

    try:
        for row in df.iter_rows(named=True):
            cursor.execute(insert_sql, tuple(row[column] for column in columns))
        conn.commit()
    except Exception:
        conn.rollback()
        raise

In [7]:
# Write the forecast rows, then release the database resources. The
# try/finally closes the cursor and connection even when insert_data raises.
try:
    insert_data(df)
finally:
    cursor.close()
    conn.close()