In [None]:
# ! pip install google-cloud-pubsub pandas
# ! pip install avro
# https://www.youtube.com/watch?v=7wf1HhYQiDg&t=522s - Jupyter lab introduction

In [1]:
from concurrent import futures
from io import BytesIO
import time

import avro.schema #import the avro schema
from avro.io import DatumReader, BinaryDecoder
import pandas as pd
from google.cloud import bigquery
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import PullRequest

# --- Configuration -----------------------------------------------------------
# BigQuery table that holds the reference rows.
projectId = 'grounded-gizmo-394701'
datasetId = 'test_ucarRT'
tableId = 'ucar_rt_1'
# Pub/Sub names. NOTE(review): these look like placeholders and are not used by
# the cells shown here (the subscription path below hard-codes its own name).
topicName = 'something'
subName = 'something-sub-2'

# Standard SQL over the fully-qualified table. There is no ORDER BY, so row
# order in the result is not guaranteed.
query = f"""
SELECT * FROM `{projectId}.{datasetId}.{tableId}`;
"""

# --- Clients -----------------------------------------------------------------
# No project is passed explicitly, so both clients take their default project
# and credentials from the environment.
client = bigquery.Client()
subscriber = pubsub_v1.SubscriberClient()

In [81]:
# Run the query and keep only the final 9 rows of the downloaded frame.
# NOTE(review): the whole table is fetched before trimming, and because the query
# has no ORDER BY, which rows count as "last" is not guaranteed.
# Swap .tail(9) for .head(10) to look at the first rows instead.
results = (
    client.query(query)  # submit the query job
    .to_dataframe()      # wait for the job and download every row
    .tail(9)             # keep the final 9 rows
)

In [82]:
# display(results)

In [83]:
# Load the Avro schema used to decode the Pub/Sub payloads further down.
schema_path = 'avro.avsc'
# .avsc files are JSON text. Decode them as UTF-8 explicitly instead of relying
# on the platform's locale encoding, which differs between machines.
with open(schema_path, 'r', encoding='utf-8') as f:
    schema = avro.schema.parse(f.read())

# DatumReader needs the schema to know the structure of the binary records it decodes.
reader = DatumReader(schema)

In [84]:
# NOTE(review): NUM_MESSAGES is not referenced by the cells shown; it matches the
# max_messages=9 of the commented-out synchronous PullRequest experiment.
NUM_MESSAGES = 9
# Fully-qualified name of the subscription the streaming pull listens on.
subscription_path = subscriber.subscription_path(
    project=projectId,
    subscription="subscribe_vertex",
)
print(subscription_path)

projects/grounded-gizmo-394701/subscriptions/subscribe_vertex


In [74]:
# V2
# Shared sink for decoded records: the callback appends one entry per message.
# Re-running this cell throws away everything collected so far.
accumulated_data = list()

In [62]:
# ## callback message V1

# def callback(message):
#     #bytes UI encapsulate the avro buffer data and return memory bytes 
#     bytes_reader = BytesIO(message.data)
#     # get the binary decoder to decode messages
#     decoder = BinaryDecoder(bytes_reader)
#     # the avroreader known as reader from DatumReader object, parses the decoded messages into a python dictionary format
#     message_data = reader.read(decoder)
#     accumulated_data.append(message_data)
#     print(f"Received message: {message_data}")
#     message.ack()

