# Project Plan: From Historical Analysis to Real-Time Flight Predictions

This plan is broken down into the three parts you've outlined. Each part will be a section in the final Colab notebook, with clear instructions and code cells.

# **Part I: Batch Processing of Historical Flight Data**

Objective: To build a foundational understanding of data engineering and machine learning using a large, static dataset.

Cell-by-Cell Plan:

Setup and Authentication:

Install necessary libraries (google-cloud-storage, google-cloud-bigquery, pandas, db-dtypes).

Authenticate the user and configure the GCP project.

Define all necessary variables (bucket names, dataset names, etc.).

Data Acquisition (Downloading 2024 Flight Data):

Provide a function that programmatically downloads the 2024 On-Time Performance data directly from the BTS.gov website. We will select a few months to keep the dataset manageable.

The code will download the zipped CSV files and then unzip them.

Move Data to GCS Bucket:

Create a new GCS bucket.

Upload the downloaded and unzipped CSV files to this bucket. This establishes our "data lake."

Load Data into a BigQuery Table:

Create a new BigQuery dataset.

Use a BigQuery Load Job to load all the CSV files from the GCS bucket into a single BigQuery table. We will use schema auto-detection.

Data Cleaning and Feature Engineering in BigQuery:

Run SQL queries to inspect the data, handle nulls, and create new features. A key feature will be creating a boolean label for logistic regression, such as is_arrival_delayed (TRUE if ARR_DELAY > 15 minutes).

Build a Linear Regression Model:

Use BigQuery ML to create a linear regression model to predict a continuous variable.

Goal: Predict ARR_DELAY based on features like DEP_DELAY, CARRIER, DISTANCE, and DAY_OF_WEEK.

Include cells to create, evaluate, and make predictions with the model.

Build a Logistic Regression Model:

Use BigQuery ML to create a logistic regression model for classification.

Goal: Predict the is_arrival_delayed boolean label we created earlier.

Include cells to create, evaluate (checking precision/recall), and make predictions.

Build a K-Means Clustering Model:

Use BigQuery ML to create a K-Means clustering model to segment the data.

Goal: Group flights into clusters based on their characteristics (e.g., "long-haul, often delayed," "short-haul, on-time").

Include cells to create the model and analyze the resulting cluster centroids to understand their meaning.

# **Part II: Micro-Batch Processing of "Live" Data**

Objective: To simulate a streaming environment where new data arrives periodically and build a pipeline to process it.

Cell-by-Cell Plan:

Introduction to Streaming Concepts:

Explain the difference between batch and streaming. Introduce the OpenSky Network API as our source for live flight data.

Build a Cloud Function for Data Ingestion (GCS Trigger):

Provide the Python code for a Google Cloud Function.

Function's Job: This function will be triggered on a schedule (e.g., every 15 minutes using Cloud Scheduler). It will call the OpenSky Network API, fetch all current flight vectors, format the data as a JSON file, and save it to a new GCS bucket (our "streaming landing zone").

Set up the Pub/Sub and BigQuery Streaming Infrastructure:

Create a Pub/Sub topic that will receive notifications about new files.

Configure the GCS bucket to send a message to this Pub/Sub topic every time a new file is created.

Create a new BigQuery table to hold the streaming data.

Build a Dataflow Pipeline (Pub/Sub to BigQuery):

Provide the code for a second Cloud Function (or a Dataflow job) that is triggered by messages on the Pub/Sub topic.

Function's Job: When triggered, it will read the new JSON file from GCS, parse it, and stream the data into the new BigQuery table.

Build ML Models on the Streaming Data:

Re-run the same BigQuery ML CREATE MODEL statements from Part I, but this time, train them on the new BigQuery table that is being populated with the streaming data. This demonstrates how models can be updated with fresh data.

## **Part III: Real-Time Prediction**

Objective: To build a true real-time pipeline that makes predictions on individual events as they happen.

Cell-by-Cell Plan:

Architecting for Real-Time:

