In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Running a Pub/Sub to Pub/Sub Dataflow Pipeline with Data Redaction

This notebook guides you through setting up and running a streaming Dataflow
pipeline that reads JSON messages from a Pub/Sub topic, removes
specified fields (redaction), adds a processing timestamp, and writes the
processed messages to an output Pub/Sub topic. Messages that fail parsing
are routed to a dead-letter topic.

## Setup and Configuration
---
### Check project permissions

Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).

    1. In the Google Cloud console, on the Navigation menu (Navigation menu icon), select IAM & Admin > IAM.

    2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.

If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.

    1. In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
    2. Copy the project number (e.g. 729328892908).
    3. On the Navigation menu, select IAM & Admin > IAM.
    4. At the top of the roles table, below View by Principals, click Grant Access.
    5. For New principals, type: {project-number}-compute@developer.gserviceaccount.com
    6. Replace {project-number} with your project number.
    7. For Role, select Project (or Basic) > Editor.
    8. Click Save.

### Jupyter notebook-based development environment setup
For this lab, you will run all commands from the cells of a Jupyter notebook.

    1. In the Google Cloud Console, on the Navigation Menu, click Vertex AI > Workbench.
    2. Click Enable Notebooks API.
    3. On the Workbench page, select USER-MANAGED NOTEBOOKS and click CREATE NEW.
    4. In the New instance dialog box that appears, set the region and zone you want to use (for example, us-central1 and us-central1-a; the region should match GCP_REGION below).
    5. For Environment, select Apache Beam.
    6. Click CREATE at the bottom of the dialog box.
        Note: The environment may take 3 - 5 minutes to be fully provisioned. Please wait until the step is complete.
        Note: If prompted, click Enable Notebooks API to enable the Notebooks API.
    7. Once the environment is ready, click the OPEN JUPYTERLAB link next to your Notebook name. This will open up your environment in a new tab in your browser to run the notebook.

### Install Necessary Libraries
First, ensure you have the required Python libraries installed.
Create a virtual environment for your work in this lab:

In [None]:
!sudo apt-get update && sudo apt-get install -y python3-venv

In [None]:
!python3 -m venv df-env
!source df-env/bin/activate

In [None]:
!pip install --quiet --upgrade pip
!pip install --quiet apache-beam[gcp] google-cloud-pubsub google-cloud-storage

### Define GCP Configuration Variables
Set your Google Cloud Project ID, Region, and desired names for
the GCS Bucket and Pub/Sub topics.

In [None]:
# --- User configuration ---
# Leave GCP_PROJECT_ID empty to have the next cell try `gcloud config get-value project`.
# FIELDS_TO_REMOVE is forwarded to the pipeline's --fields_to_remove flag; the pipeline
# drops these top-level keys from every successfully parsed message.
GCP_PROJECT_ID = "" # @param {type:"string"} Insert your Project ID here
GCP_REGION = "us-central1" # @param {type:"string"} Specify the region for Dataflow and Pub/Sub
BUCKET_NAME_SUFFIX = "df-pipeline" # @param {type:"string"} Suffix for unique bucket name
INPUT_TOPIC_NAME = "cdc-input-messages" # @param {type:"string"} Name for the input Pub/Sub topic
OUTPUT_TOPIC_NAME = "cdc-output-messages" # @param {type:"string"} Name for the output Pub/Sub topic
DEAD_LETTER_TOPIC_NAME = "cdc-dlq-messages" # @param {type:"string"} Name for the dead-letter Pub/Sub topic
FIELDS_TO_REMOVE = "ssn" # @param {type:"string"} Comma-separated fields to remove (e.g., "ssn,email")

In [None]:
# --- Set derived variables ---
# If GCP_PROJECT_ID was left empty above, fall back to the active gcloud project.
if not GCP_PROJECT_ID:
    import subprocess

    print("Please set the GCP_PROJECT_ID variable above.")
    try:
        GCP_PROJECT_ID = subprocess.check_output(
            ['gcloud', 'config', 'get-value', 'project'], text=True
        ).strip()
    except Exception as e:
        print(f"Could not automatically determine Project ID. Please set it manually. Error: {e}")
        raise ValueError("GCP_PROJECT_ID is not set.") from e
    # gcloud may exit successfully with empty output when no default project is
    # configured. An empty ID would silently yield invalid bucket/topic names
    # (e.g. "gs://-df-pipeline") in the following cells, so stop here instead.
    if not GCP_PROJECT_ID:
        raise ValueError("GCP_PROJECT_ID is not set and no default project is configured in gcloud.")
    print(f"Inferred GCP_PROJECT_ID: {GCP_PROJECT_ID}")

In [None]:
def _topic_path(topic_name):
    """Return the fully qualified Pub/Sub topic path for `topic_name` in GCP_PROJECT_ID."""
    return f"projects/{GCP_PROJECT_ID}/topics/{topic_name}"


GCS_BUCKET = "gs://{}-{}".format(GCP_PROJECT_ID, BUCKET_NAME_SUFFIX)
PIPELINE_FOLDER = GCS_BUCKET + "/df-pubsub_streaming_data_redaction_pipeline"
INPUT_TOPIC_PATH = _topic_path(INPUT_TOPIC_NAME)
OUTPUT_TOPIC_PATH = _topic_path(OUTPUT_TOPIC_NAME)
DEAD_LETTER_TOPIC_PATH = _topic_path(DEAD_LETTER_TOPIC_NAME)
RUNNER = "DataflowRunner"  # Use DataflowRunner to run on GCP

In [None]:
# Export the configuration as environment variables so the pipeline-submission
# command can read it.
import os

os.environ.update({
    'PROJECT_ID': GCP_PROJECT_ID,
    'REGION': GCP_REGION,
    'BUCKET': GCS_BUCKET,
    'PIPELINE_FOLDER': PIPELINE_FOLDER,
    'RUNNER': RUNNER,
    'INPUT_TOPIC': INPUT_TOPIC_PATH,
    'OUTPUT_TOPIC': OUTPUT_TOPIC_PATH,
    'DEAD_LETTER_TOPIC': DEAD_LETTER_TOPIC_PATH,
    'FIELDS_TO_REMOVE': FIELDS_TO_REMOVE,
})

In [None]:
# Echo the exported configuration, one aligned "Label: value" line per setting.
for label, env_key in (
    ("Project ID", "PROJECT_ID"),
    ("Region", "REGION"),
    ("GCS Bucket", "BUCKET"),
    ("Pipeline Folder", "PIPELINE_FOLDER"),
    ("Input Topic", "INPUT_TOPIC"),
    ("Output Topic", "OUTPUT_TOPIC"),
    ("Dead Letter Topic", "DEAD_LETTER_TOPIC"),
    ("Fields to Remove", "FIELDS_TO_REMOVE"),
    ("Runner", "RUNNER"),
):
    print(f"{label}:".ljust(21) + os.environ[env_key])

### Enable Necessary GCP APIs
Ensure that the Dataflow, Pub/Sub, and Cloud Storage APIs are enabled for your project.

In [None]:
print("Enabling required Google Cloud APIs...")
!gcloud services enable dataflow.googleapis.com --project=$GCP_PROJECT_ID
!gcloud services enable pubsub.googleapis.com --project=$GCP_PROJECT_ID
!gcloud services enable storage-component.googleapis.com --project=$GCP_PROJECT_ID
!gcloud services enable cloudresourcemanager.googleapis.com --project=$GCP_PROJECT_ID # Often needed by Dataflow
print("APIs enabled (or already were).")

### Create Google Cloud Storage Bucket
Dataflow requires a GCS bucket for staging code and storing temporary files.
We'll create one if it doesn't exist.

In [None]:
print(f"Attempting to create GCS Bucket: {GCS_BUCKET}")
!gsutil mb -p $GCP_PROJECT_ID -l $GCP_REGION {GCS_BUCKET} || true

### Create Pub/Sub Topics
Create the input, output, and dead-letter topics required by the pipeline.

In [None]:
print(f"Creating Pub/Sub topics in project {GCP_PROJECT_ID}...")

print(f"Creating Input Topic: {INPUT_TOPIC_PATH}")
!gcloud pubsub topics create {INPUT_TOPIC_NAME} --project=$GCP_PROJECT_ID || echo "Input topic already exists or failed."

print(f"Creating Output Topic: {OUTPUT_TOPIC_PATH}")
!gcloud pubsub topics create {OUTPUT_TOPIC_NAME} --project=$GCP_PROJECT_ID || echo "Output topic already exists or failed."

print(f"Creating Dead Letter Topic: {DEAD_LETTER_TOPIC_PATH}")
!gcloud pubsub topics create {DEAD_LETTER_TOPIC_NAME} --project=$GCP_PROJECT_ID || echo "Dead Letter topic already exists or failed."

print("Pub/Sub topic creation process finished.")

## Define the Dataflow Pipeline Code
---
Save the provided Python script for the Dataflow pipeline to a local file
within this notebook's environment using the `%%writefile` magic command.

In [None]:
%%writefile pubsub_streaming_data_redaction_pipeline.py
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import argparse
import time
import logging
import json
import typing
from datetime import datetime, timezone
import apache_beam as beam
from apache_beam.pvalue import TaggedOutput
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners import DataflowRunner, DirectRunner
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub

# Helper Functions and Classes

class ParseJsonWithDLQ(beam.DoFn):
    """
    Parses a JSON byte string into a Python dictionary.

    Outputs successfully parsed JSON objects to the 'parsed_data' tag.
    Routes a message to the 'dead_letter_data' tag, as its original raw bytes,
    when it is not valid JSON, cannot be decoded, or decodes to something other
    than a JSON object (e.g. a bare number, string, array or null). Downstream
    steps add and remove keys, so they require a dict.
    """
    SUCCESS_TAG = 'parsed_data'
    FAILURE_TAG = 'dead_letter_data'

    def process(self, element: bytes):
        try:
            parsed = json.loads(element)
        except json.JSONDecodeError as e:
            logging.error(f"Failed to decode JSON: {e}. Message: {element[:100]}...", exc_info=False)
            yield TaggedOutput(self.FAILURE_TAG, element)
            return
        except Exception as e:
            # e.g. UnicodeDecodeError for payloads that are not valid UTF-8.
            logging.error(f"An unexpected error occurred during parsing: {e}. Message: {element[:100]}...", exc_info=True)
            yield TaggedOutput(self.FAILURE_TAG, element)
            return

        # Valid JSON that is not an object would crash the dict-based steps
        # downstream (and stall the streaming job), so dead-letter it here.
        if not isinstance(parsed, dict):
            logging.error(f"Expected a JSON object but got {type(parsed).__name__}. Message: {element[:100]}...")
            yield TaggedOutput(self.FAILURE_TAG, element)
            return

        yield TaggedOutput(self.SUCCESS_TAG, parsed)

def add_processing_timestamp(element: dict) -> dict:
    """Returns a copy of ``element`` with a ``processingTimestamp`` field added.

    The timestamp is the current UTC time formatted as
    ``YYYY-MM-DDTHH:MM:SS.ffffffZ``. The input dict is not modified, because
    Beam requires transforms to treat their input elements as immutable.
    """
    now_utc = datetime.now(timezone.utc)
    return {**element, 'processingTimestamp': now_utc.strftime("%Y-%m-%dT%H:%M:%S.%fZ")}

def encode_to_json_bytes(element: dict) -> bytes:
    """Serializes ``element`` with ``json.dumps`` and returns the text as UTF-8 bytes."""
    serialized = json.dumps(element)
    return bytes(serialized, 'utf-8')

class RemoveFieldsFn(beam.DoFn):
    """Removes specified top-level fields from a dictionary element.

    The constructor accepts the fields to remove as:
      * a comma-separated string (entries are stripped, blank entries ignored),
      * ``None`` (remove nothing), or
      * any other iterable of field names (list, set, tuple, frozenset, ...).

    Any other value raises ``TypeError``. Failing loudly is deliberate:
    treating an unsupported value as "remove nothing" would silently let the
    fields this step is meant to redact through to the output.
    """
    def __init__(self, fields_to_remove_set):
        super().__init__()
        if fields_to_remove_set is None:
            self.fields_to_remove = set()
        elif isinstance(fields_to_remove_set, str):
            self.fields_to_remove = set(f.strip() for f in fields_to_remove_set.split(',') if f.strip())
        else:
            try:
                self.fields_to_remove = set(fields_to_remove_set)
            except TypeError as e:
                raise TypeError(
                    "fields_to_remove_set must be a comma-separated string, None, or an iterable "
                    f"of field names; got {type(fields_to_remove_set).__name__}") from e
        logging.info(f"RemoveFieldsFn initialized. Fields to remove: {self.fields_to_remove}")

    def process(self, element: dict):
        """Yields ``element`` without the configured fields.

        A shallow copy is made before removal so the input element is never
        modified; fields absent from the element are ignored.
        """
        if not self.fields_to_remove:
            yield element
            return
        element_copy = element.copy()
        for field in self.fields_to_remove:
            element_copy.pop(field, None)  # missing fields are not an error
        yield element_copy

# Dataflow pipeline run method
def _parse_args(argv=None):
    """Parse this pipeline's command-line flags.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        A ``(known_args, beam_args)`` tuple. ``beam_args`` holds any flags this
        parser does not define (for example ``--service_account_email``); they
        are forwarded to Beam's ``PipelineOptions`` instead of being rejected.
    """
    parser = argparse.ArgumentParser(description='Load JSON from Pub/Sub, process events, write successes and failures to Pub/Sub')
    parser.add_argument('--project', required=True, help='Specify Google Cloud project')
    parser.add_argument('--region', required=True, help='Specify Google Cloud region')
    parser.add_argument('--staging_location', required=True, help='Specify Cloud Storage bucket for staging')
    parser.add_argument('--temp_location', required=True, help='Specify Cloud Storage bucket for temp')
    parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner (e.g., DataflowRunner, DirectRunner)')
    parser.add_argument('--input_topic', required=True, help='Input Pub/Sub Topic (full path: projects/<PROJECT>/topics/<TOPIC>)')
    parser.add_argument('--output_topic', required=True, help='Main output Pub/Sub Topic for successfully processed events (full path: projects/<PROJECT>/topics/<TOPIC>)')
    parser.add_argument('--dead_letter_topic', required=True, help='Output Pub/Sub Topic for messages that failed processing (full path: projects/<PROJECT>/topics/<TOPIC>)')
    parser.add_argument('--fields_to_remove', type=str, default=None, help='Comma-separated list of fields to remove from the main output (e.g., "user_id,ip")')
    parser.add_argument('--num_workers', type=int, default=None, help='Initial number of workers for the job')
    parser.add_argument('--max_num_workers', type=int, default=None, help='Max number of workers for the job')
    return parser.parse_known_args(argv)


def _build_pipeline_options(opts, beam_args):
    """Create streaming ``PipelineOptions`` from the parsed flags.

    ``save_main_session=True`` makes this module's helpers and imports available
    to the DoFns on the workers. ``streaming=True`` is set because the pipeline
    reads an unbounded Pub/Sub source.
    """
    options = PipelineOptions(beam_args, save_main_session=True, streaming=True)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = opts.project
    google_cloud_options.region = opts.region
    google_cloud_options.staging_location = opts.staging_location
    google_cloud_options.temp_location = opts.temp_location
    # Respect an explicit --job_name passed through beam_args; otherwise generate a unique one.
    if not google_cloud_options.job_name:
        google_cloud_options.job_name = '{0}{1}'.format('df-pubsub-streaming-redaction-pipeline-', time.time_ns())
    options.view_as(StandardOptions).runner = opts.runner
    worker_options = options.view_as(WorkerOptions)
    worker_options.num_workers = opts.num_workers
    worker_options.max_num_workers = opts.max_num_workers
    return options


def _parse_fields_to_remove(raw_fields):
    """Split a comma-separated field list into a set, ignoring blank entries.

    ``None`` or an empty string yields an empty set.
    """
    if not raw_fields:
        return set()
    return {field.strip() for field in raw_fields.split(',') if field.strip()}


def run(argv=None):
    """Build and launch the streaming Pub/Sub -> Pub/Sub redaction pipeline.

    Reads JSON messages from ``--input_topic`` and sends unparseable ones, as
    raw bytes, to ``--dead_letter_topic``. It adds a processing timestamp to
    the rest, optionally removes ``--fields_to_remove``, and publishes the
    result to ``--output_topic``. The job is submitted without waiting for it
    to finish.

    Args:
        argv: Optional argument list (defaults to ``sys.argv[1:]``).
    """
    # Set the level first so the construction-time log lines below are emitted.
    logging.getLogger().setLevel(logging.INFO)

    opts, beam_args = _parse_args(argv)
    options = _build_pipeline_options(opts, beam_args)
    fields_to_remove_set = _parse_fields_to_remove(opts.fields_to_remove)

    logging.info("Building pipeline...")
    p = beam.Pipeline(options=options)

    # Read raw message payloads from Pub/Sub.
    raw_msgs = p | 'ReadFromPubSub' >> ReadFromPubSub(topic=opts.input_topic).with_output_types(bytes)

    # Parse JSON; messages that fail go to the failure (dead-letter) tag.
    parse_results = raw_msgs | 'ParseJsonWithDLQ' >> beam.ParDo(ParseJsonWithDLQ()).with_outputs(
        ParseJsonWithDLQ.SUCCESS_TAG, ParseJsonWithDLQ.FAILURE_TAG)

    # Dead-letter branch: publish the raw, unparsed bytes.
    _ = (parse_results[ParseJsonWithDLQ.FAILURE_TAG]
         | 'WriteDeadLetterToPubSub' >> WriteToPubSub(topic=opts.dead_letter_topic))

    # Success branch: timestamp, optionally redact, then publish as JSON bytes.
    processed_events = (parse_results[ParseJsonWithDLQ.SUCCESS_TAG]
                        | "AddProcessingTimestamp" >> beam.Map(add_processing_timestamp))

    if fields_to_remove_set:
        logging.info(f"Adding step to remove fields: {fields_to_remove_set}")
        processed_events = (processed_events
                            | 'RemoveSpecifiedFields' >> beam.ParDo(RemoveFieldsFn(fields_to_remove_set)))
    else:
        logging.info("No fields specified for removal.")

    _ = (processed_events
         | "EncodeSuccessToJson" >> beam.Map(encode_to_json_bytes)
         | 'WriteSuccessToPubSub' >> WriteToPubSub(topic=opts.output_topic))

    logging.info(f"Current time: {datetime.now().isoformat()}")

    # Submit without blocking: for a streaming job, wait_until_finish() would only
    # return once the job fails or is cancelled, so monitor it in the Dataflow UI.
    p.run()

    logging.info(f"Pipeline job started. Check the Dataflow UI: https://console.cloud.google.com/dataflow?project={opts.project}")


if __name__ == '__main__':
    # Launch the pipeline only when this file is executed as a script.
    run()

## Run the Dataflow Pipeline
---
Now, execute the Python script using the environment variables we set earlier.
This command submits the pipeline job to the Dataflow service.
**Note:** This starts a *streaming* Dataflow job, which will run continuously
until you manually stop or cancel it from the GCP Console or with a gcloud command (see the Cleanup section).

In [None]:
import os
import subprocess
import time

JOB_STARTUP_WAIT_SECONDS = 300  # Time to let the streaming job spin up before testing it.

# Pass the arguments as a list, without a shell. This avoids relying on IPython's
# `{...}`/`$` interpolation and on shell quoting (e.g. for FIELDS_TO_REMOVE).
pipeline_cmd = [
    "python3", "pubsub_streaming_data_redaction_pipeline.py",
    f"--project={os.environ['PROJECT_ID']}",
    f"--region={os.environ['REGION']}",
    f"--staging_location={os.environ['PIPELINE_FOLDER']}/staging",
    f"--temp_location={os.environ['PIPELINE_FOLDER']}/temp",
    f"--runner={os.environ['RUNNER']}",
    f"--input_topic={os.environ['INPUT_TOPIC']}",
    f"--output_topic={os.environ['OUTPUT_TOPIC']}",
    f"--dead_letter_topic={os.environ['DEAD_LETTER_TOPIC']}",
    f"--fields_to_remove={os.environ['FIELDS_TO_REMOVE']}",
    "--num_workers=1",
    "--max_num_workers=3",
]

print("Submitting the Dataflow job...")
# Pipe the script's combined stdout/stderr through the kernel so it is shown in this cell.
with subprocess.Popen(pipeline_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as proc:
    for line in proc.stdout:
        print(line, end="")
# Fail fast instead of sleeping for minutes after a failed submission.
if proc.returncode != 0:
    raise RuntimeError(f"Pipeline submission failed (exit code {proc.returncode}); see the output above.")

print("\nPipeline submission command executed.")
print(f"Check the Dataflow UI for job status: https://console.cloud.google.com/dataflow?project={GCP_PROJECT_ID}")
print("It might take a few minutes for the job to start running.")

# Give the streaming job time to start before publishing test messages.
print(f"\nWaiting {JOB_STARTUP_WAIT_SECONDS // 60} mins for the job to start...")
time.sleep(JOB_STARTUP_WAIT_SECONDS)

## Test the Pipeline
---
### Create subscriptions to test the pipeline results
Create subscriptions to the topics to check results

In [None]:
# Create temporary subscription names
OUTPUT_SUB_NAME = f"{OUTPUT_TOPIC_NAME}-sub-temp"
DLQ_SUB_NAME = f"{DEAD_LETTER_TOPIC_NAME}-sub-temp"
OUTPUT_SUB_PATH = f"projects/{GCP_PROJECT_ID}/subscriptions/{OUTPUT_SUB_NAME}"
DLQ_SUB_PATH = f"projects/{GCP_PROJECT_ID}/subscriptions/{DLQ_SUB_NAME}"

# Create subscriptions (use || true to ignore errors if they already exist)
print("\nCreating temporary subscriptions...")
!gcloud pubsub subscriptions create {OUTPUT_SUB_NAME} --topic={OUTPUT_TOPIC_NAME} --project={GCP_PROJECT_ID} --ack-deadline=60 || true
!gcloud pubsub subscriptions create {DLQ_SUB_NAME} --topic={DEAD_LETTER_TOPIC_NAME} --project={GCP_PROJECT_ID} --ack-deadline=60 || true

### Publish a Sample Message
Publish a test message to the input Pub/Sub topic and check that it landed in the output topic with the transformations. 

In [None]:
# Define the sample message as a Python dictionary
sample_message_dict = {
    "commitTimestamp": "2025-04-07T20:07:10.123456Z",
    "transactionId": "txn-cust-insert-9f8e7d6c",
    "isLastRecordInTransactionInPartition": True,
    "tableName": "customers",
    "id": "cust-9f8e7d6c-1a2b-3c4d-5e6f-7g8h9i0j",
    "name": "Charles Xavier",
    "address": "1407 Graymalkin Lane, North Salem, NY 10560",
    "email": "prof.x@example.org",
    "ssn": "755-46-5678", # This field should be removed based on FIELDS_TO_REMOVE
    "birth_date": "1982-11-01",
    "modType": "INSERT"
}

# Convert the dictionary to a JSON string for publishing
import json
sample_message_json = json.dumps(sample_message_dict)

print(f"Publishing sample message to: {INPUT_TOPIC_PATH}")
!gcloud pubsub topics publish {INPUT_TOPIC_NAME} --project={GCP_PROJECT_ID} --message='{sample_message_json}'

# Wait a bit for messages to potentially arrive and be processed
import time
print("\nWaiting 10 seconds for messages to be processed by the pipeline...")
time.sleep(10)

# Pull from the output subscription
print(f"\n --- Checking Output Topic ({OUTPUT_TOPIC_PATH}) ---")
print(f"Pulling messages from subscription: {OUTPUT_SUB_PATH}")
!gcloud pubsub subscriptions pull {OUTPUT_SUB_PATH} --auto-ack --limit=5

print(f"\nExpected output message should have the 'ssn' field removed and a 'processingTimestamp' added.")

### Publish a Malformed Message (optional)
Publish an invalid message to the input Pub/Sub topic and check that it landed in the DLQ topic. 

In [None]:
malformed_message_json = "<THIS IS NOT A VALID JSON MESSAGE>"
print(f"Publishing malformed message to: {INPUT_TOPIC_PATH}")
!gcloud pubsub topics publish {INPUT_TOPIC_NAME} --project={GCP_PROJECT_ID} --message='{malformed_message_json}'

# Wait a bit for messages to potentially arrive and be processed
import time
print("\nWaiting 10 seconds for messages to be processed by the pipeline...")
time.sleep(10)

# Pull from the dead-letter subscription
print(f"\n --- Checking Dead-Letter Topic ({DEAD_LETTER_TOPIC_PATH}) ---")
print(f"Pulling messages from subscription: {DLQ_SUB_PATH}")
!gcloud pubsub subscriptions pull {DLQ_SUB_PATH} --auto-ack --limit=5

print(f"\nExpected dead-letter message should be the raw, unparseable string: '{malformed_message_json}'")

## Monitoring
---
You can monitor the running Dataflow job, view logs, and see metrics in the Google Cloud Console:
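[https://console.cloud.google.com/dataflow](https://console.cloud.google.com/dataflow) (make sure your project is selected in the console's project picker; the job-submission cell above also prints a direct link that includes your project ID).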


## Cleanup (Important!)
---
**Streaming Dataflow jobs run continuously and incur costs until stopped.**
Follow these steps to clean up the resources created in this tutorial.

### Stop the Dataflow Job
Find your job ID in the Dataflow UI or using the `gcloud` command below.
Then, cancel the job.

In [None]:
print("Listing active Dataflow jobs in region", GCP_REGION, "(might take a moment)...")
!gcloud dataflow jobs list --region={GCP_REGION} --project={GCP_PROJECT_ID} --status=active

**ACTION REQUIRED:** Copy the `JOB_ID` of the 'df-pubsub-streaming-redaction-pipeline-...' job
from the output above and paste it below. Then run the cell to cancel the job.

In [None]:
DATAFLOW_JOB_ID = "" # @param {type:"string"} PASTE JOB ID HERE

if DATAFLOW_JOB_ID:
    print(f"\nAttempting to cancel Dataflow job {DATAFLOW_JOB_ID} in region {GCP_REGION}...")
    !gcloud dataflow jobs cancel {DATAFLOW_JOB_ID} --region={GCP_REGION} --project={GCP_PROJECT_ID}
else:
    print("\nPlease paste the Dataflow JOB_ID in the cell above and run it again to cancel the job.")
    print(f"Alternatively, cancel the job manually via the Cloud Console: https://console.cloud.google.com/dataflow?project={GCP_PROJECT_ID}")

### Delete Pub/Sub Subscriptions and Topics
Delete the temporary subscriptions and the topics created.

In [None]:
print("\nDeleting temporary Pub/Sub subscriptions...")
!gcloud pubsub subscriptions delete {OUTPUT_SUB_NAME} --project={GCP_PROJECT_ID} || echo "Failed to delete output subscription (might not exist)."
!gcloud pubsub subscriptions delete {DLQ_SUB_NAME} --project={GCP_PROJECT_ID} || echo "Failed to delete DLQ subscription (might not exist)."

print("\nDeleting Pub/Sub topics...")
!gcloud pubsub topics delete {INPUT_TOPIC_NAME} --project={GCP_PROJECT_ID} || echo "Failed to delete input topic."
!gcloud pubsub topics delete {OUTPUT_TOPIC_NAME} --project={GCP_PROJECT_ID} || echo "Failed to delete output topic."
!gcloud pubsub topics delete {DEAD_LETTER_TOPIC_NAME} --project={GCP_PROJECT_ID} || echo "Failed to delete DLQ topic."

### Delete the Cloud Storage Bucket
Delete the GCS bucket used for staging and temp files.
**Warning:** This will permanently delete all contents of the bucket.

In [None]:
print(f"\nDeleting GCS bucket: {GCS_BUCKET}")
print("This may take a moment if the bucket contains objects...")
# Use -r to remove recursively, -f to force deletion without confirmation prompts
!gsutil rm -r -f {GCS_BUCKET} || echo "Failed to delete GCS bucket (might already be deleted or permissions issue)."

print("\n--- Cleanup Complete ---")