In [94]:
from google.cloud import pubsub_v1
import json

## Testing the Pub/Sub Client API

In [33]:
def publish_messages(
    project_id: str = "jardim-data",
    topic_id: str = "tutoring-stream",
    count: int = 2,
) -> None:
    """Publish ``count`` sample JSON messages to a Pub/Sub topic.

    Message ``n`` (for ``n`` in ``1..count``) is the JSON object
    ``{"Name": "Juca <n>", "Age": <n * 10>}`` encoded as UTF-8. For every
    message the JSON text is printed, the message is published, and the
    value returned by the publish future is printed. A summary line with the
    fully qualified topic path is printed at the end.

    Args:
        project_id: Google Cloud project that owns the topic.
        topic_id: Name of the topic inside ``project_id``.
        count: Number of sample messages to publish; ``0`` publishes nothing
            and only prints the summary line.

    Raises:
        Exception: whatever ``future.result()`` raises when a publish fails
            (the error is not caught here).
    """
    publisher = pubsub_v1.PublisherClient()
    # `topic_path` builds the fully qualified identifier
    # `projects/{project_id}/topics/{topic_id}`.
    topic_path = publisher.topic_path(project_id, topic_id)

    for n in range(1, count + 1):
        payload = json.dumps({"Name": f"Juca {n}", "Age": n * 10})
        print(payload)
        # Pub/Sub message data must be a bytestring.
        future = publisher.publish(topic_path, payload.encode("utf-8"))
        # Block on the future so a failed publish surfaces here, and print
        # what it resolves to.
        print(future.result())

    print(f"Published messages to {topic_path}.")

In [34]:
# Smoke test: run the publisher with its built-in project/topic settings.
publish_messages()

{"Name": "Juca 1", "Age": 10}
6185125262705491
{"Name": "Juca 2", "Age": 20}
6185023665084233
Published messages to projects/jardim-data/topics/tutoring-stream.


## Testing Publisher module
Exercising the `modules.publisher` module by publishing DataFrame rows one at a time in a loop.

In [95]:
import os
# NOTE(review): hard-coded absolute path. The working directory is switched
# before the `modules.*` imports below, presumably so they resolve from the
# repository root — confirm, and consider making this configurable.
os.chdir("/home/jardim/dev/tutoring-app-data-pipeline")
print(os.getcwd())

# Project-local modules under test.
import modules.datagen as gen
import modules.bq_first_loader as bq
import modules.publisher as pub
import importlib
from datetime import datetime
# Re-execute the already-imported modules so edits made on disk are picked up
# without restarting the kernel.
importlib.reload(gen)
importlib.reload(bq)
importlib.reload(pub)

# Build the generator objects. The 100 arguments are presumably entity
# counts — TODO confirm against modules/datagen.
students = gen.StudentsGenerator(100)
subjects = gen.SubjectsGenerator()
tutors = gen.TutorsGenerator(100, subjects)
# NOTE(review): `work_shifts` is not referenced again in the visible cells.
work_shifts = tutors.get_work_shifts()
sessions = gen.SessionsGenerator(students, subjects, tutors)
# Start and end date are identical, so the requested range is the single day
# 2022-10-26.
sessions.generate_sessions_for_date_range('2022-10-26', '2022-10-26')
# NOTE(review): '50S' looks like a pandas frequency string (50 seconds) —
# confirm what `process` does with it.
sessions.process('50S')

/home/jardim/dev/tutoring-app-data-pipeline
100%

In [97]:
# Create the project's Publisher wrapper. 'stream-sessions' is presumably the
# Pub/Sub topic id — confirm against modules/publisher.
sessions_pub = pub.Publisher('stream-sessions')

Connecting to PubSub....  - Connected!


In [98]:
# Select the rows labelled 6 through 12; label-based `.loc` slicing includes
# both endpoints.
sess = sessions.get_sessions().loc[6:12, ]

# Time the publish loop using wall-clock timestamps.
start_at = datetime.now()
for i, s in sess.iterrows():
    # `s` is the row; the index label `i` is not used.
    sessions_pub.publish_df_row(s)
    
end_at = datetime.now()
# NOTE(review): this measures only how long the publish_df_row calls take to
# return. Whether that includes delivery confirmation depends on
# publish_df_row, which is not visible here — confirm.
end_at - start_at

datetime.timedelta(microseconds=6394)