Explain why for true real-time, we want to bypass saving files to GCS and send data directly to a message queue.

Build a Cloud Function for Direct Streaming (Pub/Sub):

Provide the code for a new, optimized Cloud Function.

Function's Job: This function will be triggered by a scheduler. It will fetch data from the OpenSky API and publish each flight's data as a separate message directly to a new Pub/Sub topic.

Real-Time Prediction Pipeline:

Provide the code for a final Cloud Function that acts as our real-time prediction engine.

Function's Job: This function will be a subscriber to the Pub/Sub topic from the previous step. For each message (each flight) it receives, it will immediately call the ML.PREDICT function using the models developed in Part II.

The prediction results (e.g., "this flight is likely to be delayed") can then be saved to a separate "predictions" table in BigQuery.

Visualizing Real-Time Results:

Include a final set of BigQuery queries that students can run to see the predictions populating the new table in near real-time.

In [7]:
# @title ### Cell 1: Setup, Authentication, and Configuration (Corrected)
# @markdown **Objective:** This cell imports all necessary Python libraries, authenticates your Google account to allow access to your GCP project, and sets up key configuration variables.

# ---
# **Libraries Explained:**
# - `os`: For interacting with the operating system, like removing files.
# - `subprocess`: Allows us to run shell commands like `gcloud`.
# - `google.colab.auth`: A specific Colab library to handle authentication with your Google account.
# ---

import os
import subprocess
from google.colab import auth

def setup_environment():
    """
    Authenticates the user and configures the necessary GCP project.

    This function handles the initial setup by authenticating the user's Google account
    for use within the Colab environment. It then programmatically determines the
    current GCP Project ID. If it cannot be determined automatically, it will prompt
    the user to enter it manually.

    Returns:
        str: The Project ID.
               Returns None if the project ID is not provided.
    """
    print("Authenticating your Google account...")
    auth.authenticate_user()
    print("‚úÖ Authentication successful.")

    project_id = ""
    try:
        project_id_process = subprocess.run(
            ["gcloud", "config", "get-value", "project"],
            capture_output=True, text=True, check=True
        )
        project_id = project_id_process.stdout.strip()
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass

    if not project_id:
        print("‚ö†Ô∏è Could not automatically determine GCP Project ID.")
        project_id = input("Please enter your GCP Project ID: ")

    if not project_id:
        print("üî¥ ERROR: Project ID is required to continue. Halting execution.")
        return None

    print(f"‚úÖ Using GCP Project: {project_id}")
    return project_id

# Run the setup function and store the variables.
PROJECT_ID = setup_environment()


Authenticating your Google account...
‚úÖ Authentication successful.
‚ö†Ô∏è Could not automatically determine GCP Project ID.
Please enter your GCP Project ID: mgmt-467-25259
‚úÖ Using GCP Project: mgmt-467-25259


In [2]:
!pip install functions-framework google-cloud-pubsub

import functions_framework
from google.cloud import pubsub_v1
import json
import os
import requests # Make sure requests is imported

# OpenSky Network API configuration
OPENSKY_USERNAME = os.environ.get("OPENSKY_USERNAME")
OPENSKY_PASSWORD = os.environ.get("OPENSKY_PASSWORD")

# --- User-defined Pub/Sub Topic and Project ID ---
PUBSUB_TOPIC_NAME = "live-data-stream" # Updated to user's specified topic
PROJECT_ID = os.environ.get("GCP_PROJECT") # Cloud Functions automatically set this env var

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME)

