<a href="https://colab.research.google.com/github/hiattn/CS193_Fall18_Lab1/blob/master/team/Final_Project/pipelines/MGMT467_FinalProject_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1 - Batch Ingestion:**

In [10]:
# @title Step 1.1: Setup & Authentication
# Install required libraries for Google Cloud and Kaggle
!pip install -q kaggle google-cloud-bigquery google-cloud-storage pandas db-dtypes

# Authenticate User for Google Cloud access
# This will trigger a popup to allow access to your GCP resources
from google.colab import auth
auth.authenticate_user()

print("Libraries installed and Google Cloud successfully authenticated!")

Libraries installed and Google Cloud successfully authenticated!


In [11]:
# @title Step 1.2: Configuration & Kaggle JSON Upload
import os
import json
from google.colab import files
from google.cloud import bigquery
from google.cloud import storage

# --- Google Cloud Config ---
# User Input for Project ID
project_id = "mgmt-467-nh" # @param {type:"string"}
region = "us-central1" # @param {type:"string"}

# Define Resource Names
bucket_name = f"air_quality_raw_{project_id}" # Unique bucket name
dataset_name = "air_quality_dataset"
table_name = "sensor_data"

# Set the environment variable for the project
os.environ["GOOGLE_CLOUD_PROJECT"] = project_id

# Initialize Clients
bq_client = bigquery.Client(project=project_id)
storage_client = storage.Client(project=project_id)

print(f"Configuration set for Project: {project_id}")
print(f"Target Bucket: {bucket_name}")
print("-------------------------------------------------")

# --- Kaggle Authentication ---
print("Please upload your kaggle.json file now...")
uploaded = files.upload()

# Check if kaggle.json was uploaded
if 'kaggle.json' in uploaded:
    # Create the .kaggle directory if it doesn't exist
    !mkdir -p ~/.kaggle

    # Move the uploaded file to the .kaggle directory
    !mv kaggle.json ~/.kaggle/

    # Change permissions to ensure the file is secure (required by Kaggle API)
    !chmod 600 ~/.kaggle/kaggle.json

    print("\nSUCCESS: kaggle.json uploaded and permissions set.")
else:
    print("\nERROR: kaggle.json not found. Please run the cell again and upload the correct file.")

Configuration set for Project: mgmt-467-nh
Target Bucket: air_quality_raw_mgmt-467-nh
-------------------------------------------------
Please upload your kaggle.json file now...


Saving kaggle.json to kaggle.json

SUCCESS: kaggle.json uploaded and permissions set.


In [None]:
# @title Step 1.3: Ingest from Kaggle to GCS (Raw Data Lake)
from google.cloud import storage
import os

# 1. Download Dataset from Kaggle
print("Downloading data from Kaggle...")
!kaggle datasets download -d fedesoriano/air-quality-data-set

# 2. Unzip the file
print("Unzipping data...")
!unzip -o air-quality-data-set.zip

# Find the .csv file name (it varies sometimes)
files = [f for f in os.listdir('.') if f.endswith('.csv')]
if not files:
    raise ValueError("No CSV file found in the downloaded dataset!")
source_file_name = files[0]
print(f"Found raw file: {source_file_name}")

# 3. Create GCS Bucket (if it doesn't exist)
bucket = storage_client.bucket(bucket_name)
if not bucket.exists():
    print(f"Creating bucket {bucket_name}...")
    bucket = storage_client.create_bucket(bucket_name, location=region)
else:
    print(f"Bucket {bucket_name} already exists.")

# 4. Upload File to GCS
blob_name = source_file_name # Keep the same name in GCS
blob = bucket.blob(blob_name)

print(f"Uploading {source_file_name} to gs://{bucket_name}/{blob_name}...")
blob.upload_from_filename(source_file_name)

print("Success! Raw data is now stored in Google Cloud Storage.")


Downloading data from Kaggle...
Dataset URL: https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set
License(s): copyright-authors
Downloading air-quality-data-set.zip to /content
  0% 0.00/248k [00:00<?, ?B/s]
