In [12]:
import os
import requests
from confluent_kafka import Producer
import time
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient


In [13]:

KEY_VAULT_NAME = "databkeys"

KY_URI =f'https://databkeys.vault.azure.net/'

In [14]:
credential = DefaultAzureCredential()


In [15]:
secret_client = SecretClient(vault_url=KY_URI, credential=credential)


In [16]:
azure_key_vault_vals = ['FINNHUB','Eventhub']

In [20]:
FINNHUB_API_KEY,CONNECTION_STRING = [secret_client.get_secret(val).value for val in azure_key_vault_vals]

In [21]:
print(FINNHUB_API_KEY)

ct9kfs1r01quh43o9jl0ct9kfs1r01quh43o9jlg


In [22]:
EVENTHUB_NAMESPACE = "eh-namespace-stock-data.servicebus.windows.net:9093"
EVENTHUB_TOPIC = "eh-stock-data"  # Replace with your Event Hub name
SYMBOL = "AAPL"  # Stock symbol to track
INTERVAL = 30

In [23]:
conf = {
    'bootstrap.servers': EVENTHUB_NAMESPACE,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': CONNECTION_STRING,
    'client.id': 'stock-data-producer'
}

In [24]:
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [partition {msg.partition()}]")

In [25]:
producer = Producer(conf)


In [26]:
producer.produce('eh-stock-data', value="This is sample message!", callback=delivery_report)
producer.flush(10)  # This ensures the message is actually sent and the callback is triggered


Message delivered to eh-stock-data [partition 0]


0

In [27]:
def fetch_stock_data(symbol):
    """Fetch current stock quote data from Finnhub."""
    url = f"https://finnhub.io/api/v1/quote?symbol={symbol}&token={FINNHUB_API_KEY}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Failed to fetch data for {symbol}. Status: {response.status_code}, Response: {response.text}")
        return None

In [28]:
def main():
    print(f"Starting stock data streaming for {SYMBOL} every {INTERVAL} seconds.")
    print("Press Ctrl+C to stop.")

    try:
        while True:
            data = fetch_stock_data(SYMBOL)
            if data:
                # Convert the JSON data to string for sending
                message_value = str(data)
                print(f"Fetched data for {SYMBOL}: {message_value}")

                # Produce the message to the Event Hub (as a Kafka topic)
                producer.produce(EVENTHUB_TOPIC, value=message_value, callback=delivery_report)
                
                # Ensure all messages are sent before next iteration
                producer.flush(10)
            
            # Wait for the next iteration
            time.sleep(INTERVAL)

    except KeyboardInterrupt:
        print("Streaming interrupted by user. Exiting...")
    finally:
        # Flush any remaining messages
        producer.flush(10)
        print("Stopped streaming.")

In [29]:
if __name__ == "__main__":
    main()

Starting stock data streaming for AAPL every 30 seconds.
Press Ctrl+C to stop.
Fetched data for AAPL: {'c': 242.84, 'd': -0.2, 'dp': -0.0823, 'h': 244.63, 'l': 242.08, 'o': 242.905, 'pc': 243.04, 't': 1733605200}
Message delivered to eh-stock-data [partition 0]
Fetched data for AAPL: {'c': 242.84, 'd': -0.2, 'dp': -0.0823, 'h': 244.63, 'l': 242.08, 'o': 242.905, 'pc': 243.04, 't': 1733605200}
Message delivered to eh-stock-data [partition 0]
Streaming interrupted by user. Exiting...
Stopped streaming.