@functions_framework.http
def publish_flight_data(request):
    """
    HTTP Cloud Function that fetches live flight data and publishes it to Pub/Sub.

    This function fetches live flight data from the OpenSky Network API
    and then publishes each flight record as a JSON message to a Pub/Sub topic.
    It replaces a hypothetical direct BigQuery insertion with Pub/Sub publishing.
    """
    if not PROJECT_ID or not PUBSUB_TOPIC_NAME:
        print("Error: PROJECT_ID or PUBSUB_TOPIC_NAME not set.")
        return ('Configuration error', 500)

    if not OPENSKY_USERNAME or not OPENSKY_PASSWORD:
        print("Error: OPENSKY_USERNAME or OPENSKY_PASSWORD environment variables not set. Cannot fetch from OpenSky.")
        return ('Configuration error: OpenSky credentials missing', 500)

    print(f"Fetching flight data from OpenSky and publishing to {PUBSUB_TOPIC_NAME}...")

    flight_data_list = []
    try:
        # Actual OpenSky API call
        response = requests.get(
            f"https://{OPENSKY_USERNAME}:{OPENSKY_PASSWORD}@opensky-network.org/api/states/all",
            timeout=30
        )
        response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)
        data = response.json()
        raw_states = data.get("states", [])

        for state in raw_states:
            # OpenSky state vector elements mapping:
            # 0: icao24 (string) - Unique ICAO 24-bit address
            # 1: callsign (string) - Callsign of the vehicle
            # 2: origin_country (string) - Country name inferred by ICAO 24-bit address.
            # 3: time_position (int) - Unix timestamp (seconds) for the last position update. Can be null.
            # 5: longitude (float) - In decimal degrees (WGS-84)
            # 6: latitude (float) - In decimal degrees (WGS-84)
            # 9: velocity (float) - Velocity over ground in m/s
            # 13: baro_altitude (float) - Barometric altitude in metres. Can be null.

            # Filter for valid flight data with position
            if state and state[5] is not None and state[6] is not None:
                flight_record = {
                    "aircraft_id": state[0],
                    "callsign": state[1].strip() if state[1] else None, # Clean callsign
                    "origin_country": state[2],
                    "timestamp": state[3], # Unix timestamp
                    "longitude": state[5],
                    "latitude": state[6],
                    "velocity_m_s": state[9],
                    "baro_altitude_m": state[13] # Include altitude
                }
                flight_data_list.append(flight_record)

    except requests.exceptions.RequestException as e:
        print(f"Error fetching OpenSky data: {e}")
        return (f'Failed to fetch flight data: {e}', 500)
    except Exception as e:
        print(f"An unexpected error occurred during data fetching or parsing: {e}")
        return (f'Internal server error: {e}', 500)

    if not flight_data_list:
        print("No flight data received from OpenSky or no valid flights to process.")
        return ('No flight data to publish', 200)

    published_messages = []
    for flight_record in flight_data_list:
        try:
            # Data must be a bytestring
            data_bytes = json.dumps(flight_record).encode("utf-8")

            # Publishes the message
            future = publisher.publish(topic_path, data_bytes)
            message_id = future.result()
            published_messages.append(message_id)
            print(f"Published message for flight {flight_record.get('callsign', 'N/A')} ({flight_record.get('aircraft_id', 'N/A')}). ID: {message_id}")
        except Exception as e:
            print(f"Failed to publish message for flight {flight_record.get('callsign', 'N/A')}: {e}")

    print(f"Successfully published {len(published_messages)} messages to {PUBSUB_TOPIC_NAME}.")
    return (f'Successfully published {len(published_messages)} messages.', 200)



### Deployment Instructions for the Pub/Sub Publisher Cloud Function

This Python code, `publish_flight_data`, is designed to be deployed as a Google Cloud Function. It aligns with **Part III: Real-Time Prediction** of your project plan, specifically "Build a Cloud Function for Direct Streaming (Pub/Sub)".

**To deploy this function, you would typically follow these steps:**

1.  **Create a Pub/Sub Topic:** If you haven't already, create a Pub/Sub topic in your GCP project named `live-flight-data-stream` (or whatever you set `PUBSUB_TOPIC_NAME` to).
    ```bash
    gcloud pubsub topics create live-flight-data-stream
    ```
