<a href="https://colab.research.google.com/github/ethandlouiee/MGMT467_Team11/blob/main/team/Final_Project/pipeline/MGMT467_FinalProject_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1 - Batch Ingestion:**

In [2]:
# @title Step 1.1: Setup & Authentication
# Install required libraries for Google Cloud and Kaggle
!pip install -q kaggle google-cloud-bigquery google-cloud-storage pandas db-dtypes

# Authenticate User for Google Cloud access
# This will trigger a popup to allow access to your GCP resources
from google.colab import auth
auth.authenticate_user()

print("Libraries installed and Google Cloud successfully authenticated!")

Libraries installed and Google Cloud successfully authenticated!


In [3]:
# @title Step 1.2: Configuration & Kaggle JSON Upload
import os
import json
from google.colab import files
from google.cloud import bigquery
from google.cloud import storage

# --- Google Cloud Config ---
# User Input for Project ID
project_id = "mgmt467-fp-11" # @param {type:"string"}
region = "us-central1" # @param {type:"string"}

# Define Resource Names
bucket_name = f"air_quality_raw_{project_id}" # Unique bucket name
dataset_name = "air_quality_dataset"
table_name = "sensor_data"

# Set the environment variable for the project
os.environ["GOOGLE_CLOUD_PROJECT"] = project_id

# Initialize Clients
bq_client = bigquery.Client(project=project_id)
storage_client = storage.Client(project=project_id)

print(f"Configuration set for Project: {project_id}")
print(f"Target Bucket: {bucket_name}")
print("-------------------------------------------------")

# --- Kaggle Authentication ---
print("Please upload your kaggle.json file now...")
uploaded = files.upload()

# Check if kaggle.json was uploaded
if 'kaggle.json' in uploaded:
    # Create the .kaggle directory if it doesn't exist
    !mkdir -p ~/.kaggle

    # Move the uploaded file to the .kaggle directory
    !mv kaggle.json ~/.kaggle/

    # Change permissions to ensure the file is secure (required by Kaggle API)
    !chmod 600 ~/.kaggle/kaggle.json

    print("\nSUCCESS: kaggle.json uploaded and permissions set.")
else:
    print("\nERROR: kaggle.json not found. Please run the cell again and upload the correct file.")

Configuration set for Project: mgmt467-fp-11
Target Bucket: air_quality_raw_mgmt467-fp-11
-------------------------------------------------
Please upload your kaggle.json file now...


Saving kaggle.json to kaggle.json

SUCCESS: kaggle.json uploaded and permissions set.


In [4]:
# @title Step 1.3: Ingest from Kaggle to GCS (Raw Data Lake)
from google.cloud import storage
import os

# 1. Download Dataset from Kaggle
print("Downloading data from Kaggle...")
!kaggle datasets download -d fedesoriano/air-quality-data-set

# 2. Unzip the file
print("Unzipping data...")
!unzip -o air-quality-data-set.zip

# Find the .csv file name (it varies sometimes)
files = [f for f in os.listdir('.') if f.endswith('.csv')]
if not files:
    raise ValueError("No CSV file found in the downloaded dataset!")
source_file_name = files[0]
print(f"Found raw file: {source_file_name}")

# 3. Create GCS Bucket (if it doesn't exist)
bucket = storage_client.bucket(bucket_name)
if not bucket.exists():
    print(f"Creating bucket {bucket_name}...")
    bucket = storage_client.create_bucket(bucket_name, location=region)
else:
    print(f"Bucket {bucket_name} already exists.")

# 4. Upload File to GCS
blob_name = source_file_name # Keep the same name in GCS
blob = bucket.blob(blob_name)

print(f"Uploading {source_file_name} to gs://{bucket_name}/{blob_name}...")
blob.upload_from_filename(source_file_name)

print("Success! Raw data is now stored in Google Cloud Storage.")


Downloading data from Kaggle...
Dataset URL: https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set
License(s): copyright-authors
Downloading air-quality-data-set.zip to /content
  0% 0.00/248k [00:00<?, ?B/s]
100% 248k/248k [00:00<00:00, 568MB/s]
Unzipping data...
Archive:  air-quality-data-set.zip
  inflating: AirQuality.csv          
Found raw file: AirQuality.csv
Bucket air_quality_raw_mgmt467-fp-11 already exists.
Uploading AirQuality.csv to gs://air_quality_raw_mgmt467-fp-11/AirQuality.csv...
Success! Raw data is now stored in Google Cloud Storage.


In [5]:
# @title Step 1.4: Curated Load to BigQuery (Schema & Partitioning)
import pandas as pd
import re
from google.cloud import bigquery

# 1. Read Raw Data from GCS
source_uri = f"gs://{bucket_name}/{source_file_name}"
print(f"Reading raw data from {source_uri}...")
df = pd.read_csv(source_uri, sep=';', decimal=',')

# 2. Data Curation
# Drop empty columns (artifacts)
df = df.dropna(axis=1, how='all')

# Fix Date Format for Partitioning (DD/MM/YYYY -> YYYY-MM-DD)
df['Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y').dt.date

# --- FIX: Robust Column Renaming ---
# Function to clean column names for BigQuery (only Alphanumeric and _)
def clean_col_name(name):
    # Replace non-alphanumeric characters (like . or () ) with _
    clean = re.sub(r'[^a-zA-Z0-9]', '_', name)
    return clean

# Apply cleaning
df.columns = [clean_col_name(c) for c in df.columns]

print("Cleaned Column Names:", df.columns.tolist())

