## **PubSub**

Description: This notebook will teach you the basics of working with PubSub in Google Cloud Platform (GCP):

- What is PubSub?
- Creating topics and subscriptions programmatically.
- Publishing and consuming messages.
- Advanced topics:
   - Dead Letter Queues (DLQs)
   - Schema setup for PubSub
   - Message ordering
   - Push vs. Pull subscriptions


EDEM. Master Big Data & Cloud 2025/2026<br>
Professor: Javi Briones

In [None]:
!pip install google-cloud-pubsub --quiet

In [None]:
# Import libraries
from google.cloud import pubsub_v1
from google.pubsub_v1.types import Encoding
import logging
import json

In [None]:
# Set logs
logging.basicConfig(level=logging.INFO)

### **What is PubSub?**

Google Cloud Pub/Sub is a fully managed messaging service that enables **real-time event streaming** by decoupling publishers from subscribers:

- **Publishers** send messages to **topics**.
- **Subscribers** receive messages through **subscriptions**.

#### **Key Concepts**

- **Topic:** A named resource that serves as the entry point for messages.
- **Subscription:** A named resource representing the subscriber's interest in a topic.
- **Publisher:** The application that sends messages to the topic.
- **Subscriber:** The application that receives messages from a subscription.
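
The relationship between these four concepts can be sketched with a small in-memory model. This is only a toy illustration, not how Pub/Sub is implemented: `ToyTopic` and its methods are hypothetical names, and the subscription names are made up. It shows that every subscription attached to a topic keeps its own backlog and receives its own copy of each message.

```python
from collections import deque


class ToyTopic:
    """In-memory stand-in for a topic (hypothetical, for illustration only)."""

    def __init__(self):
        self.subscriptions = {}

    def create_subscription(self, name):
        # Each subscription keeps its own independent backlog.
        self.subscriptions[name] = deque()

    def publish(self, data):
        # Fan out: every subscription gets its own copy of the message.
        for backlog in self.subscriptions.values():
            backlog.append(data)

    def pull(self, name):
        # Return and remove the oldest message, or None if the backlog is empty.
        backlog = self.subscriptions[name]
        return backlog.popleft() if backlog else None


toy_topic = ToyTopic()
toy_topic.create_subscription("billing")
toy_topic.create_subscription("analytics")
toy_topic.publish(b'{"id": 0}')

print(toy_topic.pull("billing"))    # b'{"id": 0}'
print(toy_topic.pull("analytics"))  # b'{"id": 0}'
print(toy_topic.pull("billing"))    # None
```

The publisher never refers to a subscriber directly, which is the decoupling described above.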



In [None]:
# Set project_id variable
PROJECT_ID = input("Enter your GCP Project ID: ")

In [None]:
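# Set up PubSub clients
# SchemaServiceClient is exported by google.cloud.pubsub, not by pubsub_v1
from google.cloud.pubsub import SchemaServiceClient

publisher = pubsub_v1.PublisherClient()
def get_subscriber():
    # Return a fresh client each time: the pull helpers close theirs via `with`
    return pubsub_v1.SubscriberClient()
schema_client = SchemaServiceClient()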

In [None]:
"""
Exercise 01: Create a PubSub Topic using the Python Client Library.
"""

topic_id = 'edem-demo-topic'
topic_path = publisher.topic_path(PROJECT_ID, topic_id)

logging.info(f"Creating topic: {topic_id}")

try:
    publisher.create_topic(name=topic_path)
    logging.info(f"Topic created: {topic_path}")

except Exception as err:
    logging.error(f"Topic may already exist: {err}")

In [None]:
"""
Exercise 02: Create a PubSub Subscription using the Python Client Library.
"""

subscription_id = 'edem-demo-topic-sub'
subscription_path = get_subscriber().subscription_path(PROJECT_ID, subscription_id)

logging.info(f"Creating subscription: {subscription_id}")

try:
    get_subscriber().create_subscription(name=subscription_path, topic=topic_path)
    logging.info(f"Subscription created: {subscription_path}")

except Exception as err:
    logging.error(f"Subscription may already exist: {err}")

In [None]:
"""
Exercise 03: Publish messages to a PubSub Topic using the Python Client Library.
"""

def publish_messages(publisher, topic_path):

    for i in range(5):
        
        payload = json.dumps({'id':i})
        msg = payload.encode("utf-8")
        future = publisher.publish(topic_path, msg)

        logging.info(f"Published {msg} with message ID {future.result()}")

publish_messages(publisher, topic_path)
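
In the exercise above, calling `future.result()` inside the loop waits for each message before sending the next one. A common alternative is to submit every message first and wait for the results afterwards, which gives the client a chance to batch. The sketch below shows that pattern with the standard library only: `fake_publish` is a hypothetical stand-in for `publisher.publish`, and the message IDs are invented.

```python
import itertools
import json
from concurrent.futures import ThreadPoolExecutor

_ids = itertools.count(1)


def fake_publish(executor, data: bytes):
    # Hypothetical stand-in for publisher.publish(topic_path, data):
    # returns a future that resolves to a made-up message ID.
    message_id = str(next(_ids))
    return executor.submit(lambda: message_id)


def publish_all(executor, records):
    # Submit everything first, then wait: no blocking between messages.
    futures = [
        fake_publish(executor, json.dumps(record).encode("utf-8"))
        for record in records
    ]
    return [future.result() for future in futures]


with ThreadPoolExecutor(max_workers=2) as pool:
    message_ids = publish_all(pool, [{"id": i} for i in range(5)])

print(message_ids)  # ['1', '2', '3', '4', '5']
```

With the real client, the same shape would collect the futures returned by `publisher.publish(topic_path, msg)` in a list and call `result()` on each one after the loop.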

In [None]:
"""
Exercise 04.A: Pull messages from a PubSub Subscription using the Python Client Library.
"""

def pull_messages(subscriber, subscription_path):

    with subscriber:

        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 10}
        )

        for msg in response.received_messages:

            logging.info(f"Received: {msg.message.data.decode('utf-8')}")

            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [msg.ack_id]}
            )

pull_messages(get_subscriber(), subscription_path)

In [None]:
"""
Exercise 04.B: Acknowledge messages in PubSub using the Python Client Library.
"""


def _pull_messages(subscriber, subscription_path):

    with subscriber:

        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 10}
        )

        for msg in response.received_messages:

            message = json.loads(msg.message.data.decode('utf-8'))

            if message['id'] % 2 == 0:

                subscriber.acknowledge(
                    request={"subscription": subscription_path, "ack_ids": [msg.ack_id]}
                )

                logging.info(f"Received: {message}")

            else:

                # Not acknowledged: Pub/Sub redelivers it after the ack deadline expires
                logging.info(f"Skipped: {message}")

_pull_messages(get_subscriber(), subscription_path)
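
Because the `acknowledge` request takes a list of `ack_ids`, the acknowledgements can also be collected and sent in one call instead of one call per message. The sketch below isolates the selection logic so it can run without GCP: `split_ack_ids` is a hypothetical helper, and `fake_messages` only imitates the shape of `response.received_messages`.

```python
import json
from types import SimpleNamespace


def split_ack_ids(received_messages):
    """Return (ack_ids, skipped): even ids are acknowledged, odd ids are left for redelivery."""
    ack_ids, skipped = [], []
    for received in received_messages:
        body = json.loads(received.message.data.decode("utf-8"))
        if body["id"] % 2 == 0:
            ack_ids.append(received.ack_id)
        else:
            skipped.append(body)
    return ack_ids, skipped


# Fake pull response, shaped like response.received_messages (illustration only).
fake_messages = [
    SimpleNamespace(
        ack_id=f"ack-{i}",
        message=SimpleNamespace(data=json.dumps({"id": i}).encode("utf-8")),
    )
    for i in range(5)
]

ack_ids, skipped = split_ack_ids(fake_messages)
print(ack_ids)  # ['ack-0', 'ack-2', 'ack-4']
print(skipped)  # [{'id': 1}, {'id': 3}]
```

With a real subscriber, the resulting list would be passed once as `request={"subscription": subscription_path, "ack_ids": ack_ids}`. It is safer to skip the call when the list is empty.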

### **Advanced Pub/Sub Concepts**

- **Dead Letter Queues (DLQs):**
   - A dead-letter topic receives messages that a subscriber fails to acknowledge after a configured number of delivery attempts.
   - Useful for handling processing failures.

- **Schema Setup:**
   - Enforcing a specific format for Pub/Sub messages.
   - Ensures data consistency across publishers and subscribers.

- **Message Ordering:**
   - When ordering is enabled on the subscription, messages that share an ordering key are delivered in the order Pub/Sub received them. There is no ordering guarantee across different keys.
   - Useful for stateful processing or time-series data.

- **Push vs. Pull Subscriptions:**
   - **Push:** Pub/Sub sends messages directly to an HTTPS endpoint that you provide.
   - **Pull:** Subscribers pull messages explicitly, allowing more control over processing.


In [None]:
"""
Exercise 05: Demonstrating Message Ordering in PubSub.
"""

# Create the topic. Ordering is not a topic setting: it is enabled on the
# subscription (below) and on the publisher client (next cell)
ordered_topic_id = 'edem-ord-demo-topic'
ordered_topic_path = publisher.topic_path(PROJECT_ID, ordered_topic_id)

try:
    publisher.create_topic(
        request={
            "name": ordered_topic_path,
            "labels": {"purpose": "ordering-demo"}
        }
    )
    logging.info(f"Ordered topic created: {ordered_topic_path}")

except Exception as err:
    logging.warning(f"Topic may already exist: {err}")

# Create a subscription for the ordered topic
ordered_subscription_id = 'edem-ord-demo-sub'
ordered_subscription_path = get_subscriber().subscription_path(PROJECT_ID, ordered_subscription_id)

try:
    get_subscriber().create_subscription(
        request={
            "name": ordered_subscription_path,
            "topic": ordered_topic_path,
            "enable_message_ordering": True}
    )
    logging.info(f"Subscription created: {ordered_subscription_path}")

except Exception as err:
    logging.warning(f"Subscription may already exist: {err}")

In [None]:
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
ordering_publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options
)

messages = [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]

for msg in messages:

    data = msg[0].encode("utf-8")
    ordering_key = msg[1]

    future = ordering_publisher.publish(ordered_topic_path, data, ordering_key=ordering_key)
    logging.info(f"Published {data} with message ID {future.result()}")
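
The ordering guarantee applies per ordering key, not across keys. As a rough illustration, the sketch below groups the four messages from this exercise by key. `per_key_order` is a hypothetical helper and involves no Pub/Sub call: it only shows which sequences a subscriber may rely on.

```python
from collections import defaultdict


def per_key_order(messages):
    """Group (data, ordering_key) pairs by key, preserving publish order within each key."""
    grouped = defaultdict(list)
    for data, key in messages:
        grouped[key].append(data)
    return dict(grouped)


published = [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]

print(per_key_order(published))
# {'key1': ['message1', 'message3'], 'key2': ['message2', 'message4']}
```

A subscriber should see `message1` before `message3` and `message2` before `message4`, but the interleaving between `key1` and `key2` is not guaranteed.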

In [None]:
pull_messages(get_subscriber(), ordered_subscription_path)

In [None]:
pull_messages(get_subscriber(), subscription_path)

In [None]:
"""
Exercise 06: Publishing Messages with Attributes
"""

for i in range(5):
        
    payload = json.dumps({'id':i})
    msg = payload.encode("utf-8")
    event = 'test' if i == 4 else 'train'

    # Add two attributes, user and event, to the message
    future = publisher.publish(topic_path, msg, user='admin', event=event)

    logging.info(f"Published {msg} with message ID {future.result()}")


In [None]:
## Pull messages and print attributes
subscriber = get_subscriber()

with subscriber:

    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 10}
    )

    for msg in response.received_messages:

        # Use .get(): messages published without attributes have no 'event' key
        event = msg.message.attributes.get('event')

        if event == 'test':

            logging.info(f"Received: {msg.message.data.decode('utf-8')}")

        else:

            logging.info(f"Received: {event}")
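
Attributes are passed to `publish` as keyword arguments, and the Python client expects their values to be text. As a minimal sketch, a small helper can normalise values before publishing. `to_attributes` is a hypothetical name, and the `retries` attribute is made up for the example.

```python
def to_attributes(**raw):
    """Convert keyword values into the text form expected for message attributes."""
    attributes = {}
    for key, value in raw.items():
        if isinstance(value, bytes):
            # Decode bytes instead of producing "b'...'" through str().
            attributes[key] = value.decode("utf-8")
        else:
            attributes[key] = str(value)
    return attributes


attrs = to_attributes(user="admin", event="train", retries=3)
print(attrs)  # {'user': 'admin', 'event': 'train', 'retries': '3'}
```

With the real client, the result could be unpacked into the call, for example `publisher.publish(topic_path, msg, **attrs)`.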

In [None]:
"""
Exercise 07: Enforcing Schema Validation.
"""

# Create the schema
schema_id = "edem-schema"
schema_definition = """
{
  "type": "record",
  "name": "edem_message",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
"""

try:
    schema = schema_client.create_schema(
        request={
            "parent": f"projects/{PROJECT_ID}",
            "schema_id": schema_id,
            "schema": {"type_": "AVRO", "definition": schema_definition},
        }
    )
    logging.info(f"Created schema: {schema.name}")

except Exception as err:
    logging.warning(f"Schema may already exist: {err}")

# Attach the schema to a newly created topic
# (Encoding was already imported at the top of the notebook)

schema_topic_id = 'edem-schema-demo-topic'
schema_topic_path = publisher.topic_path(PROJECT_ID, schema_topic_id)

logging.info(f"Creating topic: {schema_topic_id}")

try:
    publisher.create_topic(request={
        "name": schema_topic_path,
        "schema_settings": {
            "schema": schema_client.schema_path(PROJECT_ID, schema_id),
            "encoding": Encoding.JSON
        }
    })
    
    logging.info(f"Topic created: {schema_topic_path}")

except Exception as err:
    logging.error(f"Topic may already exist: {err}")

In [None]:
# Publish valid and invalid messages
def publish_messages_with_schema(publisher, topic_path):

    valid_payload = json.dumps({'id':1})
    invalid_payload = json.dumps({'name':'invalid'})

    valid_msg = valid_payload.encode("utf-8")
    invalid_msg = invalid_payload.encode("utf-8")

    # Publish a valid message
    future_valid = publisher.publish(topic_path, valid_msg)
    logging.info(f"Published valid message with ID {future_valid.result()}")

    # Publish an invalid message
    try:
        future_invalid = publisher.publish(topic_path, invalid_msg)
        logging.info(f"Published invalid message with ID {future_invalid.result()}")
        
    except Exception as err:
        logging.error(f"Failed to publish invalid message: {err}")

publish_messages_with_schema(publisher, schema_topic_path)
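
Pub/Sub validates the message on the server, so an invalid payload is only rejected when the publish future is resolved. A cheap local pre-check can catch obvious mistakes earlier. The function below is a hand-written approximation of the `edem_message` schema (a record with one `int` field named `id`), not real Avro validation, and `is_valid_edem_message` is a hypothetical name.

```python
import json

# Avro "int" is a 32-bit signed integer.
AVRO_INT_MIN, AVRO_INT_MAX = -(2**31), 2**31 - 1


def is_valid_edem_message(raw: bytes) -> bool:
    """Approximate local check for the edem_message schema."""
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
    if not isinstance(record, dict) or "id" not in record:
        return False
    value = record["id"]
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return AVRO_INT_MIN <= value <= AVRO_INT_MAX


print(is_valid_edem_message(json.dumps({"id": 1}).encode("utf-8")))            # True
print(is_valid_edem_message(json.dumps({"name": "invalid"}).encode("utf-8")))  # False
```

The server-side check remains the source of truth. This sketch only mirrors the two payloads used in the exercise.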

In [None]:
"""
Exercise 08: Dead Letter Queue.
"""

# Create DLQ topic and subscription
dlq_topic_id = 'edem-dlq-topic'
dlq_topic_path = publisher.topic_path(PROJECT_ID, dlq_topic_id)

dlq_subscription_id = 'edem-dlq-sub'
dlq_subscription_path = get_subscriber().subscription_path(PROJECT_ID, dlq_subscription_id)

# Topic
try:
    publisher.create_topic(
        request={
            "name": dlq_topic_path,
            "labels": {"purpose": "dlq-demo"}
        }
    )
    logging.info(f"DLQ topic created: {dlq_topic_path}")

except Exception as err:
    logging.warning(f"Topic may already exist: {err}")

# Subscription
try:
    get_subscriber().create_subscription(name=dlq_subscription_path, topic=dlq_topic_path)
    logging.info(f"Subscription created: {dlq_subscription_path}")

except Exception as err:
    logging.error(f"Subscription may already exist: {err}")

In [None]:
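# Attach the dead-letter policy to the main subscription.
# Note: forwarding only works if the Pub/Sub service account can publish to the
# DLQ topic and subscribe to this subscription.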
dead_letter_policy = pubsub_v1.types.DeadLetterPolicy(
    dead_letter_topic=dlq_topic_path,
    max_delivery_attempts=5,
)


subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=topic_path,
    dead_letter_policy=dead_letter_policy,
)


# Fields to update
update_mask = pubsub_v1.types.FieldMask(paths=["dead_letter_policy"])

try:
    
    subscription = get_subscriber().update_subscription(
        request = {
        "subscription": subscription,
        "update_mask": update_mask
    }
    )

    logging.info(f"DLQ updated on main subscription: {subscription_path}")

except Exception as err:
    logging.warning(f"Failed to update the subscription: {err}")

In [None]:
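# Consume the main subscription; messages that keep failing are forwarded to the DLQ
from concurrent.futures import TimeoutError

# Define Callback
def callback(message):

    data = json.loads(message.data.decode('utf-8'))
    logging.info(f"Message received: {data}")

    if data.get("id") == 3:
        # Simulate a failure: nack so Pub/Sub redelivers the message.
        # After max_delivery_attempts it is forwarded to the dead-letter topic.
        logging.error("Message processing failed")
        message.nack()
    else:
        message.ack()

subscriber = get_subscriber()
future = subscriber.subscribe(subscription_path, callback)

# Listen for a bounded time so the cell terminates (60 s is an arbitrary choice)
with subscriber:
    try:
        future.result(timeout=60)
    except TimeoutError:
        future.cancel()  # Stop the streaming pull
        future.result()  # Block until the shutdown completes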
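
The effect of `max_delivery_attempts` can be sketched without GCP. The simulation below is a simplification under stated assumptions: `simulate_dead_lettering` is a hypothetical function, a message either always fails or succeeds on its first attempt, and the real service forwards messages on a best-effort basis rather than with this exact bookkeeping.

```python
def simulate_dead_lettering(message_ids, should_fail, max_delivery_attempts=5):
    """Return (delivered, dead_lettered) for a simplified redelivery loop."""
    delivered, dead_lettered = [], []
    for message_id in message_ids:
        for attempt in range(1, max_delivery_attempts + 1):
            if not should_fail(message_id):
                # Processing succeeded: the message is acknowledged.
                delivered.append((message_id, attempt))
                break
        else:
            # Every attempt failed: the message goes to the dead-letter topic.
            dead_lettered.append(message_id)
    return delivered, dead_lettered


delivered, dead = simulate_dead_lettering(range(5), lambda i: i == 3)
print(delivered)  # [(0, 1), (1, 1), (2, 1), (4, 1)]
print(dead)       # [3]
```

This mirrors the callback above, where only the message with `id` 3 is never acknowledged and should eventually appear on `edem-dlq-sub`.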


### **Clean Up**

In [None]:
topics = []
subscriptions = []

topics.append(topic_path)
topics.append(ordered_topic_path)
topics.append(schema_topic_path)
topics.append(dlq_topic_path)

subscriptions.append(subscription_path)
subscriptions.append(ordered_subscription_path)
subscriptions.append(dlq_subscription_path)

In [None]:
# Delete Topics
for topic in topics:

    try:
        publisher.delete_topic(request={"topic": topic})
        logging.info(f"Topic deleted: {topic}")

    except Exception as err:
        logging.warning(f"Error while deleting the topic: {err}")

In [None]:
# Delete Subscriptions
for subscription in subscriptions:

    try:
        get_subscriber().delete_subscription(request={"subscription": subscription})
        logging.info(f"Subscription deleted: {subscription}")

    except Exception as err:
        logging.warning(f"Error while deleting the subscription: {err}")