# Airport Weather Alerts with Gemini

This notebook shows how to use Gemini to generate weather alerts for large US airports. It walks through the following steps:

1. **Fetch current weather forecasts** from the National Weather Service (NWS) API for large US airports
2. **Store structured data** in BigQuery tables
3. **Use Gemini 2.5 Flash** to analyze weather forecasts and generate actionable alerts
4. **Extract insights** from unstructured weather data


In [None]:
import pandas as pd

airports_csv_path = 'gs://labs.roitraining.com/data-to-ai-workshop/airports.csv'

# Load the airports CSV straight from Cloud Storage into a DataFrame.
df = pd.read_csv(airports_csv_path)

# Show the dtype pandas inferred for each column; this is what the BigQuery
# table schema is based on.
column_dtypes = df.dtypes
print(column_dtypes)


In [None]:
# Import the file into a BigQuery table

from google.cloud import bigquery

client = bigquery.Client()

# Set dataset reference vars.
project_id = "qwiklabs-gcp-02-949c0486d822"
dataset_id = "dani_data_to_ai_workshop"
table_id = "airports"

# Source CSV in Cloud Storage (first row is a header).
airports_csv_uri = 'gs://labs.roitraining.com/data-to-ai-workshop/airports.csv'

dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
dataset = bigquery.Dataset(dataset_ref)
# Pin the dataset to the same location the load/query jobs below use.
# Has no effect if the dataset already exists (exists_ok=True).
dataset.location = "US"
client.create_dataset(dataset, exists_ok=True)

# Define the schema
schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("ident", "STRING"),
    bigquery.SchemaField("type", "STRING"),
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("latitude_deg", "FLOAT"),
    bigquery.SchemaField("longitude_deg", "FLOAT"),
    bigquery.SchemaField("elevation_ft", "FLOAT"),
    bigquery.SchemaField("continent", "STRING"),
    bigquery.SchemaField("iso_country", "STRING"),
    bigquery.SchemaField("iso_region", "STRING"),
    bigquery.SchemaField("municipality", "STRING"),
    bigquery.SchemaField("scheduled_service", "STRING"),
    bigquery.SchemaField("icao_code", "STRING"),
    bigquery.SchemaField("iata_code", "STRING"),
    bigquery.SchemaField("gps_code", "STRING"),
    bigquery.SchemaField("local_code", "STRING"),
    bigquery.SchemaField("home_link", "STRING"),
    bigquery.SchemaField("wikipedia_link", "STRING"),
    bigquery.SchemaField("keywords", "STRING"),
]

# Create the table if it doesn't exist (an existing table is left in place).
table_ref = bigquery.TableReference(dataset_ref, table_id)
table = bigquery.Table(table_ref, schema=schema)
client.create_table(table, exists_ok=True)

# Load the data into the table.
# Load jobs default to WRITE_APPEND, which would add a second copy of every
# airport each time this cell is re-run; WRITE_TRUNCATE replaces the contents.
job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # Skip the header row
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

job = client.load_table_from_uri(
    airports_csv_uri,
    table,
    location='US',
    job_config=job_config
)

try:
    job.result()  # Wait for the job to complete
    table = client.get_table(table_ref)
    print(f"Loaded {table.num_rows} rows and {len(table.schema)} columns to {table_id}")
except Exception as e:
    print(f"Error loading data: {e}")


# Validate the table was created.
table = client.get_table(table_ref)
print(f"Table {table_id} created with {table.num_rows} rows and {len(table.schema)} columns")


In [None]:
# Fetch weather forecasts for all large airports in the US using the National Weather Service API

import pandas as pd
import requests
import time
from datetime import datetime
import json

# First, let's query our BigQuery table to get all large airports in the US
from google.cloud import bigquery

client = bigquery.Client()

# Columns pulled for each airport; one list drives both the SELECT clause and
# the DataFrame column names, so the two cannot drift apart.
airport_columns = ['id', 'ident', 'name', 'latitude_deg', 'longitude_deg', 'municipality']

