# Environment Setup

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Change to the main project directory so relative data paths resolve consistently.
# Fail with an explicit message when WORKING_DIRECTORY is missing or empty instead of
# the opaque error os.chdir would raise for None / "".
working_directory = os.getenv("WORKING_DIRECTORY")
if not working_directory:
    raise RuntimeError("WORKING_DIRECTORY is not set; define it in .env or in the environment.")
os.chdir(working_directory)

# Show the current working directory to confirm the os.chdir call above took effect
%pwd

'/workspaces/Live-Air-Quality'

In [2]:
from pathlib import Path

# Confirm the working directory is the repository root by checking for its .git folder
print("Git folder exists:", Path(".git").exists())

Git folder exists: True


# 1. Location IDs

In [3]:
import os
import time

from openaq import OpenAQ
from pathlib import Path
from dotenv import load_dotenv

from AQI.utils.logger import get_logger
from AQI.utils.common import create_directories, save_json

# Load environment variables from the .env file (setup_openaq reads the API key via os.getenv)
load_dotenv()

# Notebook-level logger named "test", used by the functions defined in this cell
logger = get_logger("test")


def get_location_ids(geo_grid: list[float], save_path: Path, search_limit: int = 100, retries: int = 3, delay: float = 2.0) -> dict[int, str]:
    """
    Fetch location IDs from OpenAQ within a geo-grid and save them to JSON.

    Each attempt builds a client via `setup_openaq`, lists the locations inside the
    bounding box, closes the client, and writes the `{id: name}` mapping to `save_path`.

    Args:
    - geo_grid (list[float]): Bounding box [lon1, lat1, lon2, lat2].
    - save_path (Path): File path for saving results as JSON.
    - search_limit (int, optional): Max number of locations to fetch. Default is 100.
    - retries (int, optional): Total number of attempts before giving up. Values below 1
      are treated as 1 so the request is always tried once. Default is 3.
    - delay (float, optional): Delay (in seconds) between attempts. Default is 2.0.

    Returns:
    - dict: {location_id: location_name}

    Raises:
    - ValueError: Never retried. Raised when the results contain duplicate location IDs,
      or propagated from `setup_openaq` (e.g. missing API key).
    - Exception: The last error once every attempt has failed.
    """
    # A non-positive `retries` would otherwise skip the loop and silently return None
    max_attempts = max(1, retries)

    for attempt in range(1, max_attempts + 1):
        try:
            # Step 1: Initialize an authenticated OpenAQ client
            client = setup_openaq()

            try:
                # Step 2: Fetch sensor locations within bounding box
                data = client.locations.list(bbox=geo_grid, limit=search_limit)
            finally:
                # Release the client's resources whether or not the request succeeded
                client.close()
            logger.info(f"Fetched up to {search_limit} locations within {geo_grid}.")

            # Step 3: Transform results into a dictionary {id: name}
            loc_info = {location.id: location.name for location in data.results}
            logger.info(f"Retrieved {len(loc_info)} sensor locations.")

            # Step 4: Verify integrity (duplicate IDs would collapse into one dictionary entry)
            if len(data.results) != len(loc_info):
                raise ValueError("Mismatch between results and constructed dictionary.")

            # Step 5: Save the dictionary to disk as JSON
            create_directories([save_path.parent])          # ensure parent directory exists
            save_json(save_path=save_path, data=loc_info)

            return loc_info

        except ValueError as ve:
            # Configuration or data-integrity problem: retrying cannot help.
            # The message stays generic because the ValueError may come from setup_openaq too.
            logger.error(f"Non-retryable error while obtaining location ids: {ve}")
            raise

        except Exception as e:
            if attempt >= max_attempts:
                logger.error(f"Max retries reached for obtaining location ids (last error: {e}). Raising exception.")
                raise
            logger.warning(f"While obtaining location ids, attempt {attempt}/{max_attempts} failed with error: {e}. Retrying in {delay}s...")
            time.sleep(delay)