# 3. Define BigQuery Schema & Partitioning
# We map the specific clean names to types
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("Date", "DATE"),
        bigquery.SchemaField("Time", "STRING"),
        # We explicitly map the cleaned names:
        bigquery.SchemaField("CO_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S1_CO_", "FLOAT"),
        bigquery.SchemaField("NMHC_GT_", "FLOAT"),
        bigquery.SchemaField("C6H6_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S2_NMHC_", "FLOAT"),
        bigquery.SchemaField("NOx_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S3_NOx_", "FLOAT"),
        bigquery.SchemaField("NO2_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S4_NO2_", "FLOAT"),
        bigquery.SchemaField("PT08_S5_O3_", "FLOAT"),
        bigquery.SchemaField("T", "FLOAT"),
        bigquery.SchemaField("RH", "FLOAT"),
        bigquery.SchemaField("AH", "FLOAT"),
    ],
    # --- PARTITIONING ---
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Date"
    ),
    write_disposition="WRITE_TRUNCATE",
)

# 4. Load to BigQuery
dataset_ref = bq_client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)

# Ensure dataset exists
try:
    bq_client.get_dataset(dataset_ref)
except:
    bq_client.create_dataset(dataset_ref)

print(f"Loading data into {project_id}.{dataset_name}.{table_name}...")
job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()

print("Success! Data loaded with Cleaned Schema and Partitioning.")


Reading raw data from gs://air_quality_raw_mgmt467-fp-11/AirQuality.csv...
Cleaned Column Names: ['Date', 'Time', 'CO_GT_', 'PT08_S1_CO_', 'NMHC_GT_', 'C6H6_GT_', 'PT08_S2_NMHC_', 'NOx_GT_', 'PT08_S3_NOx_', 'NO2_GT_', 'PT08_S4_NO2_', 'PT08_S5_O3_', 'T', 'RH', 'AH']
Loading data into mgmt467-fp-11.air_quality_dataset.sensor_data...
Success! Data loaded with Cleaned Schema and Partitioning.


In [6]:
# @title Step 1.5: Data Quality Check
from google.cloud import bigquery

# --- 1. Data Quality Check (SQL) ---
# We check for the specific error code '-200' which indicates sensor failure/missing data
dq_query = f"""
    SELECT
        COUNT(*) as total_rows,
        COUNTIF(CO_GT_ = -200) as missing_co_readings,
        ROUND((COUNTIF(CO_GT_ = -200) / COUNT(*)) * 100, 2) as percent_missing
    FROM `{project_id}.{dataset_name}.{table_name}`
"""

print("Running Data Quality Check on BigQuery...")
query_job = bq_client.query(dq_query)
results = query_job.result()

print("\n--- Data Quality Results ---")
for row in results:
    print(f"Total Rows Loaded: {row.total_rows}")
    print(f"Rows with Missing CO Data (-200): {row.missing_co_readings}")
    print(f"Data Quality Impact: {row.percent_missing}% of Carbon Monoxide readings are missing.")


Running Data Quality Check on BigQuery...

--- Data Quality Results ---
Total Rows Loaded: 9471
Rows with Missing CO Data (-200): 1683
Data Quality Impact: 17.77% of Carbon Monoxide readings are missing.


# Transformation Logic Explanation

Transformation: Date Format Conversion (DD/MM/YYYY -> YYYY-MM-DD)

Logic:
The raw CSV provided dates as strings in European format (e.g., '10/03/2004').
However, BigQuery 'Time Partitioning' strictly requires a DATE or TIMESTAMP data type.
We parsed the string into a Python Date object during the Pandas load step.

Benefit:
By transforming this column into a DATE type, we enabled BigQuery to physically
partition the storage by Day. This reduces query costs and improves performance
when filtering by specific dates, as the engine only scans relevant partitions
rather than the entire table.

# **2 - Streaming Ingestion:**

In [6]:
# @title Step 2.1: Configure gcloud, Enable APIs & Create Pub/Sub
# 1. Set the project for gcloud commands
!gcloud config set project $project_id

# 2. Enable APIs
print("Enabling necessary APIs (this may take a minute)...")
!gcloud services enable \
    cloudfunctions.googleapis.com \
    run.googleapis.com \
    pubsub.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com --project=$project_id

print("APIs enabled.")

# 3. Create Pub/Sub Topic
topic_id = "openaq-topic"

# Create the topic (using quiet flag to suppress 'already exists' error cleanly)
!gcloud pubsub topics create $topic_id --project=$project_id || echo "Topic likely already exists"

print(f"\nTarget Topic: projects/{project_id}/topics/{topic_id}")


Updated property [core/project].


To take a quick anonymous survey, run:
  $ gcloud survey

Enabling necessary APIs (this may take a minute)...


Command killed by keyboard interrupt