# US rows whose type is 'large_airport'.
query = (
    "\n"
    f"SELECT {', '.join(airport_columns)}\n"
    f"FROM `{project_id}.{dataset_id}.airports`\n"
    "WHERE iso_country = 'US' \n"
    "AND type = 'large_airport'\n"
)

results = client.query(query).result()

# One tuple per result row, with values taken in airport_columns order.
large_airports_df = pd.DataFrame(
    [tuple(row[col] for col in airport_columns) for row in results],
    columns=airport_columns,
)

print(f"Found {len(large_airports_df)} large airports in the US")
print(large_airports_df.head())


In [None]:
# Function to get weather forecast from NWS API
def get_weather_forecast(lat, lon, timeout=30):
    """
    Get forecast periods from the National Weather Service API for a latitude/longitude.

    This is a two-step lookup. The /points endpoint returns the forecast URL
    for the location, and that URL is then fetched.

    Args:
        lat: latitude in decimal degrees.
        lon: longitude in decimal degrees.
        timeout: seconds to wait on each HTTP request. Without a timeout,
            `requests` can block indefinitely on a stalled connection.

    Returns:
        The list at ``properties.periods`` of the forecast response, or None
        if a request fails or the response does not have the expected shape.
    """
    # Step 1: Get the grid endpoint for the location
    points_url = f"https://api.weather.gov/points/{lat},{lon}"

    # Set headers for API request (NWS API requires a user agent)
    headers = {
        'User-Agent': 'BigQuery Weather Integration (dpalencia@burwood.com)',
        'Accept': 'application/geo+json'
    }

    try:
        # Get the points data
        points_response = requests.get(points_url, headers=headers, timeout=timeout)
        points_response.raise_for_status()  # Raise exception for HTTP errors
        points_data = points_response.json()

        # Extract the forecast URL
        forecast_url = points_data['properties']['forecast']

        # Get the forecast data
        forecast_response = requests.get(forecast_url, headers=headers, timeout=timeout)
        forecast_response.raise_for_status()
        forecast_data = forecast_response.json()

        # Return the forecast periods
        return forecast_data['properties']['periods']

    except requests.exceptions.RequestException as e:
        print(f"Error fetching forecast: {e}")
        return None
    except (KeyError, TypeError, ValueError) as e:
        # KeyError: missing field. TypeError: a null where an object was
        # expected (e.g. "properties": null). ValueError: body was not JSON.
        print(f"Error parsing forecast data: {e}")
        return None

# Function to process forecast data into a structured format
def process_forecast(forecast_periods, airport_id, airport_name, forecast_time=None):
    """
    Flatten forecast periods into row dicts for the BigQuery forecasts table.

    Args:
        forecast_periods: list of period dicts (as returned by
            get_weather_forecast). Falsy input (None or []) yields [].
        airport_id: airport id copied onto every row.
        airport_name: airport name copied onto every row.
        forecast_time: optional string stamped on every row as
            'forecast_time'. Defaults to the current UTC time in ISO 8601
            format.

    Returns:
        A list with one dict per period. A missing period field becomes ''
        ('temperature' becomes None instead).
    """
    from datetime import datetime, timezone

    if not forecast_periods:
        return []

    if forecast_time is None:
        # Computed once per call, so every period from the same fetch shares
        # one timestamp. It is timezone-aware so the stored string is unambiguous.
        forecast_time = datetime.now(timezone.utc).isoformat()

    return [
        {
            'airport_id': airport_id,
            'airport_name': airport_name,
            'forecast_time': forecast_time,
            'period_name': period.get('name', ''),
            'period_start': period.get('startTime', ''),
            'period_end': period.get('endTime', ''),
            'temperature': period.get('temperature', None),
            'temperature_unit': period.get('temperatureUnit', ''),
            'wind_speed': period.get('windSpeed', ''),
            'wind_direction': period.get('windDirection', ''),
            'short_forecast': period.get('shortForecast', ''),
            'detailed_forecast': period.get('detailedForecast', '')
        }
        for period in forecast_periods
    ]


In [None]:
# Fetch weather forecasts for all large airports
all_forecasts = []