def setup_openaq(secret_env_var: str = "OPENAQI_API_KEY") -> OpenAQ:
    """
    Load the OpenAQ API key from environment and return an authenticated OpenAQ client.

    Args:
    - secret_env_var (str, optional): Name of the environment variable holding the API key.

    Returns:
    - OpenAQ: An authenticated OpenAQ client object.

    Raises:
    - ValueError: If the API key is missing, empty, or only whitespace.
    - Exception: If client creation fails for any other reason.
    """
    # Load API key from environment variable
    openaq_api = os.getenv(secret_env_var)

    # Reject empty/blank values too, so the check matches the "not found or empty" message
    if not openaq_api or not openaq_api.strip():
        logger.error("Unable to find OpenAQ API key in environment.")
        raise ValueError(f"{secret_env_var} not found or empty.")

    try:
        # Instantiate API client
        client = OpenAQ(api_key=openaq_api)
    except Exception as e:
        logger.error(f"Unexpected error while creating OpenAQ client: {e}")
        # Bare raise keeps the original traceback intact
        raise

    logger.info("Successfully initialized OpenAQ client.")
    return client

In [4]:
# Output file for the {location_id: name} mapping
save_path = Path('artifacts/data/sensor_locations.json')
# Bounding box passed to OpenAQ as [lon1, lat1, lon2, lat2]
# NOTE(review): presumably the New York City area — confirm
geo_grid = [-74.25909, 40.477399, -73.700181, 40.917577]

if __name__ == "__main__":
    # No try/except here: a bare `except: raise` only re-raised and added nothing
    get_location_ids(geo_grid=geo_grid, save_path=save_path, search_limit=100)

[2025-09-02 12:50:17,146: INFO: 2151408928: Successfully initialized OpenAQ client.]
[2025-09-02 12:50:18,472: INFO: 2151408928: Fetched up to 100 locations within [-74.25909, 40.477399, -73.700181, 40.917577].]
[2025-09-02 12:50:18,473: INFO: 2151408928: Retrieved 52 sensor locations.]
[2025-09-02 12:50:18,475: INFO: common: Directory: artifacts/data created successfully.]
[2025-09-02 12:50:18,476: INFO: common: Directory: artifacts/data created successfully.]
[2025-09-02 12:50:18,478: INFO: common: JSON file saved at: artifacts/data/sensor_locations.json]


# 2. Database Manager

In [4]:
from duckdb import DuckDBPyConnection
from pathlib import Path
from AQI.utils.common import create_directories

import os
import duckdb as ddb

def database_connect(db_path: Path, s3_config: dict | None = None) -> DuckDBPyConnection:
    """
    Connect to the DuckDB database at the specified path and, when credentials are
    given, configure S3 access for external data sources.

    Args:
        db_path (Path): Path to the DuckDB database file.
        s3_config (dict | None): AWS S3 credentials with the keys `access_key`,
            `secret_key` and `region`. Defaults to None (no S3 settings applied).

    Returns:
        DuckDBPyConnection: Active database connection.

    Raises:
        KeyError: If `s3_config` is provided but lacks one of the required keys.
    """
    # (s3_config key, DuckDB setting name)
    s3_settings = (
        ("access_key", "s3_access_key_id"),
        ("secret_key", "s3_secret_access_key"),
        ("region", "s3_region"),
    )

    # Validate before connecting so a bad config never leaves an open connection behind
    if s3_config:
        missing = [key for key, _ in s3_settings if key not in s3_config]
        if missing:
            raise KeyError(f"s3_config is missing required key(s): {', '.join(missing)}")

    conn = ddb.connect(str(db_path))

    if s3_config:
        try:
            for key, setting in s3_settings:
                # Double any single quote so the value cannot terminate the SQL string literal
                value = str(s3_config[key]).replace("'", "''")
                conn.sql(f"SET {setting}='{value}';")
        except Exception:
            # Do not leak the connection when applying a setting fails
            conn.close()
            raise

    return conn


def database_close(conn: DuckDBPyConnection) -> None:
    """
    Shut down a DuckDB connection, doing nothing when there is none.

    Args:
        conn (DuckDBPyConnection): Connection to close; `None` is accepted and ignored.

    Returns:
        None
    """
    # Guard clause: nothing to close
    if conn is None:
        return

    conn.close()


def database_aggregate_sql_paths(dir: Path) -> list[Path]:
    """
    Walk a directory tree and gather every file whose name ends in `.sql`.

    The extension check is case-insensitive, so `.SQL` files are included.

    Args:
        dir (Path): Root directory to search for SQL script files.

    Returns:
        list[Path]: Paths to the matching files, in sorted order.
    """
    return sorted(
        Path(folder, filename)
        for folder, _subfolders, filenames in os.walk(dir)
        for filename in filenames
        if filename.lower().endswith(".sql")
    )


