# Consumer Service

In [0]:
import os

secrets_scope = "<your-azure-key-vault-scope>"

# Service Principal Credentials
AZURE_TENANT_ID = dbutils.secrets.get(scope=secrets_scope, key="AZURE-TENANT-ID")
AZURE_CLIENT_ID = dbutils.secrets.get(scope=secrets_scope, key="AZURE-CLIENT-ID")
AZURE_CLIENT_SECRET = dbutils.secrets.get(scope=secrets_scope, key="AZURE-CLIENT-SECRET")

# Azure OpenAI
AZURE_OPENAI_API_KEY = dbutils.secrets.get(scope=secrets_scope, key="AZURE-OPENAI-API-KEY")
AZURE_OPENAI_ENDPOINT = dbutils.secrets.get(scope=secrets_scope, key="AZURE-OPENAI-ENDPOINT")
AZURE_OPENAI_PTU_DEPLOYMENT_NAME = dbutils.secrets.get(scope=secrets_scope, key="AZURE-OPENAI-PTU-DEPLOYMENT-NAME")
AZURE_OPENAI_RESOURCE_ID = dbutils.secrets.get(scope=secrets_scope, key="AZURE-OPENAI-RESOURCE-ID")

# CosmosDB
COSMOSDB_CONTAINER = dbutils.secrets.get(scope=secrets_scope, key="COSMOSDB-CONTAINER")
COSMOSDB_DATABASE = dbutils.secrets.get(scope=secrets_scope, key="COSMOSDB-DATABASE")
COSMOSDB_ENDPOINT = dbutils.secrets.get(scope=secrets_scope, key="COSMOSDB-ENDPOINT")
COSMOSDB_KEY = dbutils.secrets.get(scope=secrets_scope, key="COSMOSDB-KEY")

# EventHub
EVENTHUB_CONNECTION_STR = dbutils.secrets.get(scope=secrets_scope, key="EVENTHUB-CONNECTION-STR")
EVENTHUB_NAME = dbutils.secrets.get(scope=secrets_scope, key="EVENTHUB-NAME")
STORAGE_ACCOUNT_CHECKPOINT_STORE = dbutils.secrets.get(scope=secrets_scope, key="STORAGE-ACCOUNT-CHECKPOINT-STORE")
STORAGE_ACCOUNT_CHECKPOINT_STORE_CONTAINER = dbutils.secrets.get(scope=secrets_scope, key="STORAGE-ACCOUNT-CHECKPOINT-STORE-CONTAINER")

# Define the cluster environment variables
os.environ["AZURE_TENANT_ID"] = AZURE_TENANT_ID
os.environ["AZURE_CLIENT_ID"] = AZURE_CLIENT_ID
os.environ["AZURE_CLIENT_SECRET"] = AZURE_CLIENT_SECRET
os.environ["AZURE_OPENAI_API_KEY"] = AZURE_OPENAI_API_KEY
os.environ["AZURE_OPENAI_ENDPOINT"] = AZURE_OPENAI_ENDPOINT
os.environ["AZURE_OPENAI_PTU_DEPLOYMENT_NAME"] = AZURE_OPENAI_PTU_DEPLOYMENT_NAME
os.environ["AZURE_OPENAI_RESOURCE_ID"] = AZURE_OPENAI_RESOURCE_ID
os.environ["COSMOSDB_CONTAINER"] = COSMOSDB_CONTAINER
os.environ["COSMOSDB_DATABASE"] = COSMOSDB_DATABASE
os.environ["COSMOSDB_ENDPOINT"] = COSMOSDB_ENDPOINT
os.environ["COSMOSDB_KEY"] = COSMOSDB_KEY
os.environ["EVENTHUB_CONNECTION_STR"] = EVENTHUB_CONNECTION_STR
os.environ["EVENTHUB_NAME"] = EVENTHUB_NAME
os.environ["STORAGE_ACCOUNT_CHECKPOINT_STORE"] = STORAGE_ACCOUNT_CHECKPOINT_STORE
os.environ["STORAGE_ACCOUNT_CHECKPOINT_STORE_CONTAINER"] = STORAGE_ACCOUNT_CHECKPOINT_STORE_CONTAINER


In [0]:
import logging
from app.src.consumer import Consumer
from app.config import settings
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import asyncio
import nest_asyncio

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

checkpoint_store = BlobCheckpointStore.from_connection_string(
    os.environ["STORAGE_ACCOUNT_CHECKPOINT_STORE"],
    os.environ["STORAGE_ACCOUNT_CHECKPOINT_STORE_CONTAINER"]
)

async def main():
    consumer = Consumer(checkpoint_store=checkpoint_store)
    logger.info("Consumer initialized. Starting event consumption loop...")
    while True:
        processed_count = await consumer.consume_event()
        logger.info(f"Events processed in this cycle: {processed_count}")
        await asyncio.sleep(1)  # Adjust sleep as needed

if __name__ == "__main__":
    nest_asyncio.apply()
    asyncio.run(main())