# Pull and Push OpenAQ data

Pull air quality measurements from OpenAQ and push to BigQuery.

-  Pulls latest hourly data for a list of predetermined sensors.

-  Handles logging & job tracking.

-  Writes directly from a pandas DataFrame to BigQuery.

-  Finalizes job tracking.

In [None]:
# --- Setup & Config ---

import os
import sys
import json
import time
import uuid
import logging
from dotenv import load_dotenv
from datetime import datetime, timezone
import pandas as pd
import requests
from google.cloud import bigquery

# --- OpenAQ API config ---
# Base URL of the OpenAQ v3 REST API.
OPENAQ_API_BASE = "https://api.openaq.org/v3"
# Placeholder list: the trailing `...` is a literal Ellipsis object, not a
# sensor ID, so the list must be replaced before anything iterates over it.
SENSOR_IDS = [1234, 5678, ...]  # <-- replace with your 59 sensor IDs

# --- Logging config ---
# One log file per calendar day (local time, since datetime.now() is called
# without a timezone); repeated runs on the same day append to the same file.
LOGS_DIR = "logs"
os.makedirs(LOGS_DIR, exist_ok=True)
log_filename = os.path.join(
    LOGS_DIR,
    "job_log_" + datetime.now().strftime("%Y%m%d") + ".log",
)

# Route root-logger records at INFO and above to the daily file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    filename=log_filename,
    filemode="a",
)

In [10]:
# --- BigQuery constants and connect to client ---

# BQ Project
PROJECT_ID = "openaq-data-pipeline-468404"

# BQ Data storage (dataset and table for the PM2.5 measurements)
DATASET_ID_DATA = "openaq_pm25"
TABLE_ID_DATA = "pm25_hourly"

# BQ Job tracking (dataset and table for pipeline run records)
DATASET_ID_JOBS = "openaq_jobs"
TABLE_ID_JOBS = "job_tracking"

# Load environment variables (e.g., GOOGLE_APPLICATION_CREDENTIALS)
load_dotenv()

# Confirm key path (prints None when the variable is not set)
print("Google Credentials Path:", os.getenv("GOOGLE_APPLICATION_CREDENTIALS"))

# Connect to BigQuery project.
# PROJECT_ID is the constant defined above and used by the later cells; the
# name BQ_PROJECT_ID used here before is not defined in the cells above and
# raised NameError on a fresh kernel.
bq_client = bigquery.Client(project=PROJECT_ID)

Google Credentials Path: C:\\Users\\camer\\.gcp_keys\\openaq_data_loader.json


In [None]:
# --- Job config ---
# Ensures the job-tracking dataset and table exist, then records a RUNNING
# row for this pipeline run.

# Imported in this cell so the cell stays self-contained.
from google.cloud.exceptions import NotFound

# Make UTC-aware timestamps
JOB_START_TIME = datetime.now(timezone.utc)

# Create dataset if not exists.
# Only NotFound triggers creation; any other failure (credentials, network,
# permissions) propagates instead of being mistaken for "dataset is missing".
dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID_JOBS}")
try:
    bq_client.get_dataset(dataset_ref)
except NotFound:
    print(f"Creating dataset {DATASET_ID_JOBS}...")
    bq_client.create_dataset(dataset_ref)

# Schema of the job-tracking table: one row per job record.
schema = [
    bigquery.SchemaField("job_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("job_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("start_time", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("end_time", "TIMESTAMP"),
    bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("notes", "STRING"),
    bigquery.SchemaField("error_message", "STRING"),
    bigquery.SchemaField("rows_inserted", "INTEGER"),
    bigquery.SchemaField("source_start_time", "TIMESTAMP"),
    bigquery.SchemaField("source_end_time", "TIMESTAMP"),
]

# Create table if not exists (same NotFound-only rule as for the dataset).
table_ref = f"{PROJECT_ID}.{DATASET_ID_JOBS}.{TABLE_ID_JOBS}"
try:
    bq_client.get_table(table_ref)
except NotFound:
    print(f"Creating table {TABLE_ID_JOBS}...", flush=True)
    table = bigquery.Table(table_ref, schema=schema)
    bq_client.create_table(table)

# Build the job_id from the UTC start time. Resolution is one second, so two
# runs started within the same second would share an ID.
job_id = f"job_{JOB_START_TIME.strftime('%Y%m%d_%H%M%S')}"

# Insert start record; the fields that are only known once the run has
# finished are left as NULL here.
rows_to_insert = [{
    "job_id": job_id,
    "job_name": "OpenAQ_PM25_Pipeline",
    "start_time": JOB_START_TIME.isoformat(),
    "status": "RUNNING",
    "notes": "Job started successfully.",
    "error_message": None,
    "rows_inserted": None,
    "source_start_time": None,
    "source_end_time": None,
}]

# A non-empty return value is treated as failure; it is reported but does not
# stop the notebook (best-effort tracking).
errors = bq_client.insert_rows_json(table_ref, rows_to_insert)

if errors:
    print("⚠️ Errors inserting job start record:", errors, flush=True)
else:
    print(f"✅ Job tracking started: {job_id}", flush=True)

Creating table job_tracking...
✅ Job tracking started: job_20250808_221854


In [None]:
# Quick look at the most recent entries in the BigQuery job tracking table

# Query the 10 most recently started jobs
query = f"""
SELECT *
FROM `{PROJECT_ID}.{DATASET_ID_JOBS}.{TABLE_ID_JOBS}`
ORDER BY start_time DESC
LIMIT 10
"""
query_job = bq_client.query(query)
results = query_job.result()

# Convert each Row object to a plain dict. The loop variable has its own name
# so it no longer shadows `results`, the iterator being consumed.
rows_as_dicts = [dict(row.items()) for row in results]

# Create DataFrame (one row per job record)
df_jobs = pd.DataFrame(rows_as_dicts)

# Display the dataframe
print(df_jobs)

                job_id              job_name                       start_time  \
0  job_20250808_221854  OpenAQ_PM25_Pipeline 2025-08-08 22:18:54.987150+00:00   

  end_time   status                      notes error_message rows_inserted  \
0     None  RUNNING  Job started successfully.          None          None   

  source_start_time source_end_time  
0              None            None  


In [19]:
# Delete the existing BigQuery job tracking table so it can be recreated with an updated schema.
# Destructive: uncomment the lines below only when you intend to drop the table.

# table_ref = f"{PROJECT_ID}.{DATASET_ID_JOBS}.{TABLE_ID_JOBS}"

# try:
#     bq_client.delete_table(table_ref)
#     print(f"Deleted table {table_ref}")
# except Exception as e:
#     print(f"Error deleting table: {e}")