2.  **Save the Code:** Save the Python code above into a file named `main.py`.
3.  **Define Dependencies:** Create a `requirements.txt` file in the same directory with the following content:
    ```
    functions-framework
    google-cloud-pubsub
    requests # If you integrate OpenSky API fetching
    ```
4.  **Deploy the Cloud Function:** Use the `gcloud functions deploy` command. You'll need to specify:
    *   `--runtime python312` (or your preferred Python version)
    *   `--trigger-http` (as it's an HTTP-triggered function, which can then be invoked by Cloud Scheduler)
    *   `--entry-point publish_flight_data` (the name of the function to execute)
    *   `--region` (e.g., `us-central1`)
    *   `--allow-unauthenticated` (if you want to call it without authentication, common for scheduler triggers, but be mindful of security)
    *   `--set-env-vars` for `OPENSKY_USERNAME` and `OPENSKY_PASSWORD` if you integrate the OpenSky API.

    Example deployment command:
    ```bash
    gcloud functions deploy publish-flight-data-to-pubsub \
      --runtime python312 \
      --trigger-http \
      --entry-point publish_flight_data \
      --region us-central1 \
      --allow-unauthenticated # Or configure authentication for production
      # --set-env-vars OPENSKY_USERNAME=your_username,OPENSKY_PASSWORD=your_password
    ```

5.  **Schedule the Function:** Create a Cloud Scheduler job to invoke this HTTP Cloud Function periodically (e.g., every 15 minutes), as outlined in your plan.

In [8]:
# @title ### Cell 6: Create BigQuery Dataset
# @markdown **Objective:** This cell creates a new dataset in BigQuery to store our flight data. A dataset is a container for your tables, similar to a schema in a traditional database.

from google.cloud import bigquery
from google.cloud.exceptions import Conflict

# Initialize the BigQuery client
client = bigquery.Client(project=PROJECT_ID)

# Define the name for your new BigQuery dataset
BIGQUERY_DATASET = "flights_data"
dataset_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}"

try:
    # Create a Dataset object
    dataset = bigquery.Dataset(dataset_id)
    # Specify the location for the dataset
    dataset.location = "US" # You can change this to your preferred location
    # Make an API request to create the dataset
    client.create_dataset(dataset, timeout=30)
    print(f"‚úÖ Successfully created dataset: {dataset_id}")
except Conflict:
    print(f"‚úÖ Dataset '{dataset_id}' already exists.")
except Exception as e:
    print(f"üî¥ An error occurred: {e}")

‚úÖ Dataset 'mgmt-467-25259.flights_data' already exists.


In [9]:
# @title ### Verify Streaming Data in BigQuery
# @markdown **Objective:** Run a query to check if data is being ingested into the `live_flight_states` table.

from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)

# Ensure table_id is correctly set (should be from cell `ea6f4895`)
streaming_table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_STREAMING_TABLE}"

print(f"Checking for data in BigQuery table: {streaming_table_id}")

try:
    # Query to get the count of rows and a sample of the latest data
    query = f"""
    SELECT *
    FROM `{streaming_table_id}`
    ORDER BY timestamp DESC
    LIMIT 10
    """
    df_stream = client.query(query).to_dataframe()

    if not df_stream.empty:
        print(f"‚úÖ Successfully found {len(df_stream)} recent rows in {streaming_table_id}:")
        display(df_stream)

        count_query = f"SELECT COUNT(*) FROM `{streaming_table_id}`"
        count_job = client.query(count_query)
        total_rows = list(count_job.result())[0][0]
        print(f"Total rows in {streaming_table_id}: {total_rows}")
    else:
        print(f"‚ö†Ô∏è No data found in {streaming_table_id} yet. Ensure both Cloud Functions are deployed and running.")

except Exception as e:
    print(f"üî¥ An error occurred while querying the streaming table: {e}")
    print("Please ensure the BigQuery table exists and the Cloud Functions are correctly configured and deployed.")