100% 248k/248k [00:00<00:00, 518MB/s]
Unzipping data...
Archive:  air-quality-data-set.zip
  inflating: AirQuality.csv          
Found raw file: AirQuality.csv
Creating bucket air_quality_raw_mgmt-467-nh...
Uploading AirQuality.csv to gs://air_quality_raw_mgmt-467-nh/AirQuality.csv...
Success! Raw data is now stored in Google Cloud Storage.


In [None]:
# @title Step 1.4 (Retry): Curated Load to BigQuery (Schema & Partitioning)
import pandas as pd
import re
from google.cloud import bigquery

# 1. Read Raw Data from GCS
source_uri = f"gs://{bucket_name}/{source_file_name}"
print(f"Reading raw data from {source_uri}...")
df = pd.read_csv(source_uri, sep=';', decimal=',')

# 2. Data Curation
# Drop empty columns (artifacts)
df = df.dropna(axis=1, how='all')

# Fix Date Format for Partitioning (DD/MM/YYYY -> YYYY-MM-DD)
df['Date'] = pd.to_datetime(df['Date'], format='%d/%m/%Y').dt.date

# --- FIX: Robust Column Renaming ---
# Function to clean column names for BigQuery (only Alphanumeric and _)
def clean_col_name(name):
    # Replace non-alphanumeric characters (like . or () ) with _
    clean = re.sub(r'[^a-zA-Z0-9]', '_', name)
    return clean

# Apply cleaning
df.columns = [clean_col_name(c) for c in df.columns]

print("Cleaned Column Names:", df.columns.tolist())

# 3. Define BigQuery Schema & Partitioning
# We map the specific clean names to types
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("Date", "DATE"),
        bigquery.SchemaField("Time", "STRING"),
        # We explicitly map the cleaned names:
        bigquery.SchemaField("CO_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S1_CO_", "FLOAT"),
        bigquery.SchemaField("NMHC_GT_", "FLOAT"),
        bigquery.SchemaField("C6H6_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S2_NMHC_", "FLOAT"),
        bigquery.SchemaField("NOx_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S3_NOx_", "FLOAT"),
        bigquery.SchemaField("NO2_GT_", "FLOAT"),
        bigquery.SchemaField("PT08_S4_NO2_", "FLOAT"),
        bigquery.SchemaField("PT08_S5_O3_", "FLOAT"),
        bigquery.SchemaField("T", "FLOAT"),
        bigquery.SchemaField("RH", "FLOAT"),
        bigquery.SchemaField("AH", "FLOAT"),
    ],
    # --- PARTITIONING ---
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="Date"
    ),
    write_disposition="WRITE_TRUNCATE",
)

# 4. Load to BigQuery
dataset_ref = bq_client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)

# Ensure dataset exists
try:
    bq_client.get_dataset(dataset_ref)
except:
    bq_client.create_dataset(dataset_ref)

print(f"Loading data into {project_id}.{dataset_name}.{table_name}...")
job = bq_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()

print("Success! Data loaded with Cleaned Schema and Partitioning.")


Reading raw data from gs://air_quality_raw_mgmt-467-nh/AirQuality.csv...
Cleaned Column Names: ['Date', 'Time', 'CO_GT_', 'PT08_S1_CO_', 'NMHC_GT_', 'C6H6_GT_', 'PT08_S2_NMHC_', 'NOx_GT_', 'PT08_S3_NOx_', 'NO2_GT_', 'PT08_S4_NO2_', 'PT08_S5_O3_', 'T', 'RH', 'AH']
Loading data into mgmt-467-nh.air_quality_dataset.sensor_data...
Success! Data loaded with Cleaned Schema and Partitioning.


In [None]:
# @title Step 1.5: Data Quality Check
from google.cloud import bigquery

