In [1]:
import json
import os
import threading
from concurrent.futures import TimeoutError
from json import dumps
from time import sleep

import pandas as pd
from dotenv import load_dotenv
from google.cloud import pubsub_v1
from google.cloud import storage

In [2]:
# Load settings from a local .env file (if present) into the environment.
load_dotenv()

project_id = os.getenv("project_id")
subscription_id = os.getenv("subscription_id")
bucket_name = os.getenv("bucket_name")

# Fail fast with a readable message: os.getenv returns None for unset
# variables, which would otherwise surface later as an opaque API error.
_missing_settings = [
    name
    for name, value in (
        ("project_id", project_id),
        ("subscription_id", subscription_id),
        ("bucket_name", bucket_name),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# Pub/Sub client and the fully qualified path of the subscription to pull from.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Cloud Storage client and the destination bucket for the received messages.
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)


In [3]:
# Running index used by the Pub/Sub callback to name uploaded objects
# (stock_market_<index>.json); the callback increments it once per message.
# Re-running this cell restarts the numbering at 0.
file_counter = 0

In [None]:
# Guards ``file_counter``. The subscriber client may invoke ``callback`` from
# several worker threads at once, so the read-and-increment must be atomic or
# two messages can end up with the same object name.
_file_counter_lock = threading.Lock()


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Store one Pub/Sub message as a JSON object in the bucket, then ack it.

    The payload is decoded as UTF-8 JSON and re-serialized before upload, so a
    payload that is not valid JSON raises and the message is left un-acked.

    Args:
        message: The received Pub/Sub message; ``message.data`` holds the
            raw payload bytes.

    Side effects:
        Uploads ``stock_market_<index>.json`` to the module-level ``bucket``,
        prints the message, acknowledges it, and advances the module-level
        ``file_counter``.
    """
    global file_counter

    # Parse first so a malformed payload does not consume an index.
    message_data = json.loads(message.data.decode('utf-8'))

    # Reserve a unique index under the lock. If the upload below fails, the
    # index is skipped (a gap in numbering) rather than reused.
    with _file_counter_lock:
        file_index = file_counter
        file_counter += 1
    file_name = f"stock_market_{file_index}.json"

    blob = bucket.blob(file_name)
    blob.upload_from_string(data=json.dumps(message_data), content_type="application/json")
    print(f"Received {message}.")
    # Ack only after the upload succeeded, so failed uploads get redelivered.
    message.ack()

# Start the streaming pull; received messages are handed to ``callback``.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# The ``with`` block closes the subscriber client when listening ends.
with subscriber:
    try:
        # No timeout is given, so this blocks until the stream fails or the
        # cell is interrupted.
        streaming_pull_future.result()
    except (TimeoutError, KeyboardInterrupt):
        # KeyboardInterrupt covers "Interrupt kernel" in the notebook, which is
        # the normal way to stop this cell; shut the stream down cleanly
        # instead of abandoning it mid-flight.
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Listening for messages on projects/stockmarketrealtimedataanalysi/subscriptions/stockMarketSubscription..

Received Message {
  data: b'{"Index": "N225", "Date": "1987-06-05", "Open": 25...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "NYA", "Date": "1978-08-28", "Open": 620...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "N100", "Date": "2016-02-09", "Open": 80...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "N225", "Date": "1997-04-03", "Open": 18...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "399001.SZ", "Date": "2010-08-03", "Open...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "399001.SZ", "Date": "2006-09-11", "Open...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"Index": "GSPTSE", "Date": "1997-12-19", "Open": ...'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'{"I