In [1]:
import os
import numpy as np
import json

from google.cloud import pubsub_v1
from typing import Optional, List
from google.api_core.exceptions import AlreadyExists



# Point the Google client libraries at a service-account key file. setdefault
# leaves the variable alone when the environment already provides credentials,
# so the notebook no longer overrides an externally configured key.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "../../erudite-flag-364413-428e8c5a8dbb.json",
)

project_id = "erudite-flag-364413"
topic_id = 'RC.MODEL'

# Enable message ordering on the publisher so messages can carry an ordering key.
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)

# Fully qualified name: projects/{project_id}/topics/{topic_id}
topic_path = publisher.topic_path(project_id, topic_id)

# Create the topic; if it already exists, just report its path so the cell can
# be re-run safely.
try:
    topic = publisher.create_topic(name=topic_path)
    print(topic.name)
except AlreadyExists:
    print(topic_path)

projects/erudite-flag-364413/topics/RC.MODEL


In [2]:
# Example payload: two groups ('a' and 'b'), each with one-element "rate" and
# "concentration" lists plus a shape entry. ndarray.tolist() produces native
# Python floats, so JSON encoding does not rely on NumPy scalar types happening
# to be accepted by the json module.
data = {
    'a': {'rate': np.ones(1).tolist(), 'concentration': np.ones(1).tolist(), 'shape': (1,)},
    'b': {'rate': np.ones(1).tolist(), 'concentration': np.ones(1).tolist(), 'shape': (1,)},
}

encode_data = json.dumps(data).encode('utf-8')
print(encode_data)

# Publish two messages under the same ordering key. The payload is mutated in
# place, so the increments accumulate across iterations (rate 1.0, then 2.0).
for i in range(2):
    data['a']['rate'][0] += i
    encode_data = json.dumps(data).encode('utf-8')
    future = publisher.publish(topic_path, encode_data, ordering_key='A')
    # result() blocks until the publish finishes; its return value is printed.
    print(future.result())

b'{"a": {"rate": [1.0], "concentration": [1.0], "shape": [1]}, "b": {"rate": [1.0], "concentration": [1.0], "shape": [1]}}'
7718366083140693
7718333282190803


In [3]:
def sub(
    project_id: str,
    subscription_id: str,
    timeout: Optional[float] = None,
    topic: Optional[str] = None,
) -> List:
    """Receive messages from a Pub/Sub subscription and return their JSON payloads.

    The subscription is created on the topic if it does not exist yet. A
    streaming pull then runs until ``timeout`` elapses or the kernel is
    interrupted; every received message is acknowledged and its body is
    decoded from JSON.

    Args:
        project_id: Google Cloud project ID that owns the subscription.
        subscription_id: Short subscription name (not the fully qualified path).
        timeout: Seconds to listen before shutting the stream down. ``None``
            listens until interrupted or until the stream fails.
        topic: Fully qualified topic path used when the subscription has to be
            created. Defaults to the notebook-level ``topic_path``.

    Returns:
        A list with the decoded JSON payload of each received message, in the
        order the callback handled them.

    Raises:
        Exception: Any error other than ``AlreadyExists`` while creating the
            subscription, and any error that terminates the streaming pull,
            propagates to the caller after the client has been closed.
    """
    # Local import: the pull future raises concurrent.futures.TimeoutError, which
    # is only an alias of the builtin TimeoutError from Python 3.11 onwards.
    from concurrent import futures

    # Fall back to the notebook-level topic so existing three-argument calls work.
    topic_name = topic_path if topic is None else topic

    data = []

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received {message}.")
        # Acknowledge the message. Unack'ed messages will be redelivered.
        message.ack()
        data.append(json.loads(message.data.decode()))
        print(f"Acknowledged {message.message_id}.")

    # Initialize a Subscriber client
    subscriber_client = pubsub_v1.SubscriberClient()
    try:
        # Create a fully qualified identifier in the form of
        # `projects/{project_id}/subscriptions/{subscription_id}`
        subscription_path = subscriber_client.subscription_path(project_id, subscription_id)

        # NOTE(review): the subscription is created without
        # `enable_message_ordering`, so ordering keys set by the publisher are
        # presumably not enforced on delivery -- confirm whether that is intended.
        try:
            subscription = subscriber_client.create_subscription(
                request={"name": subscription_path, "topic": topic_name}
            )
            print(subscription.name)
        except AlreadyExists:
            print(subscription_path)

        streaming_pull_future = subscriber_client.subscribe(
            subscription_path, callback=callback
        )
        print(f"Listening for messages on {subscription_path}..\n")

        try:
            # Calling result() on StreamingPullFuture keeps the main thread from
            # exiting while messages get processed in the callbacks.
            streaming_pull_future.result(timeout=timeout)
        except (futures.TimeoutError, KeyboardInterrupt):
            # Expected ways to stop listening: the timeout elapsed or the user
            # interrupted the kernel. Shut down and keep what was collected.
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception:
            # A real stream failure: stop pulling, then let the caller see it
            # instead of silently returning a partial result.
            streaming_pull_future.cancel()
            raise
    finally:
        # Release the client's resources even when an error escapes.
        subscriber_client.close()

    return data

