In [None]:
import json
import os
import time
from pathlib import Path

from google.cloud import pubsub_v1
from tqdm import tqdm

## create topic and subscription

In [None]:
project_id: str = os.environ["GCP_PROJECT_ID"]
topic_name: str = "new_topic"
sub_name: str = "new_sub"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
topic = publisher.create_topic(name=topic_path)
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(project_id, sub_name)
subscription = subscriber.create_subscription(name=sub_path, topic=topic_path)

## push to topic

In [None]:
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "new_topic")

paths = (Path.home() / "Desktop" / "music_tutorials").iterdir()

for path in tqdm(paths):
    data_bytes = json.dumps(
        {
            "url": path.as_posix(),
        }
    ).encode("utf-8")
    future = publisher.publish(topic_path, data=data_bytes)

## pull from topic

In [None]:
subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(project_id, "new_sub")

while True:
    response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5})
    print(f"len(messages)={len(response.received_messages)}")
    if len(response.received_messages) == 0:
        print("No messages")
        break

    print(f"BATCH: {time.time()}")
    for msg in response.received_messages:
        print(json.loads(msg.message.data.decode()))

    ack_ids = [msg.ack_id for msg in response.received_messages]
    if ack_ids:
        subscriber.acknowledge(request={"subscription": sub_path, "ack_ids": ack_ids})
    time.sleep(2)