Checking for data in BigQuery table: mgmt-467-25259.flights_data.live_flight_states
üî¥ An error occurred while querying the streaming table: 404 Not found: Table mgmt-467-25259:flights_data.live_flight_states was not found in location US; reason: notFound, message: Not found: Table mgmt-467-25259:flights_data.live_flight_states was not found in location US

Location: US
Job ID: c97cb76d-c6da-483b-b1bb-0e42c9880d38

Please ensure the BigQuery table exists and the Cloud Functions are correctly configured and deployed.


In [10]:
# @title ### Cell 8: Data Cleaning and Feature Engineering (Corrected)
# @markdown **Objective:** This cell runs a SQL query to create a new, cleaned view of our data. This view will serve as the basis for our machine learning models. We will create a key boolean label, `is_arrival_delayed`, for our classification model.

# Define the name for our new view
CLEANED_VIEW = "flights_cleaned"
view_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{CLEANED_VIEW}"

# This SQL query selects relevant columns and creates our new feature.
# A VIEW is a virtual table based on the result-set of an SQL statement.
# It's a great way to create a clean dataset without duplicating data.
# --- FIX: Using the correct column names from the provided schema ---
sql_query = f"""
CREATE OR REPLACE VIEW `{view_id}` AS
SELECT
  -- Construct a DATE type from Year, Month, DayofMonth columns
  PARSE_DATE('%Y%m%d', CONCAT(CAST(Year AS STRING), LPAD(CAST(Month AS STRING), 2, '0'), LPAD(CAST(DayofMonth AS STRING), 2, '0'))) AS FL_DATE,
  IATA_CODE_Reporting_Airline AS CARRIER,
  Origin,
  Dest,
  DepDelay,
  ArrDelay,
  Distance,
  CAST(DayOfWeek AS STRING) AS DAY_OF_WEEK,
  -- Create a boolean label: TRUE if arrival delay is > 15 mins, FALSE otherwise.
  -- We also treat NULL delays as not delayed.
  CASE
    WHEN ArrDelay > 15 THEN TRUE
    ELSE FALSE
  END AS is_arrival_delayed
FROM
  `{table_id}`
WHERE
  -- Filter out cancelled and diverted flights for accurate modeling
  Cancelled = 0 AND Diverted = 0;
"""

try:
    # Execute the query to create the view
    query_job = client.query(sql_query)
    query_job.result() # Wait for the job to complete
    print(f"‚úÖ Successfully created cleaned view: {CLEANED_VIEW}")

    # Verify by showing the first 10 rows of the new view
    print("\n--- Sample of Cleaned Data ---")
    df = client.query(f"SELECT * FROM `{view_id}` LIMIT 10").to_dataframe()
    display(df)

except Exception as e:
    print(f"üî¥ An error occurred: {e}")

üî¥ An error occurred: 400 Invalid project ID 'None'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.; reason: invalid, location: None.flights_data.live_flight_states, message: Invalid project ID 'None'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.

Location: US
Job ID: e30baf1e-288c-4986-a632-f5d18ee62c6e



In [11]:
# @title ### Define BigQuery Streaming Table
# @markdown **Objective:** Create a new BigQuery table specifically designed to receive streaming flight data.
# This table will serve as the source for our machine learning models.

from google.cloud import bigquery
from google.cloud.exceptions import Conflict

client = bigquery.Client(project=PROJECT_ID)

# Define the name for our new streaming BigQuery table
BIGQUERY_STREAMING_TABLE = "live_flight_states"
table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_STREAMING_TABLE}"

