#### Create a Vertex AI pipeline for scheduled report runs

* Set up the pipeline
* Schedule it to run hourly

In [None]:
!pip install kfp google-cloud-aiplatform

In [7]:
# pipeline.py
import os
from kfp import dsl
from kfp.dsl import component
from google.cloud import aiplatform

# -------- Component 1: NWS -> BQ load --------
@component(
    base_image="python:3.11",
    packages_to_install=[
        "google-cloud-bigquery",
        "pandas",
        "pyarrow",
        "db-dtypes",
        "requests",
    ],
)
def fetch_forecasts_to_bq(
    project_id: str,
    dataset: str,
    airports_table: str,
    alert_table: str,
    sleep_s: float = 0.5,
):
    """Fetch an NWS forecast for every airport and overwrite ``alert_table``.

    Reads airports with non-null coordinates from ``airports_table``, calls the
    api.weather.gov points/forecast endpoints for each one, and loads the rows
    into ``alert_table`` with WRITE_TRUNCATE. The ``alert_*`` columns are written
    as NULL placeholders for a later step to fill in.

    Args:
        project_id: GCP project used for the BigQuery client.
        dataset: Not used by this component; kept for interface compatibility.
        airports_table: Fully-qualified source table name (backtick-quoted in SQL).
        alert_table: Fully-qualified destination table; its contents are replaced.
        sleep_s: Pause after each airport, to throttle calls to api.weather.gov.
    """
    import time
    import requests
    import pandas as pd
    from datetime import datetime, timezone
    from google.cloud import bigquery

    # Output column order. Passing it to DataFrame also gives an empty result
    # (no airports) real columns instead of a column-less frame.
    columns = [
        "id", "ident", "name", "municipality", "iso_region", "iso_country",
        "iata_code", "icao_code", "latitude_deg", "longitude_deg",
        "forecast_text", "forecast_retrieved_at",
        "alert_text", "alert_level", "alert_summary", "alert_generated_at",
    ]
    nws_headers = {"User-Agent": "AeroAlertsPipeline/1.0", "Accept": "application/geo+json"}

    bq_client = bigquery.Client(project=project_id)

    def get_nws_forecast(session, lat, lon):
        """Return the first four forecast periods as text, or an error string."""
        try:
            # Two hops: /points resolves the gridpoint forecast URL for the location.
            r = session.get(f"https://api.weather.gov/points/{lat},{lon}", timeout=15)
            r.raise_for_status()
            forecast_url = r.json()["properties"]["forecast"]

            r2 = session.get(forecast_url, timeout=15)
            r2.raise_for_status()
            periods = r2.json()["properties"]["periods"][:4]
            return "\n".join(f"{p['name']}: {p['detailedForecast']}" for p in periods)
        except Exception as e:
            # Deliberately best-effort: one failing airport is recorded as text
            # rather than aborting the whole batch.
            return f"Forecast unavailable: {str(e)}"

    query = f"""
    SELECT
      id, ident, name, municipality, iso_region, iso_country,
      iata_code, icao_code, latitude_deg, longitude_deg
    FROM `{airports_table}`
    WHERE latitude_deg IS NOT NULL AND longitude_deg IS NOT NULL
    """
    airports = bq_client.query(query).to_dataframe()

    rows = []
    # Single batch timestamp, taken before any API calls are made.
    now_ts = datetime.now(timezone.utc).isoformat()

    # A shared Session reuses TCP/TLS connections to api.weather.gov across
    # airports instead of opening two new connections per airport.
    with requests.Session() as session:
        session.headers.update(nws_headers)
        for _, row in airports.iterrows():
            lat = float(row["latitude_deg"])
            lon = float(row["longitude_deg"])
            forecast = get_nws_forecast(session, lat, lon)

            rows.append({
                "id": int(row["id"]),
                "ident": row["ident"],
                "name": row["name"],
                "municipality": row["municipality"],
                "iso_region": row["iso_region"],
                "iso_country": row["iso_country"],
                "iata_code": row["iata_code"],
                "icao_code": row["icao_code"],
                "latitude_deg": lat,
                "longitude_deg": lon,
                "forecast_text": forecast,
                "forecast_retrieved_at": now_ts,
                "alert_text": None,
                "alert_level": None,
                "alert_summary": None,
                "alert_generated_at": None,
            })
            time.sleep(sleep_s)

    df = pd.DataFrame(rows, columns=columns)

    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    load_job = bq_client.load_table_from_dataframe(df, alert_table, job_config=job_config)
    load_job.result()

    print(f"Loaded {len(df)} rows into {alert_table}")