# --- 1. Data Quality Check (SQL) ---
# We check for the specific error code '-200' which indicates sensor failure/missing data
dq_query = f"""
    SELECT
        COUNT(*) as total_rows,
        COUNTIF(CO_GT_ = -200) as missing_co_readings,
        ROUND((COUNTIF(CO_GT_ = -200) / COUNT(*)) * 100, 2) as percent_missing
    FROM `{project_id}.{dataset_name}.{table_name}`
"""

print("Running Data Quality Check on BigQuery...")
query_job = bq_client.query(dq_query)
results = query_job.result()

print("\n--- Data Quality Results ---")
for row in results:
    print(f"Total Rows Loaded: {row.total_rows}")
    print(f"Rows with Missing CO Data (-200): {row.missing_co_readings}")
    print(f"Data Quality Impact: {row.percent_missing}% of Carbon Monoxide readings are missing.")


Running Data Quality Check on BigQuery...

--- Data Quality Results ---
Total Rows Loaded: 9471
Rows with Missing CO Data (-200): 1683
Data Quality Impact: 17.77% of Carbon Monoxide readings are missing.


# Transformation Logic Explanation

Transformation: Date Format Conversion (DD/MM/YYYY -> YYYY-MM-DD)

Logic:
The raw CSV provided dates as strings in European format (e.g., '10/03/2004').
However, BigQuery 'Time Partitioning' strictly requires a DATE or TIMESTAMP data type.
We parsed the string into a Python Date object during the Pandas load step.

Benefit:
By transforming this column into a DATE type, we enabled BigQuery to physically
partition the storage by Day. This reduces query costs and improves performance
when filtering by specific dates, as the engine only scans relevant partitions
rather than the entire table.

# **2 - Streaming Ingestion:**

In [14]:
# @title Step 2.1: Configure gcloud, Enable APIs & Create Pub/Sub
# 1. Set the project for gcloud commands
!gcloud config set project $project_id

# 2. Enable APIs
print("Enabling necessary APIs (this may take a minute)...")
!gcloud services enable \
    cloudfunctions.googleapis.com \
    run.googleapis.com \
    pubsub.googleapis.com \
    cloudbuild.googleapis.com \
    artifactregistry.googleapis.com --project=$project_id

print("APIs enabled.")

# 3. Create Pub/Sub Topic
topic_id = "openaq-topic"

# Create the topic (using quiet flag to suppress 'already exists' error cleanly)
!gcloud pubsub topics create $topic_id --project=$project_id || echo "Topic likely already exists"

print(f"\nTarget Topic: projects/{project_id}/topics/{topic_id}")


Updated property [core/project].


To take a quick anonymous survey, run:
  $ gcloud survey

Enabling necessary APIs (this may take a minute)...
Operation "operations/acat.p2-16331834856-0abb5cd4-0622-4f59-982f-13d842507270" finished successfully.
APIs enabled.
Created topic [projects/mgmt-467-nh/topics/openaq-topic].

Target Topic: projects/mgmt-467-nh/topics/openaq-topic


In [17]:
# @title Step 2.2: Create Streaming Table & BigQuery Subscription
from google.cloud import bigquery

# --- 1. Create BigQuery Streaming Table ---
streaming_table_name = "streaming_air_quality"
table_ref = bq_client.dataset(dataset_name).table(streaming_table_name)

# Define Schema (Data fields only)
schema = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("parameter", "STRING"),
    bigquery.SchemaField("value", "FLOAT"),
    bigquery.SchemaField("unit", "STRING"),
]

table = bigquery.Table(table_ref, schema=schema)

# Create table if it doesn't exist
try:
    bq_client.get_table(table_ref)
    print(f"Table {streaming_table_name} already exists.")
except:
    print(f"Creating table {streaming_table_name}...")
    bq_client.create_table(table)

# --- 2. Grant Permissions ---
print("Retrieving Project Number...")
project_number_list = !gcloud projects list --filter="project_id:$project_id" --format="value(projectNumber)"
project_number = project_number_list[0]
service_account = f"service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"