def database_load_query(query_path: Path) -> str:
    """
    Read the full text of a SQL script.

    Args:
        query_path (Path): Location of the .sql file, decoded as UTF-8.

    Returns:
        str: Contents of the file.
    """
    return Path(query_path).read_text(encoding="utf-8")


def database_execute_sql_query(conn: DuckDBPyConnection, query: str) -> None:
    """
    Execute a SQL query on an active DuckDB connection.

    Thin wrapper around `conn.execute`; whatever `conn.execute` returns is discarded,
    and any exception it raises propagates to the caller.

    Args:
        conn (DuckDBPyConnection): Active DuckDB connection.
        query (str): SQL query to execute.

    Returns:
        None
    """
    conn.execute(query)


def database_initialize(db_path: Path, ddl_dir: Path) -> None:
    """
    Build the database schema by running every .sql script found under a directory.

    The parent folder of the database file is created first; the scripts are then
    executed in the order returned by `database_aggregate_sql_paths`.

    Args:
        db_path (Path): Path to the DuckDB database file.
        ddl_dir (Path): Directory containing .sql files for schema creation.
    """
    create_directories([db_path.parent])

    ddl_scripts = database_aggregate_sql_paths(dir=ddl_dir)
    connection = database_connect(db_path=db_path)

    try:
        for ddl_script in ddl_scripts:
            database_execute_sql_query(
                conn=connection,
                query=database_load_query(query_path=ddl_script),
            )
    finally:
        # The connection is released even if a script fails
        database_close(conn=connection)


def database_drop(db_path: Path) -> None:
    """
    Delete the DuckDB database file at the specified path, plus its write-ahead log.

    Missing files are ignored, so calling this on an already-removed database is a no-op.

    Args:
        db_path (Path): Path to the DuckDB database file.

    Returns:
        None
    """
    db_path.unlink(missing_ok=True)

    # DuckDB keeps its write-ahead log next to the database as "<name>.wal"; a stale one
    # left behind must not be picked up by a database later re-created at this path.
    db_path.with_name(db_path.name + ".wal").unlink(missing_ok=True)

In [19]:
db_location = Path('artifacts/sql/air_quality.db')
ddl_location = Path('src/AQI/sql/ddl')
# True: create the database and run the DDL scripts. False: delete the database file.
creation_or_deletion = True

if __name__ == "__main__":
    # No try/except here: a bare `except: raise` only re-raised and added nothing
    if creation_or_deletion:
        database_initialize(db_path=db_location, ddl_dir=ddl_location)
    else:
        database_drop(db_path=db_location)

[2025-09-02 13:37:23,356: INFO: common: Directory: artifacts/sql created successfully.]


# 3. Data Extraction

In [5]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from jinja2 import Template
from pathlib import Path

from AQI.utils.common import load_json
from AQI.utils.logger import get_logger

# Logger named "test" for this exploratory notebook, used by insert_api_data below
logger = get_logger("test")

def insert_api_data(location_path: Path, db_path: Path, query_path: Path, start_date: str | None, end_date: str | None, file_name: str = "OpenAQ") -> None:
    """Insert OpenAQ data into database over a date range."""
    # Step 1: Get location IDs
    location_ids = extract_location_ids(file_path=location_path)

    # Step 2: Connect to DB
    conn = database_connect(db_path=db_path)

    # Step 3: Tracking passes and fails
    passed, failed = 0, 0

    try:
        # Step 4: Load SQL template
        query_template  = database_load_query(query_path)

        # Step 5: Parse dates
        start = parse_date(start_date)
        end = parse_date(end_date, default=datetime.now())

        # Step 6: Generate date range
        date_range = generate_range(start, end)

        # Step 7: Loop over IDs and months
        for location_id in location_ids:
            for curr_date in date_range:
                api_path = render_openaq_path(location_id, year=str(curr_date.year), month=str(curr_date.month))
                extraction_query = render_query(query_template, api_path, file_name)
                try:
                    database_execute_sql_query(conn=conn, query=extraction_query)
                    passed += 1
                except Exception as e:
                    logger.warning(f"Failed: id={location_id}, date={curr_date:%Y-%m}, error={e}")
                    failed += 1

                logger.info(f"{passed} / {passed + failed} ingested out of {len(location_ids)}")
    
    finally:
        # Step 8: Close the database regardless of exceptions
        database_close(conn=conn)


def extract_location_ids(file_path: Path) -> list[str]:
    """Load a JSON file and return its top-level keys (the location IDs) as strings."""
    location_map = load_json(file_path)
    return list(map(str, location_map.keys()))