# -------- Component 2: BQ ML.GENERATE_TEXT -> rewrite table --------
@component(
    base_image="python:3.11",
    packages_to_install=["google-cloud-bigquery"],
)
def generate_alerts_in_bq(
    project_id: str,
    alert_table: str,
    model_name: str,
    temperature: float = 0.2,
    max_output_tokens: int = 2000,
):
    """Rewrite ``alert_table`` with model-generated alerts via ML.GENERATE_TEXT.

    Builds one prompt per row that has a non-null ``forecast_text``, runs it
    through the BigQuery ML model ``model_name``, strips Markdown code fences
    from the reply, and replaces the table (CREATE OR REPLACE). The new table
    fills ``alert_text`` (the cleaned reply), plus ``alert_level`` and
    ``alert_summary`` extracted from it, and ``alert_generated_at``.

    Args:
        project_id: GCP project used for the BigQuery client.
        alert_table: Fully-qualified table that is both read and replaced.
        model_name: Fully-qualified BigQuery ML remote model name.
        temperature: Sampling temperature forwarded to ML.GENERATE_TEXT.
        max_output_tokens: Output token cap forwarded to ML.GENERATE_TEXT.
    """
    from google.cloud import bigquery

    bq_client = bigquery.Client(project=project_id)

    # CTEs: gen = model call plus the extracted reply text (_model_text);
    # final = drop the prompt, model bookkeeping and old alert_* columns, and
    # clean the reply. _model_text is excluded in final, so the intermediate
    # column does not leak into the rewritten table.
    # Rows with NULL forecast_text are not selected, so they are not carried
    # into the replaced table.
    sql = f"""
    CREATE OR REPLACE TABLE `{alert_table}` AS
    WITH gen AS (
      SELECT
        x.*,
        JSON_VALUE(x.ml_generate_text_result, '$.candidates[0].content.parts[0].text') AS _model_text
      FROM ML.GENERATE_TEXT(
        MODEL `{model_name}`,
        (
          SELECT
            CONCAT(
              'You are an aviation operations assistant. ',
              'Output STRICT JSON only. Do NOT wrap in ``` fences. ',
              'Keys: alert_level (GREEN/YELLOW/RED), alert_summary (2-4 sentences, actionable), key_hazards (array). ',
              'Airport name: ', COALESCE(name, ''), '. ',
              'Forecast: ', COALESCE(forecast_text, '')
            ) AS prompt,
            t.*
          FROM `{alert_table}` t
          WHERE forecast_text IS NOT NULL
        ),
        STRUCT(CAST({temperature} AS FLOAT64) AS temperature,
               CAST({max_output_tokens} AS INT64) AS max_output_tokens)
      ) AS x
    ),
    final AS (
      SELECT
        * EXCEPT(
          prompt, ml_generate_text_result, ml_generate_text_status,
          alert_text, alert_level, alert_summary, alert_generated_at,
          _model_text
        ),
        TRIM(
          REGEXP_REPLACE(
            REGEXP_REPLACE(_model_text, r'(?is)```json', ''),
            r'(?is)```', ''
          )
        ) AS _cleaned_json
      FROM gen
    )
    SELECT
      * EXCEPT(_cleaned_json),
      _cleaned_json AS alert_text,
      JSON_VALUE(_cleaned_json, '$.alert_level') AS alert_level,
      JSON_VALUE(_cleaned_json, '$.alert_summary') AS alert_summary,
      CURRENT_TIMESTAMP() AS alert_generated_at
    FROM final
    """
    bq_client.query(sql).result()
    print("Rewritten table:", alert_table)