# Visit the airports one at a time and pause 1.5 s after each one, whether or
# not its lookup succeeded, to stay polite to the NWS API.
for _, airport_row in large_airports_df.iterrows():
    airport_label = f"{airport_row['name']} ({airport_row['ident']})"
    print(f"Fetching forecast for {airport_label}")

    periods = get_weather_forecast(airport_row['latitude_deg'], airport_row['longitude_deg'])

    if not periods:
        print("  No forecast data available")
    else:
        rows = process_forecast(periods, airport_row['id'], airport_row['name'])
        all_forecasts.extend(rows)
        print(f"  Got {len(rows)} forecast periods")

    time.sleep(1.5)

# One DataFrame row per forecast period across all airports.
forecasts_df = pd.DataFrame(all_forecasts)

print(f"\nCollected {len(forecasts_df)} forecast periods for {len(large_airports_df)} airports")
print(forecasts_df.head())


In [None]:
# Create a BigQuery table for the weather forecasts

# Destination table for the rows collected in forecasts_df.
forecast_table_id = "airport_weather_forecasts"
forecast_table_ref = bigquery.TableReference(dataset_ref, forecast_table_id)

# One column per key of the dicts built by process_forecast. The time values
# (forecast_time, period_start, period_end) are STRING columns, so the
# DataFrame is loaded as-is with no datetime conversion.
forecast_schema = [
    bigquery.SchemaField("airport_id", "INTEGER"),
    bigquery.SchemaField("airport_name", "STRING"),
    bigquery.SchemaField("forecast_time", "STRING"),
    bigquery.SchemaField("period_name", "STRING"),
    bigquery.SchemaField("period_start", "STRING"),
    bigquery.SchemaField("period_end", "STRING"),
    bigquery.SchemaField("temperature", "INTEGER"),
    bigquery.SchemaField("temperature_unit", "STRING"),
    bigquery.SchemaField("wind_speed", "STRING"),
    bigquery.SchemaField("wind_direction", "STRING"),
    bigquery.SchemaField("short_forecast", "STRING"),
    bigquery.SchemaField("detailed_forecast", "STRING"),
]

# Ensure the table exists. exists_ok=True keeps an existing table untouched
# (it is not dropped or replaced); stale rows are cleared by the
# WRITE_TRUNCATE load below.
forecast_table = bigquery.Table(forecast_table_ref, schema=forecast_schema)
client.create_table(forecast_table, exists_ok=True)
print(f"Created table {project_id}.{dataset_id}.{forecast_table_id}")

# Overwrite the table contents with the DataFrame.
load_config = bigquery.LoadJobConfig(
    schema=forecast_schema,
    write_disposition="WRITE_TRUNCATE",  # Replace any existing data
)
load_job = client.load_table_from_dataframe(
    forecasts_df,
    forecast_table_ref,
    job_config=load_config,
    location="US",
)

try:
    load_job.result()  # Block until the load finishes
    forecast_table = client.get_table(forecast_table_ref)
    print(f"Loaded {forecast_table.num_rows} rows and {len(forecast_table.schema)} columns to {forecast_table_id}")
except Exception as e:
    print(f"Error loading data: {e}")

# Re-read the table metadata to report its final size.
forecast_table = client.get_table(forecast_table_ref)
print(f"Table {forecast_table_id} created with {forecast_table.num_rows} rows and {len(forecast_table.schema)} columns")


In [None]:
# Query the data to verify it was loaded correctly

# Spot check: the first 20 forecast rows, ordered by airport name and then by
# period start. This is a sample of the table, not the latest forecast per airport.
query = f"""
SELECT 
    airport_name,
    period_name,
    temperature,
    temperature_unit,
    wind_speed,
    wind_direction,
    short_forecast
FROM 
    `{project_id}.{dataset_id}.{forecast_table_id}`
ORDER BY 
    airport_name,
    period_start
LIMIT 20
"""

sample_rows = client.query(query).result()

print("Sample of weather forecast data:")
for forecast in sample_rows:
    print(f"{forecast.airport_name} - {forecast.period_name}: {forecast.temperature}°{forecast.temperature_unit}, {forecast.wind_speed} {forecast.wind_direction}, {forecast.short_forecast}")


In [None]:
# Create a Gemini model in BigQuery using REMOTE WITH CONNECTION