^C
APIs enabled.
[1;31mERROR:[0m Failed to create topic [projects/mgmt467-fp-11/topics/openaq-topic]: Resource already exists in the project (resource=openaq-topic).
[1;31mERROR:[0m (gcloud.pubsub.topics.create) Failed to create the following: [openaq-topic].
Topic likely already exists

Target Topic: projects/mgmt467-fp-11/topics/openaq-topic


In [7]:
# @title Step 2.2: Create Streaming Table & BigQuery Subscription
from google.cloud import bigquery

# --- 1. Create BigQuery Streaming Table ---
streaming_table_name = "streaming_air_quality"
table_ref = bq_client.dataset(dataset_name).table(streaming_table_name)

# Define Schema (Data fields only)
schema = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("parameter", "STRING"),
    bigquery.SchemaField("value", "FLOAT"),
    bigquery.SchemaField("unit", "STRING"),
]

table = bigquery.Table(table_ref, schema=schema)

# Create table if it doesn't exist
try:
    bq_client.get_table(table_ref)
    print(f"Table {streaming_table_name} already exists.")
except:
    print(f"Creating table {streaming_table_name}...")
    bq_client.create_table(table)

# --- 2. Grant Permissions ---
print("Retrieving Project Number...")
project_number_list = !gcloud projects list --filter="project_id:$project_id" --format="value(projectNumber)"
project_number = project_number_list[0]
service_account = f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"

# Grant permission
!gcloud projects add-iam-policy-binding $project_id \
    --member="serviceAccount:$service_account" \
    --role="roles/bigquery.dataEditor" > /dev/null

# --- 3. Create Subscription (Data Only) ---
subscription_id = "openaq-to-bq"
full_table_id = f"{project_id}:{dataset_name}.{streaming_table_name}"

print(f"Creating Subscription {subscription_id}...")

# We removed '--write-metadata' so it matches our table schema perfectly
!gcloud pubsub subscriptions create $subscription_id \
    --topic=$topic_id \
    --bigquery-table=$full_table_id \
    --use-table-schema \
    --project=$project_id || echo "Subscription likely already exists."

print(f"\nPipeline Ready: Topic -> Subscription -> Table ({streaming_table_name})")

Table streaming_air_quality already exists.
Retrieving Project Number...
Updated IAM policy for project [mgmt467-fp-11].
Creating Subscription openaq-to-bq...
[1;31mERROR:[0m Failed to create subscription [projects/mgmt467-fp-11/subscriptions/openaq-to-bq]: Resource already exists in the project (resource=openaq-to-bq).
[1;31mERROR:[0m (gcloud.pubsub.subscriptions.create) Failed to create the following: [openaq-to-bq].
Subscription likely already exists.

Pipeline Ready: Topic -> Subscription -> Table (streaming_air_quality)


In [9]:
# @title Step 2.3: Deploy Open-Meteo Cloud Function (Rome, Italy)
import os

# 1. Create Directory
os.makedirs("openaq_function", exist_ok=True)

# 2. Write Function Code (Open-Meteo)
main_py_content = """import functions_framework
import json
import requests
import os
import datetime
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
PROJECT_ID = os.environ.get("GCP_PROJECT")
TOPIC_ID = "openaq-topic"
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

@functions_framework.http
def fetch_openaq(request):
    \"\"\"
    Fetches live Air Quality data for Rome, Italy from Open-Meteo.
    \"\"\"
    # Open-Meteo Air Quality API
    url = "https://air-quality-api.open-meteo.com/v1/air-quality"

    # Coordinates for Rome, Italy
    params = {
        "latitude": 41.9028,
        "longitude": 12.4964,
        "current": "carbon_monoxide,nitrogen_dioxide,ozone",
        "timezone": "Europe/Rome"
    }

    try:
        response = requests.get(url, params=params)
        if response.status_code != 200:
            return f"API Error: {response.status_code}", 500

        data = response.json()
        current = data.get("current", {})
        current_units = data.get("current_units", {})

        # Map Open-Meteo names to our standard names
        # carbon_monoxide -> co
        # nitrogen_dioxide -> no2
        # ozone -> o3
        mapping = {
            "carbon_monoxide": "co",
            "nitrogen_dioxide": "no2",
            "ozone": "o3"
        }

        count = 0

        # Loop through the 3 gases we requested
        for api_name, standard_name in mapping.items():
            val = current.get(api_name)
            unit = current_units.get(api_name, "unknown")

            if val is not None:
                payload = {
                    # Use current UTC time or the time provided by API
                    "timestamp": current.get("time") + ":00", # Add seconds for BQ Timestamp format
                    "city": "Rome",
                    "parameter": standard_name,
                    "value": float(val),
                    "unit": unit
                }

                # Publish
                data_str = json.dumps(payload)
                publisher.publish(topic_path, data_str.encode("utf-8"))
                count += 1

        return f"Success: Published {count} live readings from Rome.", 200

    except Exception as e:
        return f"Error: {str(e)}", 500
"""

with open("openaq_function/main.py", "w") as f:
    f.write(main_py_content)

# 3. Write Requirements
with open("openaq_function/requirements.txt", "w") as f:
    f.write("functions-framework==3.*\ngoogle-cloud-pubsub\nrequests\n")

# 4. Deploy
print("Deploying Open-Meteo Function (Rome)...")
!gcloud functions deploy openaq-ingest \
    --gen2 \
    --runtime=python310 \
    --region=$region \
    --source=./openaq_function \
    --entry-point=fetch_openaq \
    --trigger-http \
    --allow-unauthenticated \
    --set-env-vars=GCP_PROJECT=$project_id

Deploying Open-Meteo Function (Rome)...
You can view your function in the Cloud Console here: https://console.cloud.google.com/functions/details/us-central1/openaq-ingest?project=mgmt467-fp-11

buildConfig:
  automaticUpdatePolicy: {}
  build: projects/906780884112/locations/us-central1/builds/70b47859-e555-48f7-8165-dfedcd0cd4a2
  dockerRegistry: ARTIFACT_REGISTRY
  dockerRepository: projects/mgmt467-fp-11/locations/us-central1/repositories/gcf-artifacts
  entryPoint: fetch_openaq
  runtime: python310
  serviceAccount: projects/mgmt467-fp-11/serviceAccounts/906780884112-compute@developer.gserviceaccount.com
  source:
    storageSource:
      bucket: gcf-v2-sources-906780884112-us-central1
      generation: '1765226112271518'
      object: openaq-ingest/function-source.zip
  sourceProvenance:
    resolvedStorageSource:
      bucket: gcf-v2-sources-906780884112-us-central1
      generation: '1765226112271518'
      object: openaq-ingest/function-source.zip
createTime: '2025-12-08T20:35:

In [21]:
# @title Step 2.4: Trigger Ingest & Validate Streaming Data
import requests
import time
from google.cloud import bigquery

# 1. Trigger the Cloud Function
# We retrieve the URL dynamically from gcloud to make it robust
function_url = !gcloud functions describe openaq-ingest --gen2 --region=$region --format="value(url)"
function_url = function_url[0]

print(f"Triggering Cloud Function at: {function_url}")
response = requests.get(function_url)

if response.status_code == 200:
    print(f"Function Response: {response.text}")
else:
    print(f"Error triggering function: {response.text}")

# 2. Wait for Ingestion
print("Waiting 15 seconds for data to flow from Pub/Sub to BigQuery...")
time.sleep(15)

# 3. Validate Data in BigQuery
print("Querying BigQuery for latest streaming records...")

query = f"""
    SELECT timestamp, city, parameter, value, unit
    FROM `{project_id}.{dataset_name}.{streaming_table_name}`
    ORDER BY timestamp DESC
    LIMIT 10
"""

query_job = bq_client.query(query)
results = list(query_job.result())

if results:
    print(f"\nSUCCESS: Found {len(results)} records.")
    print("-" * 60)
    print(f"{'TIMESTAMP':<25} | {'CITY':<20} | {'PARAM':<10} | {'VALUE'}")
    print("-" * 60)
    for row in results:
        print(f"{str(row.timestamp):<25} | {row.city[:20]:<20} | {row.parameter:<10} | {row.value}")
else:
    print("\nWARNING: No rows found yet. The pipeline might need a few more seconds.")
    print("Try running this cell again in 1 minute.")


Triggering Cloud Function at: https://us-central1-mgmt467-fp-11.cloudfunctions.net/openaq-ingest
Function Response: Success: Published 3 live readings from Rome.
Waiting 15 seconds for data to flow from Pub/Sub to BigQuery...
Querying BigQuery for latest streaming records...

SUCCESS: Found 10 records.
------------------------------------------------------------
TIMESTAMP                 | CITY                 | PARAM      | VALUE
------------------------------------------------------------
2025-12-08 22:00:00+00:00 | Rome                 | no2        | 51.4
2025-12-08 22:00:00+00:00 | Rome                 | no2        | 51.4
2025-12-08 22:00:00+00:00 | Rome                 | co         | 683.0
2025-12-08 22:00:00+00:00 | Rome                 | co         | 683.0
2025-12-08 22:00:00+00:00 | Rome                 | co         | 683.0
2025-12-08 22:00:00+00:00 | Rome                 | no2        | 51.4
2025-12-08 22:00:00+00:00 | Rome                 | o3         | 1.0
2025-12-08 22:00:00

# **3 - Analytics and Modeling:**

In [9]:
# @title Step 3.0 (Fixed): Feature Engineering (Rush Hour from Time Column)
from google.cloud import bigquery

# We create a VIEW that calculates new features on the fly.
# We fix the error by extracting HOUR from the 'Time' column, not the 'Date' column.

fe_view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.engineered_training_data` AS
    SELECT
        -- TARGET
        (CO_GT_ * 1000) as label,

        -- ORIGINAL FEATURES
        NO2_GT_ as no2,

        -- NEW ENGINEERED FEATURES
        -- 1. Extract Hour from the 'Time' string (Format is HH.MM.SS)
        CASE
            WHEN EXTRACT(HOUR FROM PARSE_TIME('%H.%M.%S', Time)) BETWEEN 7 AND 9 THEN 1
            WHEN EXTRACT(HOUR FROM PARSE_TIME('%H.%M.%S', Time)) BETWEEN 17 AND 19 THEN 1
            ELSE 0
        END as is_rush_hour,

        -- 2. Is it the Weekend? (Works on Date column: 1=Sun, 7=Sat)
        CASE
            WHEN EXTRACT(DAYOFWEEK FROM Date) IN (1, 7) THEN 1
            ELSE 0
        END as is_weekend

    FROM
        `{project_id}.{dataset_name}.{table_name}`
    WHERE
        CO_GT_ > -200
        AND NO2_GT_ > -200
"""

print(f"Creating Feature Engineering View in {dataset_name}...")
bq_client.query(fe_view_query).result()
print("✅ Success! View 'engineered_training_data' created with fixed Time parsing.")

Creating Feature Engineering View in air_quality_dataset...
✅ Success! View 'engineered_training_data' created with fixed Time parsing.


In [3]:
# @title Step 3.1: Train BQML Model (Unit-Corrected)
from google.cloud import bigquery

# Feature Engineering in SQL:
# 1. CO_GT_ * 1000 -> Converts mg/m^3 to µg/m^3 (Matching Open-Meteo)
# 2. NO2_GT_       -> Already µg/m^3 (Matching Open-Meteo)
# 3. Removing O3   -> Removing the incompatible feature

train_query = f"""
    CREATE OR REPLACE MODEL `{project_id}.{dataset_name}.air_quality_model_v2`
    OPTIONS(model_type='LINEAR_REG') AS
    SELECT
        (CO_GT_ * 1000) as label,   -- TARGET: Converted to Micrograms
        NO2_GT_ as no2              -- FEATURE: Nitrogen Dioxide (True Concentration)
    FROM
        `{project_id}.{dataset_name}.{table_name}`
    WHERE
        CO_GT_ > -200
        AND NO2_GT_ > -200
"""

print(f"Training Engineered Model (v2) in {project_id}.{dataset_name}...")
job = bq_client.query(train_query)
job.result()

print("SUCCESS: Unit-Corrected Model trained.")

Training Engineered Model (v2) in mgmt467-fp-11.air_quality_dataset...
SUCCESS: Unit-Corrected Model trained.


In [10]:
# @title Step 3.12: Train Model with Feature Engineering
train_fe_query = f"""
    CREATE OR REPLACE MODEL `{project_id}.{dataset_name}.air_quality_model_fe`
    OPTIONS(model_type='LINEAR_REG') AS
    SELECT
        label,
        no2,
        is_rush_hour,
        is_weekend
    FROM
        `{project_id}.{dataset_name}.engineered_training_data`
"""

print("Training Model with Feature Engineering...")
bq_client.query(train_fe_query).result()
print("✅ Advanced Model Trained!")


Training Model with Feature Engineering...
✅ Advanced Model Trained!


In [6]:
# @title Step 3.2: Evaluate Model Performance
eval_query = f"""
    SELECT *
    FROM ML.EVALUATE(MODEL `{project_id}.{dataset_name}.air_quality_model_v2`, (
        SELECT
            CO_GT_ as label,
            NO2_GT_ as no2,
            PT08_S5_O3_ as o3
        FROM
            `{project_id}.{dataset_name}.{table_name}`
        WHERE
            CO_GT_ > -200 AND NO2_GT_ > -200 AND PT08_S5_O3_ > -200
    ))
"""

print("Running Model Evaluation...")
job = bq_client.query(eval_query)
results = list(job.result())

print("\n--- Model Metrics ---")
for row in results:
    print(f"R2 Score: {row.r2_score:.4f}")
    print(f"Mean Absolute Error: {row.mean_absolute_error:.4f} mg/m^3")
    print(f"Mean Squared Error: {row.mean_squared_error:.4f}")

print("\nInterpretation: An R2 close to 1.0 is perfect. An R2 of 0 means the model is useless.")

Running Model Evaluation...

--- Model Metrics ---
R2 Score: -2764015.0710
Mean Absolute Error: 2187.6278 mg/m^3
Mean Squared Error: 5739857.5464

Interpretation: An R2 close to 1.0 is perfect. An R2 of 0 means the model is useless.


In [9]:
# @title Step 3.3 (Revised): Predict with Corrected Model
predict_query = f"""
    WITH live_features AS (
        SELECT
            timestamp,
            -- We only need NO2 now
            MAX(CASE WHEN parameter = 'no2' THEN value END) as no2,
            MAX(CASE WHEN parameter = 'co' THEN value END) as actual_co_live
        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        GROUP BY timestamp
        HAVING no2 IS NOT NULL
    )

    SELECT
        timestamp,
        ROUND(predicted_label, 2) as predicted_co_ug, -- Prediction in Micrograms
        actual_co_live,
        no2 as input_no2
    FROM
        ML.PREDICT(MODEL `{project_id}.{dataset_name}.air_quality_model_v2`,
        (SELECT * FROM live_features))
    ORDER BY timestamp DESC
    LIMIT 10
"""

print("Running Predictions using Engineered Model...")
job = bq_client.query(predict_query)
results = list(job.result())

print("\n--- Live Predictions (Corrected Scale) ---")
print(f"{'TIMESTAMP':<25} | {'PREDICTED CO (µg)':<18} | {'ACTUAL CO':<10} | {'INPUT NO2'}")
print("-" * 75)
for row in results:
    print(f"{str(row.timestamp):<25} | {row.predicted_co_ug:<18} | {row.actual_co_live:<10} | {row.input_no2}")

Running Predictions using Engineered Model...

--- Live Predictions (Corrected Scale) ---
TIMESTAMP                 | PREDICTED CO (µg)  | ACTUAL CO  | INPUT NO2
---------------------------------------------------------------------------
2025-12-09 16:00:00+00:00 | 814.07             | 300.0      | 47.1
2025-12-09 15:00:00+00:00 | 439.12             | 268.0      | 28.9
2025-12-09 14:00:00+00:00 | 354.65             | 252.0      | 24.8
2025-12-09 13:00:00+00:00 | 239.28             | 250.0      | 19.2
2025-12-09 12:00:00+00:00 | 299.03             | 277.0      | 22.1
2025-12-09 11:00:00+00:00 | 329.93             | 318.0      | 23.6
2025-12-09 10:00:00+00:00 | 406.16             | 363.0      | 27.3
2025-12-09 09:00:00+00:00 | 505.04             | 371.0      | 32.1
2025-12-09 08:00:00+00:00 | 550.37             | 396.0      | 34.3
2025-12-09 07:00:00+00:00 | 517.4              | 390.0      | 32.7


In [11]:
# @title Step 3.32: Predict using Feature Engineered Model
# We now calculate 'is_rush_hour' and 'is_weekend' dynamically from the streaming timestamps.

predict_fe_query = f"""
    WITH live_features AS (
        SELECT
            timestamp,
            -- 1. Extract the raw NO2 reading (Feature 1)
            MAX(CASE WHEN parameter = 'no2' THEN value END) as no2,

            -- Optional: Get actual CO if available for comparison
            MAX(CASE WHEN parameter = 'co' THEN value END) as actual_co_live,

            -- 2. Engineer "Rush Hour" dynamically (Feature 2)
            -- We assume the timestamp is in the correct timezone (or close enough for this demo)
            CASE
                WHEN EXTRACT(HOUR FROM timestamp) BETWEEN 7 AND 9 THEN 1
                WHEN EXTRACT(HOUR FROM timestamp) BETWEEN 17 AND 19 THEN 1
                ELSE 0
            END as is_rush_hour,

            -- 3. Engineer "Weekend" dynamically (Feature 3)
            -- BigQuery: Sunday=1, Saturday=7
            CASE
                WHEN EXTRACT(DAYOFWEEK FROM timestamp) IN (1, 7) THEN 1
                ELSE 0
            END as is_weekend

        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        GROUP BY timestamp
        HAVING no2 IS NOT NULL
    )

    SELECT
        timestamp,
        ROUND(predicted_label, 2) as predicted_co_ug, -- The Prediction
        actual_co_live as actual_co,
        no2 as input_no2,
        is_rush_hour,
        is_weekend
    FROM
        ML.PREDICT(MODEL `{project_id}.{dataset_name}.air_quality_model_fe`,
        (SELECT * FROM live_features))
    ORDER BY timestamp DESC
    LIMIT 15
"""

print("Running Predictions using Feature Engineered Model...")
job = bq_client.query(predict_fe_query)
results = list(job.result())

print("\n--- Live Feature Engineering Predictions ---")
# Header
print(f"{'TIMESTAMP':<22} | {'PRED CO(µg)':<12} | {'ACTUAL':<8} | {'NO2':<6} | {'RUSH?':<5} | {'WKND?'}")
print("-" * 85)

for row in results:
    # Handle None for actuals nicely
    actual = row.actual_co if row.actual_co is not None else "N/A"

    print(f"{str(row.timestamp)[:19]:<22} | {row.predicted_co_ug:<12} | {str(actual):<8} | {row.input_no2:<6} | {row.is_rush_hour:<5} | {row.is_weekend}")

Running Predictions using Feature Engineered Model...

--- Live Feature Engineering Predictions ---
TIMESTAMP              | PRED CO(µg)  | ACTUAL   | NO2    | RUSH? | WKND?
-------------------------------------------------------------------------------------
2025-12-09 22:00:00    | 991.12       | 642.0    | 54.0   | 0     | 0
2025-12-09 21:00:00    | 1139.46      | 657.0    | 61.8   | 0     | 0
2025-12-09 20:00:00    | 1242.15      | 615.0    | 67.2   | 0     | 0
2025-12-09 19:00:00    | 1903.43      | 536.0    | 69.9   | 1     | 0
2025-12-09 18:00:00    | 1966.19      | 416.0    | 73.2   | 1     | 0
2025-12-09 17:00:00    | 1933.86      | 383.0    | 71.5   | 1     | 0
2025-12-09 16:00:00    | 859.9        | 300.0    | 47.1   | 0     | 0
2025-12-09 15:00:00    | 513.78       | 268.0    | 28.9   | 0     | 0
2025-12-09 14:00:00    | 435.8        | 252.0    | 24.8   | 0     | 0
2025-12-09 13:00:00    | 329.3        | 250.0    | 19.2   | 0     | 0
2025-12-09 12:00:00    | 384.46       | 

In [13]:
# @title Step 3.4: The Showdown - Base Model vs. Feature Engineered Model
from google.cloud import bigquery

print("=Running Model Comparison\n")

# --- 1. Evaluate Base Model (Only NO2) ---
# Note: BQML is smart enough to ignore the extra columns in the view that the base model doesn't need.
base_eval_query = f"""
    SELECT
        r2_score,
        mean_absolute_error,
        mean_squared_error
    FROM
        ML.EVALUATE(MODEL `{project_id}.{dataset_name}.air_quality_model_v2`,
        (SELECT label, no2 FROM `{project_id}.{dataset_name}.engineered_training_data`))
"""

base_job = bq_client.query(base_eval_query)
base_results = list(base_job.result())[0]

# --- 2. Evaluate Feature Engineered Model (NO2 + Rush Hour + Weekend) ---
fe_eval_query = f"""
    SELECT
        r2_score,
        mean_absolute_error,
        mean_squared_error
    FROM
        ML.EVALUATE(MODEL `{project_id}.{dataset_name}.air_quality_model_fe`,
        (SELECT label, no2, is_rush_hour, is_weekend FROM `{project_id}.{dataset_name}.engineered_training_data`))
"""

fe_job = bq_client.query(fe_eval_query)
fe_results = list(fe_job.result())[0]

# --- 3. Print the Scorecard ---
print(f"{'METRIC':<25} | {'BASE MODEL':<15} | {'FE MODEL (NEW)':<15} | {'IMPROVEMENT'}")
print("-" * 75)

metrics = [
    ("R2 Score (Higher is better)", base_results.r2_score, fe_results.r2_score),
    ("MAE (Lower is better)", base_results.mean_absolute_error, fe_results.mean_absolute_error),
    ("MSE (Lower is better)", base_results.mean_squared_error, fe_results.mean_squared_error)
]

for name, base, fe in metrics:
    # Calculate simple percent difference
    diff = fe - base
    if "R2" in name:
        color = "✅" if diff > 0 else "❌"
    else:
        color = "✅" if diff < 0 else "❌" # For Error, negative diff is good

    print(f"{name:<25} | {base:<15.4f} | {fe:<15.4f} | {color}")

print("\n---------------------------------------------------------------------------")
if fe_results.r2_score > base_results.r2_score:
    print("The Feature Engineered Model is superior.")
    print("Analysis: Adding 'Rush Hour' and 'Weekend' flags helped explain the variance.")
else:
    print("The Feature Engineering didn't add much predictive power.")
    print("Analysis: NO2 might be such a strong predictor that time of day doesn't matter much more.")

=Running Model Comparison

METRIC                    | BASE MODEL      | FE MODEL (NEW)  | IMPROVEMENT
---------------------------------------------------------------------------
R2 Score (Higher is better) | 0.4669          | 0.5096          | ✅
MAE (Lower is better)     | 741.9736        | 722.8243        | ✅
MSE (Lower is better)     | 1133450.4257    | 1042762.7861    | ✅

---------------------------------------------------------------------------
The Feature Engineered Model is superior.
Analysis: Adding 'Rush Hour' and 'Weekend' flags helped explain the variance.


In [11]:
# @title Step 4.1: Create 'Current_Air_Quality' View
# This view pivots the latest data so Looker Studio sees simple columns:
# timestamp | co | no2 | o3

view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.current_air_quality_view` AS
    SELECT
        timestamp,
        -- Pivot rows into columns for easier Dashboarding
        MAX(CASE WHEN parameter = 'co' THEN value END) as co_level,
        MAX(CASE WHEN parameter = 'no2' THEN value END) as no2_level,
        MAX(CASE WHEN parameter = 'o3' THEN value END) as o3_level,
        MAX(CASE WHEN parameter = 'co' THEN unit END) as unit
    FROM `{project_id}.{dataset_name}.{streaming_table_name}`
    GROUP BY timestamp
    -- Only get the most recent timestamp
    ORDER BY timestamp DESC
    LIMIT 1
"""

print(f"Creating View: {project_id}.{dataset_name}.current_air_quality_view...")
job = bq_client.query(view_query)
job.result()

print("Success! View created. This will always contain only the single latest record.")


Creating View: mgmt467-fp-11.air_quality_dataset.current_air_quality_view...
Success! View created. This will always contain only the single latest record.


In [12]:
# @title Step 4.2: Create 'Recent_24h_Correlation' View
correlation_view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.recent_24h_correlation_view` AS
    SELECT
        timestamp,
        MAX(CASE WHEN parameter = 'no2' THEN value END) as no2_level,
        MAX(CASE WHEN parameter = 'o3' THEN value END) as o3_level
    FROM `{project_id}.{dataset_name}.{streaming_table_name}`
    WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
    GROUP BY timestamp
    -- Ensure we only keep rows where BOTH sensors reported data
    HAVING no2_level IS NOT NULL AND o3_level IS NOT NULL
"""

print(f"Creating View: {project_id}.{dataset_name}.recent_24h_correlation_view...")
job = bq_client.query(correlation_view_query)
job.result()

print("Success! View created. Ready for Scatter Plot visualization.")

Creating View: mgmt467-fp-11.air_quality_dataset.recent_24h_correlation_view...
Success! View created. Ready for Scatter Plot visualization.


In [15]:
# @title Step 4.3: Create 'Historical_Reliability' View
reliability_view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.historical_reliability_view` AS
    SELECT
        -- Calculate Total Rows
        COUNT(*) as total_records,

        -- Calculate Reliability % for Carbon Monoxide
        ROUND((COUNTIF(CO_GT_ > -200) / COUNT(*)) * 100, 2) as co_reliability_pct,

        -- Calculate Reliability % for Nitrogen Dioxide
        ROUND((COUNTIF(NO2_GT_ > -200) / COUNT(*)) * 100, 2) as no2_reliability_pct,

        -- Calculate Overall System Reliability (Average of the two)
        ROUND(
            (
                (COUNTIF(CO_GT_ > -200) + COUNTIF(NO2_GT_ > -200))
                / (COUNT(*) * 2)
            ) * 100,
        2) as overall_reliability_pct

    FROM `{project_id}.{dataset_name}.{table_name}`
"""

print(f"Creating View: {project_id}.{dataset_name}.historical_reliability_view...")
job = bq_client.query(reliability_view_query)
job.result()

print("Success! View created correctly.")

Creating View: mgmt467-fp-11.air_quality_dataset.historical_reliability_view...
Success! View created correctly.


In [17]:
# @title Step 4.4: Create 'Pollution_Trend_Comparison' View
# This complex query does two things:
# 1. Grabs the last 6 hours of Live Data (Rome 2025)
# 2. Joins it with the Average Baseline from 2004 (Historical)

time_series_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.pollution_trend_view` AS

    WITH live_data AS (
        SELECT
            timestamp,
            MAX(CASE WHEN parameter = 'co' THEN value END) as live_co_ug,
            MAX(CASE WHEN parameter = 'no2' THEN value END) as live_no2_ug
        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR)
        GROUP BY timestamp
    ),

    historical_baseline AS (
        -- Calculate the average CO/NO2 from 2004 to draw a "baseline" line
        -- Remember: Historical CO was mg/m3, so we * 1000 to match live units
        SELECT
            AVG(CO_GT_) * 1000 as avg_historical_co,
            AVG(NO2_GT_) as avg_historical_no2
        FROM `{project_id}.{dataset_name}.{table_name}`
        WHERE CO_GT_ > -200 AND NO2_GT_ > -200
    )

    SELECT
        l.timestamp,
        l.live_co_ug,
        l.live_no2_ug,
        h.avg_historical_co as baseline_co_2004,
        h.avg_historical_no2 as baseline_no2_2004
    FROM live_data l
    CROSS JOIN historical_baseline h
    ORDER BY l.timestamp ASC
"""

print(f"Creating View: {project_id}.{dataset_name}.pollution_trend_view...")
job = bq_client.query(time_series_query)
job.result()

print("Success! Time-Series View created.")


Creating View: mgmt467-fp-11.air_quality_dataset.pollution_trend_view...
Success! Time-Series View created.


In [23]:
# @title Step 4.4: Create Smoothed Pollution Trend View
time_series_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.pollution_trend_view` AS

    WITH live_data_smoothed AS (
        SELECT
            -- Truncate timestamp to the nearest Minute to prevent "Too Many Rows" error
            TIMESTAMP_TRUNC(timestamp, MINUTE) as timestamp_minute,
            AVG(CASE WHEN parameter = 'co' THEN value END) as live_co_ug,
            AVG(CASE WHEN parameter = 'no2' THEN value END) as live_no2_ug
        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        -- Focus on last 24 hours to keep chart clean
        WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
        GROUP BY 1
    ),

    historical_baseline AS (
        SELECT
            AVG(CO_GT_) * 1000 as avg_historical_co,
            AVG(NO2_GT_) as avg_historical_no2
        FROM `{project_id}.{dataset_name}.{table_name}`
        WHERE CO_GT_ > -200 AND NO2_GT_ > -200
    )

    SELECT
        l.timestamp_minute as timestamp,
        l.live_co_ug,
        l.live_no2_ug,
        h.avg_historical_co as baseline_co_2004,
        h.avg_historical_no2 as baseline_no2_2004
    FROM live_data_smoothed l
    CROSS JOIN historical_baseline h
    ORDER BY l.timestamp_minute ASC
"""

print(f"Updating View: {project_id}.{dataset_name}.pollution_trend_view...")
job = bq_client.query(time_series_query)
job.result()

print("Success! View updated with 1-minute smoothing.")

Updating View: mgmt467-fp-11.air_quality_dataset.pollution_trend_view...
Success! View updated with 1-minute smoothing.


In [10]:
# @title Step 4.5: Create 'Forecast_Accuracy_View'
accuracy_view_query = f"""
    CREATE OR REPLACE VIEW `{project_id}.{dataset_name}.forecast_accuracy_view` AS

    WITH live_features AS (
        SELECT
            -- Truncate to minute to align with our smoothed data
            TIMESTAMP_TRUNC(timestamp, MINUTE) as timestamp,
            AVG(CASE WHEN parameter = 'no2' THEN value END) as no2,
            AVG(CASE WHEN parameter = 'co' THEN value END) as actual_co
        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        GROUP BY 1
        HAVING no2 IS NOT NULL AND actual_co IS NOT NULL
    ),

    predictions AS (
        SELECT
            timestamp,
            predicted_label as predicted_co,
            actual_co,
            no2 as input_no2
        FROM
            ML.PREDICT(MODEL `{project_id}.{dataset_name}.air_quality_model_v2`,
            (SELECT * FROM live_features))
    )

    SELECT
        timestamp,
        ROUND(predicted_co, 2) as predicted_co,
        ROUND(actual_co, 2) as actual_co,
        -- Calculate Error Percentage: ABS(Pred - Actual) / Actual
        ROUND(
            ABS(predicted_co - actual_co) / NULLIF(actual_co, 0) * 100
        , 2) as error_pct
    FROM predictions
    ORDER BY timestamp DESC
"""

print(f"Creating View: {project_id}.{dataset_name}.forecast_accuracy_view...")
job = bq_client.query(accuracy_view_query)
job.result()

print("Success! View created.")


Creating View: mgmt467-fp-11.air_quality_dataset.forecast_accuracy_view...
Success! View created.


In [26]:
# @title Step 5: Enable API & Create Scheduler
import time

# 1. Enable Cloud Scheduler API
print("Enabling Cloud Scheduler API...")
!gcloud services enable cloudscheduler.googleapis.com
print("API Enabled. Waiting 30 seconds for propagation...")
time.sleep(30) # Important: API enablement takes a moment to sync

# 2. Get Function URL
function_url_list = !gcloud functions describe openaq-ingest --gen2 --region=$region --format="value(url)"
function_url = function_url_list[0]
print(f"Target URL: {function_url}")

# 3. Create Scheduler Job
job_name = "openaq-ticker"
schedule = "*/15 * * * *"

print(f"Creating Scheduler Job '{job_name}'...")

# Try creating, if it exists, update it
!gcloud scheduler jobs create http $job_name \
    --schedule="$schedule" \
    --uri="$function_url" \
    --http-method=GET \
    --location=$region \
    --quiet \
    || gcloud scheduler jobs update http $job_name \
    --schedule="$schedule" \
    --uri="$function_url" \
    --http-method=GET \
    --location=$region \
    --quiet

print("\nSUCCESS: Scheduler created. Automation is live.")

Enabling Cloud Scheduler API...
Operation "operations/acf.p2-906780884112-fc4e671c-2fde-4528-8aed-f56b03211d52" finished successfully.
API Enabled. Waiting 30 seconds for propagation...
Target URL: https://us-central1-mgmt467-fp-11.cloudfunctions.net/openaq-ingest
Creating Scheduler Job 'openaq-ticker'...
attemptDeadline: 180s
httpTarget:
  headers:
    User-Agent: Google-Cloud-Scheduler
  httpMethod: GET
  uri: https://us-central1-mgmt467-fp-11.cloudfunctions.net/openaq-ingest
name: projects/mgmt467-fp-11/locations/us-central1/jobs/openaq-ticker
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 5
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: '*/15 * * * *'
scheduleTime: '2025-12-08T22:15:00Z'
state: ENABLED
status:
  code: -1
timeZone: Etc/UTC
userUpdateTime: '2025-12-08T22:02:24.435844Z'

SUCCESS: Scheduler created. Automation is live.