In [75]:
# callback message V2

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one Avro-encoded Pub/Sub message and collect the result.

    Called by the streaming pull for each delivered message. The payload is
    decoded with the module-level ``reader`` (a ``DatumReader``). The decoded
    record is appended to ``accumulated_data`` and the message is acked.

    If decoding fails (e.g. a malformed payload or a schema mismatch), the error
    is printed and the message is nacked so Pub/Sub can redeliver it. The
    exception is not allowed to escape into the client library. A permanently
    malformed message will keep being redelivered unless the subscription has a
    dead-letter policy.

    Args:
        message: The delivered Pub/Sub message; ``message.data`` holds the Avro
            binary payload.
    """
    try:
        # Wrap the raw payload bytes in an in-memory buffer for the binary decoder.
        decoder = BinaryDecoder(BytesIO(message.data))
        # Parse one record into a Python dict according to the loaded schema.
        message_data = reader.read(decoder)
    except Exception as exc:  # decoding failed; report it and let Pub/Sub redeliver
        print(f"Failed to decode message {message.message_id}: {type(exc).__name__}: {exc}")
        message.nack()
        return

    accumulated_data.append(message_data)
    print(f"Received message: {message_data}")
    message.ack()

In [76]:
# V2
# Start the asynchronous streaming pull. `callback` runs for each delivered
# message in the background; the returned future is used below to wait or stop.
streaming_pull_future = subscriber.subscribe(
    subscription=subscription_path,
    callback=callback,
)
print(f"Listening for messages on {subscription_path}...")

Listening for messages on projects/grounded-gizmo-394701/subscriptions/subscribe_vertex...


In [77]:
# V2
# Block on the streaming pull started above, then shut the subscriber down.
# Leaving the `with` block closes `subscriber`, so re-run the client set-up cell
# before subscribing again.
#
# STREAMING_PULL_TIMEOUT: seconds to listen before stopping. None listens until
# the pull fails or the kernel is interrupted.
STREAMING_PULL_TIMEOUT = None

with subscriber:
    try:
        streaming_pull_future.result(timeout=STREAMING_PULL_TIMEOUT)
        print(f"Listening for messages on {subscription_path} Done...")
    # Future.result raises concurrent.futures.TimeoutError. That is only an alias
    # of the builtin TimeoutError from Python 3.11 on, so catch it explicitly.
    except futures.TimeoutError:
        print(f"We stopped the listening process due to the timeout ")
        streaming_pull_future.cancel()  # trigger the shutdown
        streaming_pull_future.result()  # block until the shutdown has completed
    except KeyboardInterrupt:
        print(f"triggered by keyboard interrupt,pull function exited")
        streaming_pull_future.cancel()  # trigger the shutdown
        streaming_pull_future.result()  # block until the shutdown has completed
    except Exception as e:
        # The pull itself ended with an error; the future is already done, so
        # just report what went wrong, including the error text.
        print(f"Listening for messages on {subscription_path} threw an exception: {type(e).__name__}: {e} .\n")


Received message: {'key': 'SUST~166~4567892343~', 'customerId': 56789044, 'firstName': None, 'middleName': None, 'lastName': None, 'fullName': None, 'preferredLanguageCd': 'EN', 'createTs': '2023-08-24 00:00:00 UTC', 'creditValueCd': None, 'typeCd': None, 'subtypeCd': None, 'workPhoneNum': '324567543dfh', 'emailAddressTxt': 123, 'wirelessActiveServiceCnt': 2, 'wirelineActiveServiceCnt': 0, 'delinquencyInd': False, 'primaryAddressProvinceCd': None, 'wirelineOrderSubmitInd': False, 'wirelessServiceOnlyInd': False, 'wirelineServiceOnlyInd': False, 'optOutMarketingSmsInd': False, 'optOutMarketingSmsStartTs': None, 'optOutMarketingEmailInd': False, 'optOutMarketingEmailStartTs': None, 'vipInd': False, 'emergencyInd': False, 'daytimeContactNum': None, 'eveningContactNum': None, 'cellContactNum': None, 'otherCellContactNum': None, 'brandId': 2, 'teamMbrInd': False, 'authNameTxt': None, 'file_name': None, 'create_ts': '2023-08-24 00:00:00 UTC', 'create_user_id': 'etl_cust', 'last_updt_ts': '20

In [79]:
# V2
# Tabulate every record the callback has collected (one row per message, one
# column per record key) and render it with rich display.
dft = pd.DataFrame(data=accumulated_data)
display(dft)

Unnamed: 0,key,customerId,firstName,middleName,lastName,fullName,preferredLanguageCd,createTs,creditValueCd,typeCd,...,file_name,create_ts,create_user_id,last_updt_ts,last_updt_user_id,subscription_name,message_id,publish_time,attributes,data
0,SUST~166~4567892343~,56789044,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,
1,SUST~166~4567892343~,56789043,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,
2,SUST~166~4567892343~,56789046,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,
3,SUST~166~4567892343~,56789047,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,
4,SUST~166~4567892343~,56789049,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,
5,SUST~166~4567892343~,56789050,,,,,EN,2023-08-24 00:00:00 UTC,,,...,,2023-08-24 00:00:00 UTC,etl_cust,2023-08-24 00:00:00 UTC,etl_cust,,,,,


In [109]:
# request = PullRequest(subscription=subscription_path, max_messages=9)
# response = subscriber.pull(request = request)

In [84]:
# messages = []
# count = 0
# for received_message in response.received_messages:
#     print(f"Received message: {received_message.message.data}")
#     count+=1
    
# print(count)