# Vertex AI connection, written as "<location>.<connection_id>" and qualified
# with the project id in the CREATE MODEL statement.
connection_name = "us-central1.vertex-ai"

# NOTE(review): jobs in this notebook run with location="US", while this
# connection lives in us-central1. BigQuery generally expects a remote model's
# connection to match the dataset location; confirm this pairing is accepted.
model_path = f"{project_id}.{dataset_id}.airport_gemini_model"

create_model_query = f"""
CREATE OR REPLACE MODEL `{model_path}`
REMOTE WITH CONNECTION `{project_id}.{connection_name}`
OPTIONS (
  endpoint = 'gemini-2.5-flash'
)
"""

try:
    client.query(create_model_query, location="US").result()  # blocks until the DDL finishes
    print(f"Successfully created model: {model_path}")
except Exception as e:
    print(f"Error creating Gemini model: {e}")


In [None]:
# Generate airport weather alerts using Gemini

# Build one prompt per airport by joining airport data with its forecast
# periods. This cell only previews the prompts; it does not call the model.
#
# Period order is set by the ORDER BY inside STRING_AGG. An ORDER BY in the
# source CTE does not carry through the GROUP BY aggregation.
# NOTE(review): period_start is a STRING column; this assumes the NWS
# timestamps share a consistent ISO-8601 format per airport, so string order
# is chronological. TODO confirm.
# The LIMIT is preceded by ORDER BY so the same 10 airports are chosen on every run.
alert_query = f"""
WITH airport_forecasts AS (
  SELECT
    a.id AS airport_id,
    a.name AS airport_name,
    a.municipality,
    a.iso_region,
    f.period_name,
    f.period_start,
    f.temperature,
    f.temperature_unit,
    f.wind_speed,
    f.wind_direction,
    f.short_forecast,
    f.detailed_forecast
  FROM
    `{project_id}.{dataset_id}.airports` a
  JOIN
    `{project_id}.{dataset_id}.{forecast_table_id}` f
  ON
    a.id = f.airport_id
  WHERE
    a.iso_country = 'US'
    AND a.type = 'large_airport'
),
airport_forecast_prompts AS (
  SELECT
    airport_id,
    airport_name,
    municipality,
    iso_region,
    CONCAT(
      'Based on the following weather forecast for ', airport_name, ' in ', municipality, ', ',
      STRING_AGG(
        CONCAT(
          period_name, ': ', temperature, '°', temperature_unit,
          ', wind ', wind_speed, ' ', wind_direction,
          ', ', short_forecast
        ),
        '. '
        ORDER BY period_start
      ),
      '. Generate a concise list of potential weather alerts, warnings, or hazardous conditions that travelers should be aware of. ',
      'Include severity levels (Low/Medium/High) and recommended precautions. ',
      'Format the response as a structured list with bullet points for each alert. ',
      'If there are no significant alerts, state that conditions appear normal.'
    ) AS alert_prompt
  FROM
    airport_forecasts
  GROUP BY
    airport_id, airport_name, municipality, iso_region
  ORDER BY
    airport_name
  LIMIT 10  -- Limiting to 10 airports for testing, remove this for production
)
SELECT
  *
FROM
  airport_forecast_prompts
ORDER BY
  airport_name
"""

# Execute the query to get the prompts
alert_job = client.query(alert_query, location="US")
alert_results = alert_job.result()

# Convert to a DataFrame
alert_prompts_df = pd.DataFrame(
    [(row.airport_id, row.airport_name, row.municipality, row.iso_region, row.alert_prompt)
     for row in alert_results],
    columns=['airport_id', 'airport_name', 'municipality', 'iso_region', 'alert_prompt']
)

print(f"Generated prompts for {len(alert_prompts_df)} airports")

# Sample a prompt to see what it looks like
if not alert_prompts_df.empty:
    sample_row = alert_prompts_df.iloc[0]
    # CONCAT returns NULL if any argument is NULL; avoid len(None) below.
    prompt_text = sample_row.alert_prompt or ""
    print("\n=== SAMPLE PROMPT ===")
    print(f"Airport: {sample_row.airport_name}, Location: {sample_row.municipality}, {sample_row.iso_region}")
    print("\nPrompt (truncated):")
    print(prompt_text[:500] + "..." if len(prompt_text) > 500 else prompt_text)


