Write an Apache Beam pipeline with the following requirements:

Write the raw data to files in Google Cloud Storage at regular intervals.
Parse the messages and write the data to BigQuery.
Calculate page views by minute. Create a dashboard that reports this information.
Run the pipeline in Dataflow.


In [None]:
# IMPORTANT: YOU NEED TO RUN THIS STEP ONLY ONCE 

#!pip install --quiet "apache-beam[gcp]" "google-cloud-pubsub" "notebook"
!pip install --upgrade apache-beam[gcp]
!pip install google-cloud-storage

In [None]:
# create a pubsub subscription to be used for the beam pipeline
# IMPORTANT: YOU NEED TO RUN THIS STEP ONLY ONCE 

gcloud pubsub subscriptions create dsl-clickstream-push-beam \
    --topic=dsl-project-clickstream

# this is a pull subscription, even though we are building a streaming pipeline
# dataflow will handle the processing of data as quickly as possible, using the proper IO pubsub components within Beam model

In [None]:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, DirectOptions
import logging
import time
from apache_beam.io import fileio
from apache_beam.transforms import window
from google.cloud import storage
import json
from datetime import datetime
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

In [None]:
# --- Python Script to Launch Dataflow Job ---

# --- BIGQUERY TABLE SCHEMA ---
TABLE_SCHEMA = {
    'fields': [
        {'name': 'session_id', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'user_id', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'device_type', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'geolocation', 'type': 'GEOGRAPHY', 'mode': 'NULLABLE'},
        {'name': 'user_agent', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'visit_start_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
        {'name': 'visit_end_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
        {
            'name': 'events', 'type': 'RECORD', 'mode': 'REPEATED',
            'fields': [
                {'name': 'event_type', 'type': 'STRING', 'mode': 'NULLABLE'},
                {'name': 'event_timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
                {
                    'name': 'page_view', 'type': 'RECORD', 'mode': 'NULLABLE',
                    'fields': [
                        {'name': 'page_url', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {'name': 'referrer_url', 'type': 'STRING', 'mode': 'NULLABLE'}
                    ]
                },
                {
                    'name': 'add_cart', 'type': 'RECORD', 'mode': 'NULLABLE',
                    'fields': [
                        {'name': 'product_id', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {'name': 'product_name', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {'name': 'category', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {'name': 'price', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
                        {'name': 'quantity', 'type': 'INT64', 'mode': 'NULLABLE'}
                    ]
                },
                {
                    'name': 'purchase', 'type': 'RECORD', 'mode': 'NULLABLE',
                    'fields': [
                        {'name': 'order_id', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {'name': 'amount', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
                        {'name': 'currency', 'type': 'STRING', 'mode': 'NULLABLE'},
                        {
                            'name': 'items', 'type': 'RECORD', 'mode': 'REPEATED',
                            'fields': [
                                {'name': 'product_id', 'type': 'STRING', 'mode': 'NULLABLE'},
                                {'name': 'product_name', 'type': 'STRING', 'mode': 'NULLABLE'},
                                {'name': 'category', 'type': 'STRING', 'mode': 'NULLABLE'},
                                {'name': 'price', 'type': 'FLOAT64', 'mode': 'NULLABLE'},
                                {'name': 'quantity', 'type': 'INT64', 'mode': 'NULLABLE'}
                            ]
                        }
                    ]
                }
            ]
        }
    ]
}

In [None]:
# --- CUSTOM SINK FOR WRITING JSONL FILES TO GCS ---
    
class JsonlSink(fileio.FileSink):
    """
    JsonlSink defines the behavior for writing PCollection elements to a file in
    JSON Lines (JSONL) format. It inherits from fileio.FileSink and implements
    the open, write, and flush methods to handle file I/O.
    """
    """A sink that writes JSONL records to a file."""
    def __init__(self):
        # The sink needs a coder to encode the strings into bytes.
        self._coder = beam.coders.StrUtf8Coder()
        self._fh = None

    def open(self, fh):
        """This method is called when a new file is created for writing."""
        self._fh = fh

    def write(self, record):
        """
        This method is called for each element. It uses the file handle
        that was saved by the open() method.
        """
        if self._fh is None:
            raise RuntimeError('Sink open() was not called before write()')
        self._fh.write(self._coder.encode(record) + b'\n')

    def flush(self):
        """
        This method is called before a file is closed. It uses the file handle
        that was saved by the open() method.
        """
        if self._fh is not None:
            self._fh.flush()

In [None]:
# --- CUSTOM TRANSFORMATION LOGIC FOR BIGQUERY ---
def transform_json_to_bigquery_row(message_body: str) -> dict:
    data = json.loads(message_body)
    geolocation_wkt = None
    if 'geolocation' in data and data['geolocation']:
        try:
            lat, lon = map(float, data['geolocation'].split(','))
            geolocation_wkt = f"POINT({lon} {lat})"
        except (ValueError, TypeError):
            logging.warning(f"Invalid geolocation format '{data['geolocation']}', setting to NULL.")

    visit_start_time, visit_end_time = None, None
    if data.get('events'):
        valid_events = []
        for e in data['events']:
            if 'event' in e and 'timestamp' in e['event']:
                try:
                    datetime.fromisoformat(e['event']['timestamp'].replace('Z', '+00:00'))
                    valid_events.append(e)
                except (ValueError, TypeError):
                    logging.warning(f"Invalid timestamp format, skipping event.")
        if valid_events:
            sorted_events = sorted(
                valid_events,
                key=lambda x: datetime.fromisoformat(x['event']['timestamp'].replace('Z', '+00:00'))
            )
            visit_start_time = sorted_events[0]['event']['timestamp']
            visit_end_time = sorted_events[-1]['event']['timestamp']

    transformed_events = []
    for event_data in data.get('events', []):
        event_details = event_data.get('event', {})
        details_payload = event_details.get('details', {})
        event_type = event_details.get('event_type')
        page_view_struct, add_cart_struct, purchase_struct = None, None, None
        if event_type == 'page_view':
            page_view_struct = details_payload
        elif event_type == 'add_item_to_cart':
            add_cart_struct = details_payload
        elif event_type == 'purchase':
            purchase_struct = details_payload
        transformed_events.append({
            'event_type': event_type,
            'event_timestamp': event_details.get('timestamp'),
            'page_view': page_view_struct,
            'add_cart': add_cart_struct,
            'purchase': purchase_struct
        })

    return {
        'session_id': data.get('session_id'), 'user_id': data.get('user_id'),
        'device_type': data.get('device_type'), 'geolocation': geolocation_wkt,
        'user_agent': data.get('user_agent'), 'visit_start_time': visit_start_time,
        'visit_end_time': visit_end_time, 'events': transformed_events
    }

In [None]:
# --- PIPELINE DEFINITION ---
def define_and_run_pipeline(pipeline_options, subscription_path, table_spec, gcs_output_path):
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Initial steps to read and decode messages are common to both branches
        messages = (
            pipeline
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=subscription_path).with_output_types(bytes)
            | 'Decode Messages' >> beam.Map(lambda msg_bytes: msg_bytes.decode('utf-8'))
        )

        # --- Branch 1: Transform and write to BigQuery ---
        (
            messages
            | 'Transform to BigQuery Row' >> beam.Map(transform_json_to_bigquery_row)
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table=table_spec, schema=TABLE_SCHEMA,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS
            )
        )

        # --- Branch 2: Window and write raw JSON to Cloud Storage ---
        (
            messages
            | 'Window into 1 Minute Batches' >> beam.WindowInto(window.FixedWindows(60))
            | 'Write JSONL files to GCS' >> fileio.WriteToFiles(
                path=gcs_output_path,
                sink=JsonlSink(),
                file_naming=fileio.default_file_naming(prefix="visit", suffix=".jsonl")
            )
        )
    logging.info("Pipeline submitted to the runner.")

In [7]:
# --- MAIN EXECUTION BLOCK ---
logging.getLogger().setLevel(logging.INFO)

# --- 1. Set up project and resource details ---
PROJECT_ID = "jellyfish-training-demo-6"
SUBSCRIPTION_NAME = "dsl-clickstream-push-beam"
DATASET_ID = "dsl_project"
TABLE_ID = "web_visits"
REGION = 'us-central1'
GCS_BUCKET = "jellyfish-training-demo-6"
GCS_PATH = "dsl-project"

# --- 2. Configure pipeline options ---
JOB_NAME = f'streaming-multi-sink-{int(time.time())}'
TABLE_SPEC = f'{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}'
GCS_OUTPUT_PREFIX = f"gs://{GCS_BUCKET}/{GCS_PATH}/jsonl_archive/" # Path for the JSONL files

options = PipelineOptions()
standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DataflowRunner'
standard_options.streaming = True

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = f"gs://{GCS_BUCKET}/{GCS_PATH}/staging"
google_cloud_options.temp_location = f"gs://{GCS_BUCKET}/{GCS_PATH}/temp"
google_cloud_options.region = REGION
options.view_as(SetupOptions).setup_file = './setup.py'

# --- 3. Run the pipeline ---
logging.info(f"--- Starting Dataflow Job: {JOB_NAME} ---")
logging.info(f"Streaming data to BigQuery table: {TABLE_SPEC}")
logging.info(f"Archiving raw data to GCS path: {GCS_OUTPUT_PREFIX}*")

define_and_run_pipeline(
    pipeline_options=options,
    subscription_path=f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_NAME}",
    table_spec=TABLE_SPEC,
    gcs_output_path=GCS_OUTPUT_PREFIX
)

INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2025-06-24_07_26_01-8008251263561795308 is in state JOB_STATE_CANCELLING
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-06-24T15:07:20.575Z: JOB_MESSAGE_BASIC: Cancel request is committed for workflow job: 2025-06-24_07_26_01-8008251263561795308.
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-06-24T15:07:20.817Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2025-06-24T15:07:20.872Z: JOB_MESSAGE_BASIC: Stopping worker pool...


KeyboardInterrupt: 

In [None]:
# Cell 2: Securely View Pipeline Output
# Run this cell a few minutes after the pipeline has started.



# --- You may need to install the library first ---
# !pip install google-cloud-storage

# --- Configuration (should match your pipeline) ---
GCS_BUCKET = "jellyfish-training-demo-6" 
GCS_PATH = "dsl-project/dataflow_run" # The output path for your data

# --- Use the Client Library to access GCS ---
# This uses your gcloud credentials automatically.
storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET)

# List all files in the output directory
blobs = list(bucket.list_blobs(prefix=GCS_PATH))

if not blobs:
    print("No output files found yet. Please wait a few more minutes.")
    print(f"Searched in: gs://{GCS_BUCKET}/{GCS_PATH}")
else:
    # Find the most recently created file
    latest_blob = sorted(blobs, key=lambda b: b.time_created, reverse=True)[0]
    
    print(f"--- Displaying contents of the latest output file ---")
    print(f"File: gs://{GCS_BUCKET}/{latest_blob.name}")
    print(f"Created at: {latest_blob.time_created}")
    print("-" * 50)
    
    # Download and print the content
    content = latest_blob.download_as_text()
    
    # Print each JSON line
    for line in content.strip().split('\n'):
        # Pretty-print the JSON for readability
        parsed_json = json.loads(line)
        print(json.dumps(parsed_json, indent=2))
        
    print("-" * 50)