def parse_date(date_str: str | None, default: datetime | None = None) -> datetime:
    """Parse YYYY-MM string into datetime, fallback to default or now()."""
    if date_str:
        return datetime.strptime(date_str, "%Y-%m")
    return default or datetime.now()


def generate_range(start: datetime, end: datetime) -> list[datetime]:
    """Return `start` shifted by 0, 1, 2, ... whole months, one entry per calendar month from start's month through end's month (inclusive)."""
    # Number of calendar months covered, counting both endpoints
    month_span = (end.year - start.year) * 12 + (end.month - start.month + 1)
    return [start + relativedelta(months=offset) for offset in range(month_span)]


def render_query(query: str, api_path: str, file_name: str) -> str:
    """
    Render SQL query with template substitution.

    The text in `query` is compiled as a `Template` and rendered with two variables:
    `data_file_path` (set to `api_path`) and `file_name`.

    Returns the rendered SQL text.
    """
    # NOTE(review): values are substituted into the SQL text with no escaping visible here — confirm inputs are trusted
    return Template(query).render(data_file_path=api_path, file_name=file_name)


def render_openaq_path(location_id: str, year: str, month: str) -> str:
    """Build the OpenAQ archive S3 glob for one location and month; the month is zero-padded to two digits."""
    month_2d = month.zfill(2)
    return f"s3://openaq-data-archive/records/csv.gz/locationid={location_id}/year={year}/month={month_2d}/*.csv.gz"

In [8]:
if __name__ == '__main__':
    # No try/except here: a bare `except: raise` only re-raised and added nothing
    location_path = Path("artifacts/data/sensor_locations.json")
    db_path = Path("artifacts/sql/air_quality.db")
    query_path = Path("src/AQI/sql/dml/010_insert_measurements.sql")

    # Inclusive month range in "YYYY-MM" format
    start_date = "2025-01"
    end_date = "2025-01"

    insert_api_data(location_path, db_path, query_path, start_date, end_date)

[2025-09-02 13:48:35,009: INFO: common: JSON file succesfully loaded form: artifacts/data/sensor_locations.json]
Failed: id=386, date=2025-01, error=IO Error: No files found that match the pattern "s3://openaq-data-archive/records/csv.gz/locationid=386/year=2025/month=01/*.csv.gz"
Failed: id=642, date=2025-01, error=IO Error: No files found that match the pattern "s3://openaq-data-archive/records/csv.gz/locationid=642/year=2025/month=01/*.csv.gz"
Failed: id=662, date=2025-01, error=IO Error: No files found that match the pattern "s3://openaq-data-archive/records/csv.gz/locationid=662/year=2025/month=01/*.csv.gz"
Failed: id=853, date=2025-01, error=IO Error: No files found that match the pattern "s3://openaq-data-archive/records/csv.gz/locationid=853/year=2025/month=01/*.csv.gz"
Failed: id=974, date=2025-01, error=IO Error: No files found that match the pattern "s3://openaq-data-archive/records/csv.gz/locationid=974/year=2025/month=01/*.csv.gz"
Failed: id=984, date=2025-01, error=IO Err

# 4. Presentations

In [6]:
from pathlib import Path


def create_presentations(db_path: Path, presentation_dir: Path) -> None:
    """
    Run every .sql script found under `presentation_dir` against the database.

    Scripts are executed in the order returned by `database_aggregate_sql_paths`.

    Args:
        db_path (Path): Path to the DuckDB database file.
        presentation_dir (Path): Directory containing the presentation .sql files.

    Returns:
        None
    """
    # Step 1: Aggregate all .sql file paths before connecting (same order as
    # database_initialize), so a failure here cannot leave an open connection behind
    query_paths = database_aggregate_sql_paths(dir=presentation_dir)

    # Step 2: Connect to DB
    conn = database_connect(db_path=db_path)

    try:
        # Step 3: Execute all the queries
        for query_path in query_paths:
            query = database_load_query(query_path=query_path)
            database_execute_sql_query(conn=conn, query=query)
    finally:
        # Step 4: Making sure connection is always closed
        database_close(conn=conn)

In [7]:
if __name__ == '__main__':
    # No try/except here: a bare `except: raise` only re-raised and added nothing
    db_path = Path("artifacts/sql/air_quality.db")
    presentation_dir = Path("src/AQI/sql/presentation")

    create_presentations(db_path, presentation_dir)