In [20]:
# Peer ".01": the subscription name is the topic id followed by the peer suffix.
peer_id = '.01'
subscription_id = f"{topic_id}{peer_id}"
print(subscription_id)

# Listen for 10 seconds; the returned list of payloads is the cell's output.
sub(project_id, subscription_id, timeout=10)

RC.MODEL.01
projects/erudite-flag-364413/subscriptions/RC.MODEL.01
Listening for messages on projects/erudite-flag-364413/subscriptions/RC.MODEL.01..

Received Message {
  data: b'{"u_som": {"eta1": [[-0.4835032820701599], []], "e...'
  ordering_key: 'A'
  attributes: {}
}.
Acknowledged 7718588231186008.


[{'u_som': {'eta1': [[-0.4835032820701599], []],
   'eta2': [[-1.0436919927597046], []]},
  'u_som_t0': {'eta1': [[1.3746299743652344], []],
   'eta2': [[-9.862667083740234], []]},
  'u_yield': {'eta1': [[-0.16051578521728516], []],
   'eta2': [[-25.524232864379883], []]}}]

In [21]:
# Peer ".02": the subscription name is the topic id followed by the peer suffix.
peer_id = '.02'
subscription_id = f"{topic_id}{peer_id}"
print(subscription_id)

# Listen for 10 seconds; the returned list of payloads is the cell's output.
sub(project_id, subscription_id, timeout=10)

RC.MODEL.02
projects/erudite-flag-364413/subscriptions/RC.MODEL.02
Listening for messages on projects/erudite-flag-364413/subscriptions/RC.MODEL.02..

Received Message {
  data: b'{"u_som": {"eta1": [[-0.49462878704071045], []], "...'
  ordering_key: 'A'
  attributes: {}
}.
Acknowledged 7718587642029581.
Received Message {
  data: b'{"u_som": {"eta1": [[-0.4835032820701599], []], "e...'
  ordering_key: 'A'
  attributes: {}
}.
Acknowledged 7718588231186008.


[{'u_som': {'eta1': [[-0.49462878704071045], []],
   'eta2': [[-1.0279991626739502], []]},
  'u_som_t0': {'eta1': [[0.49980831146240234], []],
   'eta2': [[-5.6728081703186035], []]},
  'u_yield': {'eta1': [[-0.6400856375694275], []],
   'eta2': [[-19.310436248779297], []]}},
 {'u_som': {'eta1': [[-0.4835032820701599], []],
   'eta2': [[-1.0436919927597046], []]},
  'u_som_t0': {'eta1': [[1.3746299743652344], []],
   'eta2': [[-9.862667083740234], []]},
  'u_yield': {'eta1': [[-0.16051578521728516], []],
   'eta2': [[-25.524232864379883], []]}}]