# Grant permission
!gcloud projects add-iam-policy-binding $project_id \
    --member="serviceAccount:$service_account" \
    --role="roles/bigquery.dataEditor" > /dev/null

# --- 3. Create Subscription (Data Only) ---
subscription_id = "openaq-to-bq"
full_table_id = f"{project_id}:{dataset_name}.{streaming_table_name}"

print(f"Creating Subscription {subscription_id}...")

# We removed '--write-metadata' so it matches our table schema perfectly
!gcloud pubsub subscriptions create $subscription_id \
    --topic=$topic_id \
    --bigquery-table=$full_table_id \
    --use-table-schema \
    --project=$project_id || echo "Subscription likely already exists."

print(f"\nPipeline Ready: Topic -> Subscription -> Table ({streaming_table_name})")

Table streaming_air_quality already exists.
Retrieving Project Number...
Updated IAM policy for project [mgmt-467-nh].
Creating Subscription openaq-to-bq...
Created subscription [projects/mgmt-467-nh/subscriptions/openaq-to-bq].

Pipeline Ready: Topic -> Subscription -> Table (streaming_air_quality)


In [80]:
# @title Step 2.3: Deploy Open-Meteo Cloud Function (Rome, Italy)
import os

# 1. Create Directory
os.makedirs("openaq_function", exist_ok=True)

# 2. Write Function Code (Open-Meteo)
main_py_content = """import functions_framework
import json
import requests
import os
import datetime
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
PROJECT_ID = os.environ.get("GCP_PROJECT")
TOPIC_ID = "openaq-topic"
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

@functions_framework.http
def fetch_openaq(request):
    \"\"\"
    Fetches live Air Quality data for Rome, Italy from Open-Meteo.
    \"\"\"
    # Open-Meteo Air Quality API
    url = "https://air-quality-api.open-meteo.com/v1/air-quality"

    # Coordinates for Rome, Italy
    params = {
        "latitude": 41.9028,
        "longitude": 12.4964,
        "current": "carbon_monoxide,nitrogen_dioxide,ozone",
        "timezone": "Europe/Rome"
    }

    try:
        response = requests.get(url, params=params)
        if response.status_code != 200:
            return f"API Error: {response.status_code}", 500

        data = response.json()
        current = data.get("current", {})
        current_units = data.get("current_units", {})

        # Map Open-Meteo names to our standard names
        # carbon_monoxide -> co
        # nitrogen_dioxide -> no2
        # ozone -> o3
        mapping = {
            "carbon_monoxide": "co",
            "nitrogen_dioxide": "no2",
            "ozone": "o3"
        }

        count = 0

        # Loop through the 3 gases we requested
        for api_name, standard_name in mapping.items():
            val = current.get(api_name)
            unit = current_units.get(api_name, "unknown")

            if val is not None:
                payload = {
                    # Use current UTC time or the time provided by API
                    "timestamp": current.get("time") + ":00", # Add seconds for BQ Timestamp format
                    "city": "Rome",
                    "parameter": standard_name,
                    "value": float(val),
                    "unit": unit
                }

                # Publish
                data_str = json.dumps(payload)
                publisher.publish(topic_path, data_str.encode("utf-8"))
                count += 1

        return f"Success: Published {count} live readings from Rome.", 200

    except Exception as e:
        return f"Error: {str(e)}", 500
"""

with open("openaq_function/main.py", "w") as f:
    f.write(main_py_content)

# 3. Write Requirements
with open("openaq_function/requirements.txt", "w") as f:
    f.write("functions-framework==3.*\ngoogle-cloud-pubsub\nrequests\n")

# 4. Deploy
print("Deploying Open-Meteo Function (Rome)...")
!gcloud functions deploy openaq-ingest \
    --gen2 \
    --runtime=python310 \
    --region=$region \
    --source=./openaq_function \
    --entry-point=fetch_openaq \
    --trigger-http \
    --allow-unauthenticated \
    --set-env-vars=GCP_PROJECT=$project_id