In [None]:
# Use Gemini to generate alerts for each airport

# Create a new table for airport alerts
alerts_table_id = "airport_weather_alerts"
alerts_table_ref = bigquery.TableReference(dataset_ref, alerts_table_id)

# Drop any previous alerts table. not_found_ok=True makes this a no-op on the
# first run. Unlike a blanket `except Exception`, it does not hide permission
# or network errors.
client.delete_table(alerts_table_ref, not_found_ok=True)
print(f"Removed any existing {alerts_table_id} table")

# ML.GENERATE_TEXT is called once, as a table function over all prompts. Each
# input row's `prompt` column is sent to the model, and the other input columns
# are passed through to the output. The raw JSON response and the extracted
# text come from the same call, so they always agree and each airport costs
# one model request.
#
# Prompt details: STRING_AGG orders the periods chronologically, and the
# ORDER BY before LIMIT makes the 10-airport sample deterministic.
alerts_query = f"""
CREATE OR REPLACE TABLE `{project_id}.{dataset_id}.{alerts_table_id}` AS
SELECT
  airport_id,
  airport_name,
  municipality,
  iso_region,
  CURRENT_TIMESTAMP() AS generated_at,
  prompt AS alert_prompt,
  ml_generate_text_result AS alert_response,
  JSON_VALUE(ml_generate_text_result, '$.candidates[0].content.parts[0].text') AS alert_text,
  ml_generate_text_status AS generation_status
FROM ML.GENERATE_TEXT(
  MODEL `{project_id}.{dataset_id}.airport_gemini_model`,
  (
    SELECT
      a.id AS airport_id,
      a.name AS airport_name,
      a.municipality,
      a.iso_region,
      CONCAT(
        'Based on the following weather forecast for ', a.name, ' in ', a.municipality, ', ',
        STRING_AGG(
          CONCAT(
            f.period_name, ': ', f.temperature, '°', f.temperature_unit,
            ', wind ', f.wind_speed, ' ', f.wind_direction,
            ', ', f.short_forecast
          ),
          '. '
          ORDER BY f.period_start
        ),
        '. Generate a concise list of potential weather alerts, warnings, or hazardous conditions that travelers should be aware of. ',
        'Make the response concise; only a few sentences. ',
        'If there are no significant alerts, state that conditions appear normal.'
      ) AS prompt
    FROM
      `{project_id}.{dataset_id}.airports` a
    JOIN
      `{project_id}.{dataset_id}.{forecast_table_id}` f
    ON
      a.id = f.airport_id
    WHERE
      a.iso_country = 'US'
      AND a.type = 'large_airport'
    GROUP BY
      a.id, a.name, a.municipality, a.iso_region
    ORDER BY
      a.name
    LIMIT 10  -- Limiting to 10 airports for testing, remove this for production
  ),
  STRUCT(0.2 AS temperature, 2048 AS max_output_tokens)
)
"""

# Execute the query to create the alerts table
try:
    create_job = client.query(alerts_query, location="US")
    create_job.result()
    print("Successfully created airport weather alerts table")
except Exception as e:
    print(f"Error creating alerts table: {str(e)}")


In [None]:
# Query the alerts table to verify the results

# Read back every generated alert, sorted by airport name.
query = f"""
SELECT 
    airport_name,
    municipality,
    iso_region,
    generated_at,
    alert_text
FROM 
    `{project_id}.{dataset_id}.{alerts_table_id}`
ORDER BY 
    airport_name
"""

alert_rows = client.query(query).result()

# One section per airport, separated by a horizontal rule.
divider = "\n" + "-"*80 + "\n"
print("Airport Weather Alerts Generated by Gemini:\n")
for alert in alert_rows:
    header = f"=== {alert.airport_name} ({alert.municipality}, {alert.iso_region}) ==="
    print(header)
    print(f"Generated at: {alert.generated_at}")
    print("\nAlerts:")
    print(alert.alert_text)
    print(divider)