# -------- Pipeline definition --------
@dsl.pipeline(name="aero-alerts-pipeline")
def aero_alerts_pipeline(
    project_id: str,
    region: str = "us-central1",
    dataset: str = "challenge5",
):
    """Load NWS forecasts into BigQuery, then generate alerts from them.

    Args:
        project_id: GCP project that owns the dataset and the BigQuery ML model.
        region: Not referenced by either step; kept so callers may pass it.
        dataset: BigQuery dataset holding the airports table, the alerts
            table, and the ``gemini_model`` model.
    """
    # Fully-qualified BigQuery object names, all within project_id.dataset.
    model_name = f"{project_id}.{dataset}.gemini_model"
    alert_table = f"{project_id}.{dataset}.airport_alerts"
    airports_table = f"{project_id}.{dataset}.airports"

    fetch_task = fetch_forecasts_to_bq(
        project_id=project_id,
        dataset=dataset,
        airports_table=airports_table,
        alert_table=alert_table,
    )

    # Both steps only share the table *name* (a plain string), so KFP sees no
    # data dependency between them; the ordering must be declared explicitly.
    alerts_task = generate_alerts_in_bq(
        project_id=project_id,
        alert_table=alert_table,
        model_name=model_name,
    )
    alerts_task.after(fetch_task)


In [8]:
# Job test to run once: compile the pipeline and submit a single, blocking run.
if __name__ == "__main__":
    from kfp import compiler

    PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
    if not PROJECT_ID:
        # Fall back to the project attached to Application Default Credentials.
        import google.auth

        _, PROJECT_ID = google.auth.default()
    if not PROJECT_ID:
        # project_id is a required pipeline parameter; fail here with a clear
        # message instead of submitting a run with project_id=None.
        raise RuntimeError(
            "No GCP project found: set GOOGLE_CLOUD_PROJECT or configure "
            "Application Default Credentials with a default project."
        )
    REGION = os.environ.get("VERTEX_REGION", "us-central1")

    pipeline_json = "aero_alerts_pipeline.json"
    compiler.Compiler().compile(
        pipeline_func=aero_alerts_pipeline,
        package_path=pipeline_json,
    )

    aiplatform.init(project=PROJECT_ID, location=REGION)

    job = aiplatform.PipelineJob(
        display_name="aero-alerts-run",
        template_path=pipeline_json,
        parameter_values={"project_id": PROJECT_ID, "region": REGION, "dataset": "challenge5"},
        enable_caching=False,  # turn on later if you want caching
    )
    # run() waits for the pipeline run to finish.
    job.run()

In [9]:
import google.auth
from google.cloud import aiplatform
from kfp import compiler  # kfp>=2

# Get creds + project
creds, PROJECT_ID = google.auth.default()
if not PROJECT_ID:
    # Without a project the pipeline root below would become
    # "gs://None-vertex-pipelines/..."; stop with a clear message instead.
    raise RuntimeError(
        "Application Default Credentials returned no project; "
        "set GOOGLE_CLOUD_PROJECT or run `gcloud config set project ...`."
    )
REGION = "us-central1"

# Compile
compiler.Compiler().compile(
    pipeline_func=aero_alerts_pipeline,
    package_path="pipeline.json",
)

# Init Vertex
aiplatform.init(project=PROJECT_ID, location=REGION, credentials=creds)

pipeline_job = aiplatform.PipelineJob(
    display_name="aero-alerts-scheduled-run",
    template_path="pipeline.json",
    pipeline_root=f"gs://{PROJECT_ID}-vertex-pipelines/aero-alerts",  # bucket must already exist
    parameter_values={
        "project_id": PROJECT_ID,
        "dataset": "challenge5",
    },
    # Every scheduled run uses identical parameters. With caching on, Vertex
    # could reuse earlier step results instead of fetching fresh forecasts.
    enable_caching=False,
)

# Hourly schedule
# If you want Pacific time: "CRON_TZ=America/Los_Angeles 0 * * * *"
schedule = pipeline_job.create_schedule(
    cron="0 * * * *",  # default timezone is UTC unless TZ/CRON_TZ prefix is used
    display_name="hourly_airport_alerts",
    # The schedule stops after 24 runs (about one day at the hourly cadence).
    max_run_count=24,
    # Each run truncates and then replaces the same BigQuery table, so
    # overlapping runs would clobber each other; allow only one at a time.
    max_concurrent_run_count=1,
    # strongly recommended if your pipeline touches BQ / resources:
    # service_account="your-sa@your-project.iam.gserviceaccount.com",
    # allow_queueing=False,
)
print("Created schedule:", schedule.resource_name)


Created schedule: projects/603410550/locations/us-central1/schedules/5282542042998636544