Deploying Open-Meteo Function (Rome)...
  [INFO] A new revision will be deployed serving with 100% traffic.
You can view your function in the Cloud Console here: https://console.cloud.google.com/functions/details/us-central1/openaq-ingest?project=mgmt-467-nh

buildConfig:
  automaticUpdatePolicy: {}
  build: projects/16331834856/locations/us-central1/builds/4a06a525-82db-4401-94b8-df1d133dc5e8
  dockerRegistry: ARTIFACT_REGISTRY
  dockerRepository: projects/mgmt-467-nh/locations/us-central1/repositories/gcf-artifacts
  entryPoint: fetch_openaq
  runtime: python310
  serviceAccount: projects/mgmt-467-nh/serviceAccounts/16331834856-compute@developer.gserviceaccount.com
  source:
    storageSource:
      bucket: gcf-v2-sources-16331834856-us-central1
      generation: '1764904263376193'
      object: openaq-ingest/function-source.zip
  sourceProvenance:
    resolvedStorageSource:
      bucket: gcf-v2-sources-16331834856-us-central1
      generation: '1764904263376193'
      object: openaq

In [81]:
# @title Step 2.4: Trigger Ingest & Validate Streaming Data
import requests
import time
from google.cloud import bigquery

# 1. Trigger the Cloud Function
# We retrieve the URL dynamically from gcloud to make it robust
function_url = !gcloud functions describe openaq-ingest --gen2 --region=$region --format="value(url)"
function_url = function_url[0]

print(f"Triggering Cloud Function at: {function_url}")
response = requests.get(function_url)

if response.status_code == 200:
    print(f"Function Response: {response.text}")
else:
    print(f"Error triggering function: {response.text}")

# 2. Wait for Ingestion
print("Waiting 15 seconds for data to flow from Pub/Sub to BigQuery...")
time.sleep(15)

# 3. Validate Data in BigQuery
print("Querying BigQuery for latest streaming records...")

query = f"""
    SELECT timestamp, city, parameter, value, unit
    FROM `{project_id}.{dataset_name}.{streaming_table_name}`
    ORDER BY timestamp DESC
    LIMIT 10
"""

query_job = bq_client.query(query)
results = list(query_job.result())

if results:
    print(f"\nSUCCESS: Found {len(results)} records.")
    print("-" * 60)
    print(f"{'TIMESTAMP':<25} | {'CITY':<20} | {'PARAM':<10} | {'VALUE'}")
    print("-" * 60)
    for row in results:
        print(f"{str(row.timestamp):<25} | {row.city[:20]:<20} | {row.parameter:<10} | {row.value}")
else:
    print("\nWARNING: No rows found yet. The pipeline might need a few more seconds.")
    print("Try running this cell again in 1 minute.")


Triggering Cloud Function at: https://us-central1-mgmt-467-nh.cloudfunctions.net/openaq-ingest
Function Response: Success: Published 3 live readings from Rome.
Waiting 15 seconds for data to flow from Pub/Sub to BigQuery...
Querying BigQuery for latest streaming records...

SUCCESS: Found 3 records.
------------------------------------------------------------
TIMESTAMP                 | CITY                 | PARAM      | VALUE
------------------------------------------------------------
2025-12-05 04:00:00+00:00 | Rome                 | o3         | 4.0
2025-12-05 04:00:00+00:00 | Rome                 | co         | 422.0
2025-12-05 04:00:00+00:00 | Rome                 | no2        | 28.5


# **3 - Analytics and Modeling:**

In [86]:
# @title Step 3.1 (Revised): Retrain BQML Model (Unit-Corrected)
from google.cloud import bigquery

# Feature Engineering in SQL:
# 1. CO_GT_ * 1000 -> Converts mg/m^3 to µg/m^3 (Matching Open-Meteo)
# 2. NO2_GT_       -> Already µg/m^3 (Matching Open-Meteo)
# 3. Removing O3   -> Removing the incompatible feature

