In [2]:
# Setup: install Pub/Sub client library in Colab
!pip install -q google-cloud-pubsub functions-framework

from google.colab import auth  # comment out if not using Colab
auth.authenticate_user()

# Lab 8: Building a Streaming Pipeline (Pub/Sub + Dataflow)

This notebook documents your steps to convert the batch pipeline from Lab 7
into a streaming pipeline using Pub/Sub and a Dataflow template.

Use this as a structured template: you still need to perform the steps
in the GCP Console and customize the resource names.

## 1. Pub/Sub Topic Setup

In the GCP Console, create a topic named `live-data-stream`.

**TODO:** Document the exact topic path here (e.g., `projects/your-project/topics/live-data-stream`).
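
The full path follows the `projects/<project-id>/topics/<topic-id>` pattern that `publisher.topic_path(...)` builds in the section 2 code cell. The sketch below checks a topic ID before you create it. It assumes the Pub/Sub resource-naming rules: 3 to 255 characters, starting with a letter, using only letters, digits, and `- _ . ~ + %`, and not beginning with `goog`. `is_valid_topic_id` and `build_topic_path` are hypothetical helpers, not part of the client library.

```python
import re

# Assumed Pub/Sub naming rules: starts with a letter, 3-255 chars in total,
# and contains only letters, digits, and the symbols - _ . ~ + %
_TOPIC_ID_RE = re.compile(r"[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}")


def is_valid_topic_id(topic_id: str) -> bool:
    """Return True if topic_id satisfies the assumed naming rules."""
    if topic_id.startswith("goog"):
        return False  # IDs beginning with "goog" are reserved
    return _TOPIC_ID_RE.fullmatch(topic_id) is not None


def build_topic_path(project_id: str, topic_id: str) -> str:
    """Build the fully qualified topic path (hypothetical helper)."""
    if not is_valid_topic_id(topic_id):
        raise ValueError(f"invalid topic ID: {topic_id!r}")
    return f"projects/{project_id}/topics/{topic_id}"


print(build_topic_path("your-project", "live-data-stream"))
# prints: projects/your-project/topics/live-data-stream
```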

## 2. Modify Cloud Function to Publish to Pub/Sub

Update your Lab 7 Cloud Function so that instead of writing directly to
BigQuery, it publishes the weather JSON to the Pub/Sub topic.

In [3]:
from google.cloud import pubsub_v1
import json
import os
import logging
import functions_framework

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# TODO: update this to your actual project ID and topic name
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "proven-agility-477721-q9")
TOPIC_ID = "live-data-stream"

# Create the client once at module load so warm invocations reuse it
publisher = pubsub_v1.PublisherClient()
TOPIC_PATH = publisher.topic_path(PROJECT_ID, TOPIC_ID)

def publish_weather(weather_dict):
    """Publish a weather dict as JSON to Pub/Sub and return the message ID."""
    data = json.dumps(weather_dict).encode("utf-8")  # Pub/Sub payloads must be bytes
    future = publisher.publish(TOPIC_PATH, data=data)
    message_id = future.result()  # blocks until Pub/Sub acknowledges the message
    logger.info("Published message ID: %s", message_id)
    return message_id

@functions_framework.http
def ingest_weather_producer(request):
    """HTTP Cloud Function that fetches weather and publishes to Pub/Sub.

    NOTE: This expects you to have defined `fetch_weather_lafayette()`
    (from Lab 7) in the same source file.
    """
    try:
        weather_json = fetch_weather_lafayette()
        message_id = publish_weather(weather_json)
    except Exception:
        # Log the full traceback and tell the caller that publishing failed
        logger.exception("Failed to fetch or publish weather data")
        return ("ERROR: message not published", 500)
    return (f"OK: message {message_id} published", 200)

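`publisher.publish(...)` returns a future, and `future.result()` blocks until Pub/Sub returns the message ID. Because of that shape, the publishing logic can be checked locally, without credentials, by swapping in a stand-in publisher. This is a minimal sketch: `FakePublisher` is a hypothetical class that mimics only the `publish(topic, data=...)` call, `publish_json` is a hypothetical variant of `publish_weather` that takes the publisher as a parameter, and the sample weather dict is made up.

```python
import json
from concurrent.futures import Future


class FakePublisher:
    """Hypothetical stand-in for pubsub_v1.PublisherClient in local tests."""

    def __init__(self):
        self.sent = []  # (topic, payload bytes) pairs, in publish order

    def publish(self, topic, data):
        if not isinstance(data, bytes):
            raise TypeError("Pub/Sub message data must be bytes")
        self.sent.append((topic, data))
        future = Future()
        future.set_result(str(len(self.sent)))  # fake, sequential message ID
        return future


def publish_json(publisher, topic_path, payload):
    """Serialize payload to UTF-8 JSON, publish it, and wait for the ID."""
    data = json.dumps(payload).encode("utf-8")
    return publisher.publish(topic_path, data=data).result()


fake = FakePublisher()
topic = "projects/your-project/topics/live-data-stream"
msg_id = publish_json(fake, topic, {"city": "Lafayette", "temp_c": 21.5})
print(msg_id)  # prints: 1
print(json.loads(fake.sent[0][1]))  # the decoded payload round-trips intact
```

Passing the publisher in is one way to make the function testable. The lab's version keeps a module-level client instead, which is also reasonable in Cloud Functions because globals are reused across warm invocations.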
## 3. Dataflow Job from Template

In the GCP Console:
1. Go to **Dataflow → Create job from template**.
2. Select the template **Pub/Sub Topic to BigQuery**.
3. Configure parameters:
   * **Input topic**: your `live-data-stream` topic.
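   * **Output table**: a new table in your dataset, e.g.,
     `your-project.superstore_data.realtime_weather_streaming`. Create it before
     launching the job, with a schema whose columns match the JSON fields your function publishes.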
   * **Temp location**: a folder in a GCS bucket, e.g.,
     `gs://your-bucket/dataflow-temp/`.
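
The template parses each Pub/Sub message's data as JSON and writes the resulting object as a row in the output table, so the keys your function publishes need to line up with the table's columns. The plain-Python sketch below imitates that mapping locally to catch mismatches before you launch the job. It is not the template's implementation: `SCHEMA_COLUMNS`, `route_messages`, and the sample messages are hypothetical, and it assumes strict matching (unknown keys rejected, missing keys treated as NULL). Rejected messages are collected separately, loosely mirroring the template's handling of failed records.

```python
import json

# Hypothetical column names for the realtime_weather_streaming table
SCHEMA_COLUMNS = {"city", "dt_utc", "temp_c", "humidity"}


def route_messages(raw_messages, columns):
    """Split raw Pub/Sub payloads into insertable rows and error records.

    Assumes strict matching: a payload must be a JSON object whose keys are
    all known columns. Missing keys are allowed (treated as NULL, assuming
    the columns are NULLABLE).
    """
    rows, errors = [], []
    for raw in raw_messages:
        try:
            record = json.loads(raw.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError) as exc:
            errors.append({"payload": raw, "error": f"not JSON: {exc}"})
            continue
        if not isinstance(record, dict):
            errors.append({"payload": raw, "error": "JSON is not an object"})
            continue
        unknown = set(record) - columns
        if unknown:
            errors.append({"payload": raw, "error": f"unknown columns: {sorted(unknown)}"})
            continue
        rows.append(record)
    return rows, errors


messages = [
    b'{"city": "Lafayette", "dt_utc": "2024-01-01T12:00:00Z", "temp_c": 4.2}',
    b'{"city": "Lafayette", "wind_kph": 12}',  # extra key not in the schema
    b"not json",
]
rows, errors = route_messages(messages, SCHEMA_COLUMNS)
print(len(rows), len(errors))  # prints: 1 2
```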

**TODO:** After you launch the job, include a screenshot of the running graph in your lab submission.

## 4. Validate Streaming Data in BigQuery

Trigger your Cloud Function manually (via HTTP), wait for Dataflow to process,
and then query your output table. Example query (adapt to your table name):

```sql
SELECT *
FROM `your-project.superstore_data.realtime_weather_streaming`
ORDER BY dt_utc DESC
LIMIT 10;
```
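
Seeing rows confirms that data arrived at least once. A second check is how recently the newest row arrived, which shows whether the stream is still flowing. This is a minimal sketch, assuming you have fetched the `dt_utc` values (for example, from the query above) as timezone-aware `datetime` objects. `newest_row_lag`, `max_lag_ok`, and the 10-minute threshold are hypothetical choices. If `dt_utc` is the weather observation time rather than the publish time, the lag also includes the weather API's reporting delay.

```python
from datetime import datetime, timedelta, timezone


def newest_row_lag(timestamps, now=None):
    """Return how long ago the most recent timestamp occurred."""
    if not timestamps:
        raise ValueError("no rows returned; has the stream delivered anything?")
    if now is None:
        now = datetime.now(timezone.utc)
    return now - max(timestamps)


def max_lag_ok(timestamps, limit=timedelta(minutes=10), now=None):
    """True if the newest row is within `limit` of `now` (hypothetical threshold)."""
    return newest_row_lag(timestamps, now) <= limit


now = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
sample = [
    datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 12, 25, tzinfo=timezone.utc),
]
print(newest_row_lag(sample, now))  # prints: 0:05:00
print(max_lag_ok(sample, now=now))  # prints: True
```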

**TODO:** Capture a screenshot of the query results showing streamed rows.

## 5. Challenge: Apache Beam Concepts Prompt

Author a prompt to ask Gemini to explain the three core concepts of Apache Beam
(`Pipeline`, `PCollection`, and `ParDo`) using an analogy (e.g., an assembly line).

Record your prompt and Gemini's response here.

**Prompt:**

Using a factory assembly line as an analogy, explain the three core concepts of Apache Beam (Pipeline, PCollection, and ParDo), including what each one represents and how they work together.

**Gemini's response:**

Imagine Apache Beam as a factory assembly line that processes products:

* **Pipeline: the conveyor belt / full assembly line.** It defines the entire workflow and the order of operations: where the data starts and how it moves.
* **PCollection: the items on the belt.** These represent the data elements being processed, like individual products traveling through the factory.
* **ParDo: the workers at each station.** Each ParDo performs a specific task on every item it receives, such as cutting, painting, or labeling.

Together, the Pipeline (the path), PCollections (the items), and ParDo transforms (the workers) enable data to move through a well-structured process, transforming raw data into useful output, just like a real production line.
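
To connect the analogy to code: in the real Beam Python SDK, the same shape reads `p | beam.Create(items) | beam.ParDo(MyDoFn())` inside `with beam.Pipeline() as p:`, and a `DoFn`'s `process` method yields its outputs. The sketch below is not Beam. It is a plain-Python toy (the `ToyPipeline`, `PCollection`, and `par_do` names are hypothetical) that imitates the three roles so the semantics can run anywhere. A PCollection is an immutable collection of elements, a ParDo applies a function to every element and may emit zero or more outputs per element, and the Pipeline records the ordered stages and only runs them when asked. Real Beam likewise builds the graph first and executes it when the pipeline runs.

```python
from typing import Callable, Iterable, List


class PCollection:
    """Toy stand-in: an immutable collection of elements (the items on the belt)."""

    def __init__(self, elements: Iterable):
        self._elements = tuple(elements)  # tuple, so stages cannot mutate it

    def __iter__(self):
        return iter(self._elements)


def par_do(fn: Callable[[object], Iterable]) -> Callable[[PCollection], PCollection]:
    """Toy ParDo: call fn on every element; fn may yield zero or more outputs."""
    def stage(pcoll: PCollection) -> PCollection:
        return PCollection(out for element in pcoll for out in fn(element))
    return stage


class ToyPipeline:
    """Toy Pipeline: records the ordered stages, then runs them on demand."""

    def __init__(self, source: Iterable):
        self.source = source
        self.stages: List[Callable[[PCollection], PCollection]] = []

    def apply(self, stage):
        self.stages.append(stage)
        return self  # allow chaining, loosely like Beam's | operator

    def run(self) -> PCollection:
        pcoll = PCollection(self.source)
        for stage in self.stages:
            pcoll = stage(pcoll)  # each stage produces a new PCollection
        return pcoll


def drop_missing(reading):
    if reading.get("temp_c") is not None:
        yield reading  # yielding nothing filters the element out


def to_fahrenheit(reading):
    yield {**reading, "temp_f": reading["temp_c"] * 9 / 5 + 32}


readings = [{"city": "Lafayette", "temp_c": 20.0}, {"city": "Lafayette", "temp_c": None}]
result = ToyPipeline(readings).apply(par_do(drop_missing)).apply(par_do(to_fahrenheit)).run()
print(list(result))  # prints: [{'city': 'Lafayette', 'temp_c': 20.0, 'temp_f': 68.0}]
```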