# Define the schema for the streaming table
# This schema matches the output of the 'publish_flight_data' Cloud Function
schema = [
    bigquery.SchemaField("aircraft_id", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("callsign", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("origin_country", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("timestamp", "INTEGER", mode="NULLABLE"), # Unix timestamp
    bigquery.SchemaField("longitude", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("latitude", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("velocity_m_s", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("baro_altitude_m", "FLOAT", mode="NULLABLE"),
]

table = bigquery.Table(table_id, schema=schema)

try:
    # Create the table
    table = client.create_table(table)
    print(f"‚úÖ Successfully created streaming BigQuery table: {table.project}.{table.dataset_id}.{table.table_id}")
except Conflict:
    print(f"‚úÖ Streaming BigQuery table '{table_id}' already exists.")
except Exception as e:
    print(f"üî¥ An error occurred while creating the streaming table: {e}")

# Set the global variable 'table_id' to point to this new streaming table
# This ensures subsequent ML cells use this table.
# It's important to re-run this cell if you restart the kernel,
# as the value of `table_id` is crucial for downstream cells.
print(f"'table_id' for downstream ML models is now set to: {table_id}")

‚úÖ Successfully created streaming BigQuery table: mgmt-467-25259.flights_data.live_flight_states
'table_id' for downstream ML models is now set to: mgmt-467-25259.flights_data.live_flight_states


In [12]:
import functions_framework
from google.cloud import bigquery
import json
import base64
import os

# --- User-defined BigQuery Dataset and Table --- (ensure these match your created resources)
PROJECT_ID = os.environ.get("GCP_PROJECT")
BIGQUERY_DATASET = "flights_data"
BIGQUERY_STREAMING_TABLE = "live_flight_states"

# Initialize BigQuery client outside the function to reuse across invocations
client = bigquery.Client()
table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_STREAMING_TABLE}"

@functions_framework.cloud_event
def subscribe_and_stream_to_bigquery(cloud_event):
    """
    Cloud Function triggered by a Pub/Sub message.
    It parses the message and streams the data into a BigQuery table.
    """
    print(f"Received CloudEvent: {cloud_event['id']} from {cloud_event['source']}")
    pubsub_message = cloud_event.data

    if not pubsub_message:
        print("No data in Pub/Sub message.")
        return

    if 'message' not in pubsub_message:
        print("Pub/Sub message structure is unexpected. Missing 'message' key.")
        return

    if 'data' not in pubsub_message['message']:
        print("No data payload found in Pub/Sub message.")
        return

    try:
        # Decode the Pub/Sub message data (which is base64 encoded)
        data = base64.b64decode(pubsub_message['message']['data']).decode('utf-8')
        row = json.loads(data)

        print(f"Attempting to stream row to BigQuery: {row}")

        # BigQuery expects a list of rows for insertion
        rows_to_insert = [row]

        # Stream the row into BigQuery
        errors = client.insert_rows_json(table_id, rows_to_insert)

        if errors:
            print(f"üî¥ Errors occurred while streaming to BigQuery: {errors}")
        else:
            print(f"‚úÖ Successfully streamed 1 row to {table_id}")

    except UnicodeDecodeError as e:
        print(f"üî¥ Error decoding base64 data: {e}")
        print(f"Raw message data: {pubsub_message['message']['data']}")
    except json.JSONDecodeError as e:
        print(f"üî¥ Error parsing JSON payload: {e}")
        print(f"Decoded string: {data}")
    except Exception as e:
        print(f"üî¥ An unexpected error occurred: {e}")


### Deployment Instructions for the Pub/Sub to BigQuery Subscriber Cloud Function

This Python code, `subscribe_and_stream_to_bigquery`, is designed to be deployed as a Google Cloud Function that reacts to Pub/Sub messages.

**To deploy this function, you would typically follow these steps:**

1.  **Ensure Pub/Sub Topic Exists:** Make sure your `live-data-stream` Pub/Sub topic exists.

2.  **Ensure BigQuery Table Exists:** Confirm that your `live_flight_states` BigQuery table exists with the correct schema (as defined in the previous step).

3.  **Save the Code:** Save the Python code above into a file named `main.py`.

4.  **Define Dependencies:** Create a `requirements.txt` file in the same directory with the following content:
    ```
    functions-framework
    google-cloud-bigquery
    ```

5.  **Deploy the Cloud Function:** Use the `gcloud functions deploy` command. You'll need to specify:
    *   `--runtime python312` (or your preferred Python version)
    *   `--trigger-topic live-data-stream` (this tells the function to subscribe to your Pub/Sub topic)
    *   `--entry-point subscribe_and_stream_to_bigquery` (the name of the function to execute)
    *   `--region` (e.g., `us-central1`)
    *   **Crucially, you need to ensure the Cloud Function's service account has permissions to write to BigQuery.** This typically means the `BigQuery Data Editor` role or a custom role with `bigquery.tables.insertAll` on the target table.

    Example deployment command:
    ```bash
    gcloud functions deploy pubsub-to-bigquery-streamer \
      --runtime python312 \
      --trigger-topic live-data-stream \
      --entry-point subscribe_and_stream_to_bigquery \
      --region us-central1
    ```

Once deployed, this function will automatically activate and ingest data into your BigQuery table whenever messages are sent to the `live-data-stream` Pub/Sub topic by your first Cloud Function (`publish_flight_data`).

In [13]:
# @title ### Cell 9: Build, Evaluate, and Predict with a Linear Regression Model
# @markdown **Objective:** Use BigQuery ML to create a linear regression model to predict the arrival delay (`ARR_DELAY`).

from google.cloud import bigquery
from google.cloud.exceptions import NotFound # Import NotFound for specific error handling

# Re-initialize the BigQuery client to ensure it's fresh and correctly scoped
client = bigquery.Client(project=PROJECT_ID)

# Re-define necessary variables as they might have been cleared
CLEANED_VIEW = "flights_cleaned"
view_id = f"{PROJECT_ID}.{BIGQUERY_DATASET}.{CLEANED_VIEW}"

print(f"Attempting to use view: {view_id}")

# --- Add a check for view existence before proceeding ---
try:
    client.get_table(view_id) # get_table works for views too
    print(f"‚úÖ BigQuery view '{view_id}' confirmed to exist.")
except NotFound:
    print(f"üî¥ ERROR: BigQuery view '{view_id}' not found. Please ensure Cell 8 (Data Cleaning and Feature Engineering) ran successfully and created the view.")
    raise # Re-raise the error to stop execution if the view is truly not found
except Exception as e:
    print(f"üî¥ An unexpected error occurred while checking view existence: {e}")
    raise

# --- 1. Create the Linear Regression Model ---
print("üöÄ Training Linear Regression model...")
create_linear_model_query = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_predictor`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['ArrDelay']) AS
SELECT
  ArrDelay,
  DepDelay,
  CARRIER,
  Distance,
  DAY_OF_WEEK
FROM
  `{view_id}`;
"""
linear_job = client.query(create_linear_model_query)
linear_job.result()
print("‚úÖ Linear Regression model created successfully.")

# --- 2. Evaluate the Model ---
print("\n--- Model Evaluation ---")
evaluate_linear_model_query = f"""
SELECT * FROM ML.EVALUATE(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_predictor`);
"""
linear_eval_df = client.query(evaluate_linear_model_query).to_dataframe()
display(linear_eval_df)

# --- 3. Make Predictions with the Model ---
print("\n--- Sample Predictions ---")
predict_linear_query = f"""
SELECT
  ArrDelay AS actual_delay,
  predicted_ArrDelay
FROM
  ML.PREDICT(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_predictor`,
    (SELECT * FROM `{view_id}` LIMIT 10));
"""
linear_predict_df = client.query(predict_linear_query).to_dataframe()
display(linear_predict_df)

Attempting to use view: None.flights_data.flights_cleaned
üî¥ An unexpected error occurred while checking view existence: 400 GET https://bigquery.googleapis.com/bigquery/v2/projects/None/datasets/flights_data/tables/flights_cleaned?prettyPrint=false: Invalid resource name projects/None; Project id: None


BadRequest: 400 GET https://bigquery.googleapis.com/bigquery/v2/projects/None/datasets/flights_data/tables/flights_cleaned?prettyPrint=false: Invalid resource name projects/None; Project id: None

In [14]:
# @title ### Cell 10: Build, Evaluate, and Predict with a Logistic Regression Model
# @markdown **Objective:** Use BigQuery ML to create a logistic regression model to predict if a flight will be significantly delayed (`is_arrival_delayed`).

# --- 1. Create the Logistic Regression Model ---
print("üöÄ Training Logistic Regression model...")
create_logistic_model_query = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_classifier`
OPTIONS(model_type='LOGISTIC_REG', input_label_cols=['is_arrival_delayed']) AS
SELECT
  *
FROM
  `{view_id}`;
"""
logistic_job = client.query(create_logistic_model_query)
logistic_job.result()
print("‚úÖ Logistic Regression model created successfully.")

# --- 2. Evaluate the Model ---
print("\n--- Model Evaluation ---")
evaluate_logistic_model_query = f"""
SELECT * FROM ML.EVALUATE(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_classifier`);
"""
logistic_eval_df = client.query(evaluate_logistic_model_query).to_dataframe()
display(logistic_eval_df)

# --- 3. Make Predictions with the Model ---
print("\n--- Sample Predictions ---")
# --- FIX: Changed 'prob' to the correct column name 'predicted_is_arrival_delayed_probs' ---
predict_logistic_query = f"""
SELECT
  is_arrival_delayed AS actual_is_delayed,
  predicted_is_arrival_delayed,
  predicted_is_arrival_delayed_probs
FROM
  ML.PREDICT(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_delay_classifier`,
    (SELECT * FROM `{view_id}` LIMIT 10))
"""
logistic_predict_df = client.query(predict_logistic_query).to_dataframe()
display(logistic_predict_df)

üöÄ Training Logistic Regression model...


NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects//jobs?prettyPrint=false: Request couldn't be served.

Location: None
Job ID: fa92a489-1f4c-43a9-a5ef-bcc739ea1b53


In [15]:
# @title ### Cell 11: Build and Analyze a K-Means Clustering Model
# @markdown **Objective:** Use BigQuery ML's K-Means to group flights into distinct clusters based on their characteristics.

# --- 1. Create the K-Means Clustering Model ---
print("üöÄ Training K-Means Clustering model...")
create_kmeans_model_query = f"""
CREATE OR REPLACE MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_clusterer`
OPTIONS(model_type='KMEANS', num_clusters=5) AS
SELECT
  DepDelay,
  ArrDelay,
  Distance
FROM
  `{view_id}`
WHERE DepDelay IS NOT NULL AND ArrDelay IS NOT NULL;
"""
kmeans_job = client.query(create_kmeans_model_query)
kmeans_job.result()
print("‚úÖ K-Means Clustering model created successfully.")

# --- 2. Analyze the Cluster Centroids ---
print("\n--- Cluster Centroid Analysis ---")
# ML.CENTROIDS shows the average values for each feature within each cluster.
# This helps us understand what defines each cluster (e.g., Cluster 1 is short-haul flights with minor delays).
analyze_centroids_query = f"""
SELECT
  *
FROM
  ML.CENTROIDS(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_clusterer`);
"""
centroids_df = client.query(analyze_centroids_query).to_dataframe()
display(centroids_df)

# --- 3. See which Cluster Each Flight Belongs To ---
print("\n--- Sample Cluster Assignments ---")
predict_kmeans_query = f"""
SELECT
  CENTROID_ID,
  DepDelay,
  ArrDelay,
  Distance
FROM
  ML.PREDICT(MODEL `{PROJECT_ID}.{BIGQUERY_DATASET}.flight_clusterer`,
    (SELECT * FROM `{view_id}` LIMIT 10));
"""
kmeans_predict_df = client.query(predict_kmeans_query).to_dataframe()
display(kmeans_predict_df)

üöÄ Training K-Means Clustering model...


NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects//jobs?prettyPrint=false: Request couldn't be served.

Location: None
Job ID: 8251efdb-7c7b-466f-b438-f1d3db3c38e6