train_query = f"""
    CREATE OR REPLACE MODEL `{project_id}.{dataset_name}.air_quality_model_v2`
    OPTIONS(model_type='LINEAR_REG') AS
    SELECT
        (CO_GT_ * 1000) as label,   -- TARGET: Converted to Micrograms
        NO2_GT_ as no2              -- FEATURE: Nitrogen Dioxide (True Concentration)
    FROM
        `{project_id}.{dataset_name}.{table_name}`
    WHERE
        CO_GT_ > -200
        AND NO2_GT_ > -200
"""

print(f"Training Engineered Model (v2) in {project_id}.{dataset_name}...")
job = bq_client.query(train_query)
job.result()

print("SUCCESS: Unit-Corrected Model trained.")

Training Engineered Model (v2) in mgmt-467-nh.air_quality_dataset...
SUCCESS: Unit-Corrected Model trained.


In [87]:
# @title Step 3.2: Evaluate Model Performance
eval_query = f"""
    SELECT *
    FROM ML.EVALUATE(MODEL `{project_id}.{dataset_name}.air_quality_model`, (
        SELECT
            CO_GT_ as label,
            NO2_GT_ as no2,
            PT08_S5_O3_ as o3
        FROM
            `{project_id}.{dataset_name}.{table_name}`
        WHERE
            CO_GT_ > -200 AND NO2_GT_ > -200 AND PT08_S5_O3_ > -200
    ))
"""

print("Running Model Evaluation...")
job = bq_client.query(eval_query)
results = list(job.result())

print("\n--- Model Metrics ---")
for row in results:
    print(f"R2 Score: {row.r2_score:.4f}")
    print(f"Mean Absolute Error: {row.mean_absolute_error:.4f} mg/m^3")
    print(f"Mean Squared Error: {row.mean_squared_error:.4f}")

print("\nInterpretation: An R2 close to 1.0 is perfect. An R2 of 0 means the model is useless.")

Running Model Evaluation...

--- Model Metrics ---
R2 Score: 0.7393
Mean Absolute Error: 0.5350 mg/m^3
Mean Squared Error: 0.5414

Interpretation: An R2 close to 1.0 is perfect. An R2 of 0 means the model is useless.


In [88]:
# @title Step 3.3 (Revised): Predict with Corrected Model
predict_query = f"""
    WITH live_features AS (
        SELECT
            timestamp,
            -- We only need NO2 now
            MAX(CASE WHEN parameter = 'no2' THEN value END) as no2,
            MAX(CASE WHEN parameter = 'co' THEN value END) as actual_co_live
        FROM `{project_id}.{dataset_name}.{streaming_table_name}`
        GROUP BY timestamp
        HAVING no2 IS NOT NULL
    )

    SELECT
        timestamp,
        ROUND(predicted_label, 2) as predicted_co_ug, -- Prediction in Micrograms
        actual_co_live,
        no2 as input_no2
    FROM
        ML.PREDICT(MODEL `{project_id}.{dataset_name}.air_quality_model_v2`,
        (SELECT * FROM live_features))
    ORDER BY timestamp DESC
    LIMIT 10
"""

print("Running Predictions using Engineered Model...")
job = bq_client.query(predict_query)
results = list(job.result())

print("\n--- Live Predictions (Corrected Scale) ---")
print(f"{'TIMESTAMP':<25} | {'PREDICTED CO (µg)':<18} | {'ACTUAL CO':<10} | {'INPUT NO2'}")
print("-" * 75)
for row in results:
    print(f"{str(row.timestamp):<25} | {row.predicted_co_ug:<18} | {row.actual_co_live:<10} | {row.input_no2}")

Running Predictions using Engineered Model...

--- Live Predictions (Corrected Scale) ---
TIMESTAMP                 | PREDICTED CO (µg)  | ACTUAL CO  | INPUT NO2
---------------------------------------------------------------------------
2025-12-05 04:00:00+00:00 | 430.88             | 422.0      | 28.5
