# **Assignment 2 — Automated Data Pipeline & ML Prediction**
MGMT 467 · Fall 2025  

**Team Name:** ☐  
**Members (GitHub handles):** ☐ ☐ ☐ ☐  
**GitHub Repo URL:** ☐

> **Scenario:** Extend your analysis by enriching your data with **hourly weather** and training a **BQML model**. Build a **serverless batch pipeline** (Cloud Function + Cloud Scheduler) that keeps the weather table up to date.

## ✅ Submission Checklist (Team → Brightspace)
- [ ] GitHub repo link (source of record)  
- [ ] Screenshot: Cloud Function successful runs (Logs Explorer)  
- [ ] Screenshot: BigQuery weather table with new rows  
- [ ] This notebook with prompts and results committed to GitHub

## ✅ Submission Checklist (Individual → Brightspace)
- [ ] `Contribution_Reflection.pdf` (commit/PR evidence + peer eval)

## 🎯 Learning Objectives
- Call an external API and stage data into **BigQuery**
- Train, evaluate, and use a **BQML** model
- Deploy a **Cloud Function** and schedule it with **Cloud Scheduler**
- Document architecture, logs, and verification steps

## 🧰 Setup
> Run once per runtime session.

In [None]:
# !pip install --quiet google-cloud-bigquery google-cloud-storage google-cloud-pubsub

import pandas as pd
import json
from google.colab import auth  # type: ignore
auth.authenticate_user()

PROJECT_ID = "your-gcp-project-id"  # <-- edit
DATASET_ID = "your_dataset"         # <-- create in BigQuery (e.g., mgmt467)
REGION = "US"                       # <-- BigQuery region
print("Using project:", PROJECT_ID)

In [None]:
from google.cloud import bigquery
bq = bigquery.Client(project=PROJECT_ID)

# Create dataset if not exists
ds_ref = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
ds_ref.location = REGION
try:
    bq.create_dataset(ds_ref, exists_ok=True)
    print("Dataset ready:", f"{PROJECT_ID}.{DATASET_ID}")
except Exception as e:
    print(e)

## 1) Weather API (Prototype in Colab)
Use a free API (e.g., OpenWeatherMap). **Do not commit secrets.**  
Store your API key in a Colab secret during testing.
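
One way to keep the prototype easy to test is to separate the HTTP call from the step that flattens the response. The sketch below assumes the OpenWeatherMap current-weather fields used in this notebook (`main.temp`, `main.humidity`, `wind.speed`). The helper name `to_weather_row` and the sample payload are hypothetical and are not part of the assignment template.

```python
import json
from datetime import datetime, timezone


def to_weather_row(payload, city, now=None):
    """Flatten a current-weather payload into one row for weather_hourly.

    Hypothetical helper: raises ValueError when the temperature is missing,
    so an error response is not silently stored as a row of NULLs.
    """
    main = payload.get("main", {})
    if main.get("temp") is None:
        raise ValueError("payload has no main.temp; check the API response")
    now = now or datetime.now(timezone.utc)
    return {
        "city": city,
        "ts_utc": now.isoformat(),
        "temp_c": main.get("temp"),
        "humidity": main.get("humidity"),
        "wind_mps": payload.get("wind", {}).get("speed"),
        "raw": json.dumps(payload)[:500],  # trimmed raw blob for debugging
    }


# Made-up payload for illustration only (not real weather data).
sample_payload = {"main": {"temp": 21.5, "humidity": 60}, "wind": {"speed": 3.2}}
row = to_weather_row(
    sample_payload,
    "New York",
    now=datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc),
)
print(row["ts_utc"], row["temp_c"])
```

Passing `now` explicitly keeps the function deterministic in tests; in the notebook you would normally omit it.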

In [None]:
# Example prototype (adapt to your chosen API)
# NEVER commit your real API key to GitHub.
import requests
from google.colab import userdata  # type: ignore

CITY = "New York"
# Read the key from a Colab secret (key icon in the left sidebar).
# The secret name OWM_API_KEY is an assumption; use whatever name you created.
API_KEY = userdata.get("OWM_API_KEY")
url = "https://api.openweathermap.org/data/2.5/weather"
params = {"q": CITY, "appid": API_KEY, "units": "metric"}  # requests URL-encodes these

resp = requests.get(url, params=params, timeout=20)  # timeout avoids a hung cell
resp.raise_for_status()
weather = resp.json()
weather_sample = {
    "city": CITY,
    "ts_utc": pd.Timestamp.now(tz="UTC").isoformat(),  # timezone-aware UTC timestamp
    "temp_c": weather.get("main", {}).get("temp"),
    "humidity": weather.get("main", {}).get("humidity"),
    "wind_mps": weather.get("wind", {}).get("speed"),
    "raw": json.dumps(weather)[:500]  # trimmed raw blob for debugging
}
weather_sample

### Load Prototype Row into BigQuery
Create a table if it doesn't exist and append the prototype record.
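
Because the pipeline appends a row on every run, a retried or double-triggered run can write the same hour twice. One possible mitigation, sketched below, is to derive a deterministic ID per city and UTC hour. The function name `make_row_id` and the choice of key are assumptions for illustration, not a required design.

```python
import hashlib
from datetime import datetime, timezone


def make_row_id(city, ts):
    """Return a deterministic ID for one (city, UTC hour) observation."""
    hour_bucket = ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H")
    key = f"{city.strip().lower()}|{hour_bucket}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:32]


# Two calls within the same hour share an ID; the next hour gets a new one.
first = make_row_id("New York", datetime(2025, 1, 1, 12, 5, tzinfo=timezone.utc))
retry = make_row_id("New York", datetime(2025, 1, 1, 12, 40, tzinfo=timezone.utc))
next_hour = make_row_id("New York", datetime(2025, 1, 1, 13, 0, tzinfo=timezone.utc))
print(first == retry, first == next_hour)
```

Such IDs could be passed through the `row_ids` argument of `insert_rows_json`, which BigQuery uses for best-effort de-duplication of streaming inserts. That is not a guarantee, so a periodic de-duplication query may still be worth documenting under known issues.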

In [None]:
schema = [
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("ts_utc", "TIMESTAMP"),
    bigquery.SchemaField("temp_c", "FLOAT"),
    bigquery.SchemaField("humidity", "FLOAT"),
    bigquery.SchemaField("wind_mps", "FLOAT"),
    bigquery.SchemaField("raw", "STRING"),
]

table_id = f"{PROJECT_ID}.{DATASET_ID}.weather_hourly"
table = bigquery.Table(table_id, schema=schema)
table = bq.create_table(table, exists_ok=True)

errors = bq.insert_rows_json(table, [weather_sample])
print("Insert errors:", errors)

## 2) BQML Model (Regression or Classification)
Train a model using BQML. You may use Citi Bike (`bigquery-public-data.new_york_citibike.citibike_trips`) or Chicago Taxi, and incorporate engineered features.

> Example objective (regression): Predict `tripduration`.
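
To make the engineered features concrete, here is a small Python sketch of the same hour and day-of-week features the SQL template extracts, plus an hour bucket that could serve as a join key against `weather_hourly`. The function name `engineer_features` and the `hour_bucket` field are hypothetical. If your trip times are local rather than UTC, they would need converting before joining on `ts_utc`.

```python
from datetime import datetime


def engineer_features(start):
    """Derive model features and a weather join key from a trip start time."""
    return {
        "start_hour": start.hour,
        # BigQuery's DAYOFWEEK runs from 1 (Sunday) to 7 (Saturday).
        "dow": start.isoweekday() % 7 + 1,
        # Truncate to the hour; a weather row for the same hour can join on this.
        "hour_bucket": start.replace(minute=0, second=0, microsecond=0),
    }


features = engineer_features(datetime(2019, 7, 1, 8, 45, 30))  # a Monday
print(features)
```

In SQL, the equivalent join key would typically come from truncating both timestamps to the hour before joining.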

In [None]:
# CREATE MODEL (template) — edit features/target as needed
query_create_model = r"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{DATASET_ID}.trip_duration_model`
OPTIONS(
  model_type = 'linear_reg',
  input_label_cols = ['tripduration']
) AS

SELECT
  tripduration,
  EXTRACT(HOUR FROM starttime) AS start_hour,
  EXTRACT(DAYOFWEEK FROM starttime) AS dow,
  start_station_id,
  end_station_id,
  usertype
FROM `bigquery-public-data.new_york_citibike.citibike_trips`
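-- Half-open range so that all of July 31 is included (BETWEEN would stop at midnight).
-- Confirm the table has rows in this window, and adjust the dates if it does not.
WHERE starttime >= '2019-07-01' AND starttime < '2019-08-01';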
"""
print(query_create_model)

In [None]:
# Run the CREATE MODEL
bq.query(query_create_model.format(PROJECT_ID=PROJECT_ID, DATASET_ID=DATASET_ID)).result()
print("Model created.")

In [None]:
# Evaluate the model
query_eval = f"""
SELECT * FROM ML.EVALUATE(MODEL `{PROJECT_ID}.{DATASET_ID}.trip_duration_model`)
"""
df_eval = bq.query(query_eval).to_dataframe()
df_eval

In [None]:
# Predict (example with sampled rows)
query_pred = f"""
SELECT *
FROM ML.PREDICT(MODEL `{PROJECT_ID}.{DATASET_ID}.trip_duration_model`,
  (
    SELECT
      EXTRACT(HOUR FROM starttime) AS start_hour,
      EXTRACT(DAYOFWEEK FROM starttime) AS dow,
      start_station_id,
      end_station_id,
      usertype
    FROM `bigquery-public-data.new_york_citibike.citibike_trips`
    WHERE starttime BETWEEN '2019-08-01' AND '2019-08-02'
    LIMIT 100
  )
)
"""
df_pred = bq.query(query_pred).to_dataframe()
df_pred.head()

## 3) Cloud Function (Deployment Package)
Create a **Python Cloud Function** that fetches weather and writes to BigQuery.

> Place these two files in your repo under `/cloud_function/`.

**`main.py` (template)**
```python
import os, json, datetime as dt
from google.cloud import bigquery
import functions_framework
import requests

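# GCP_PROJECT is not set automatically on newer runtimes; fall back to the
# project inferred from the function's default credentials.
PROJECT_ID = os.environ.get("GCP_PROJECT") or bigquery.Client().project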
DATASET_ID = os.environ.get("DATASET_ID", "your_dataset")
TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.weather_hourly"
CITY = os.environ.get("CITY", "New York")
API_KEY = os.environ.get("OWM_API_KEY")

@functions_framework.http
def ingest_weather(request):
    if not API_KEY:
        return ("Missing OWM_API_KEY", 500)

    url = "https://api.openweathermap.org/data/2.5/weather"
    params = {"q": CITY, "appid": API_KEY, "units": "metric"}  # URL-encoded by requests
    resp = requests.get(url, params=params, timeout=20)
    resp.raise_for_status()
    w = resp.json()

    row = [{
        "city": CITY,
        "ts_utc": dt.datetime.now(dt.timezone.utc).isoformat(),  # timezone-aware UTC
        "temp_c": w.get("main", {}).get("temp"),
        "humidity": w.get("main", {}).get("humidity"),
        "wind_mps": w.get("wind", {}).get("speed"),
        "raw": json.dumps(w)[:500]
    }]

    bq = bigquery.Client(project=PROJECT_ID)
    errors = bq.insert_rows_json(TABLE_ID, row)
    if errors:
        return (str(errors), 500)
    return ("OK", 200)
```

**`requirements.txt`**
```
google-cloud-bigquery==3.*
functions-framework==3.*
requests==2.*
```

### Deploy (Console or gcloud)
- **Runtime:** Python 3.11  
- **Trigger:** HTTP  
- **Region:** us-central1 (or your choice)  
- **Env Vars:** `DATASET_ID=your_dataset`, `CITY=New York`, `OWM_API_KEY=***`
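
Since the function depends on these environment variables, it can help to validate them in one place and fail with a clear message. The sketch below is one way to do that. The `FunctionConfig` class and `load_config` function are hypothetical names, and the values shown are placeholders.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FunctionConfig:
    dataset_id: str
    city: str
    api_key: str


def load_config(env):
    """Build a config from a mapping such as os.environ; fail fast on gaps."""
    missing = [name for name in ("DATASET_ID", "OWM_API_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return FunctionConfig(
        dataset_id=env["DATASET_ID"],
        city=env.get("CITY", "New York"),  # same default as the main.py template
        api_key=env["OWM_API_KEY"],
    )


# Placeholder values for illustration; in the function, pass os.environ instead.
config = load_config({"DATASET_ID": "mgmt467", "OWM_API_KEY": "dummy-key"})
print(config.dataset_id, config.city)
```

Taking the environment as a parameter keeps the loader testable without touching the real process environment.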

**gcloud (template, run from the `cloud_function/` directory):**
```bash
gcloud functions deploy ingest_weather \
  --gen2 --region=us-central1 --runtime=python311 --source=. \
  --entry-point=ingest_weather --trigger-http --allow-unauthenticated \
  --set-env-vars=DATASET_ID=your_dataset,CITY="New York",OWM_API_KEY=YOUR_KEY
```

## 4) Cloud Scheduler (Hourly Trigger)
Create a Scheduler job to hit the Cloud Function every hour.

**gcloud (template):**
```bash
gcloud scheduler jobs create http weather-hourly \
  --location=us-central1 \
  --schedule="0 * * * *" \
  --uri="https://<function-url>" \
  --http-method=GET
```
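
The cron string `0 * * * *` means "minute 0 of every hour". As a rough illustration of that cadence only (this is not how Cloud Scheduler is implemented, and `next_hourly_run` is a hypothetical helper), the next run after a given moment can be computed like this:

```python
from datetime import datetime, timedelta, timezone


def next_hourly_run(now):
    """Return the next top-of-hour time strictly after `now`."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


now = datetime(2025, 1, 1, 12, 30, tzinfo=timezone.utc)
print(next_hourly_run(now))
```

Knowing the expected run times makes it easier to match Logs Explorer entries to scheduled invocations.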

## 5) Verification & Documentation
- **Logs Explorer:** Screenshot of two successful runs  
- **BigQuery:** Screenshot showing new rows in `weather_hourly`  
- **README.md:** Architecture diagram + setup steps + known issues
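
Beyond screenshots, one lightweight check is to look for gaps between consecutive `ts_utc` values pulled from `weather_hourly`. The sketch below assumes the timestamps have already been loaded into Python. The function name `find_gaps`, the 90-minute threshold, and the sample timestamps are all illustrative assumptions.

```python
from datetime import datetime, timedelta


def find_gaps(timestamps, max_gap=timedelta(minutes=90)):
    """Return (previous, current) pairs whose spacing exceeds max_gap."""
    ordered = sorted(timestamps)
    return [(a, b) for a, b in zip(ordered, ordered[1:]) if b - a > max_gap]


# Made-up timestamps: the 14:00 run is missing.
seen = [
    datetime(2025, 1, 1, 12, 0),
    datetime(2025, 1, 1, 13, 0),
    datetime(2025, 1, 1, 15, 0),
]
gaps = find_gaps(seen)
print(gaps)
```

A threshold slightly above one hour tolerates small scheduling jitter while still flagging a missed run.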

## 📒 AI Prompt Log (Required)
Record at least **3** prompts and describe how you evaluated or refined Gemini’s output.

| # | Prompt (summary) | Where used | What changed after refinement? |
|---|------------------|------------|--------------------------------|
| 1 | ☐ | API / BQML / Deploy | ☐ |
| 2 | ☐ | API / BQML / Deploy | ☐ |
| 3 | ☐ | API / BQML / Deploy | ☐ |

## 📦 Appendix — Reproducibility
- Project/region: ☐  
- Dataset/table names: ☐  
- Model name: ☐  
- Cost/quotas encountered: ☐