In [1]:
import blpapi as bp
from pprint import pprint,pformat
from datetime import datetime
import logging as pylogging
import threading
from typing import Dict, List, Any, Optional
import json
import os
import sys
import time


In [2]:
pylogging.basicConfig(
    level=pylogging.INFO,
    format='[%(asctime)s] - %(process)s {%(pathname)s:%(lineno)d} - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        pylogging.FileHandler('logs/BloombergEventHandler.log',mode='w')
    ]
)
LOG = pylogging.getLogger(__name__)

In [3]:
class BloombergEventHandler:
    _instance: Optional['BloombergEventHandler'] = None
    


    def __new__(cls, config_path: str) -> 'BloombergEventHandler':
        if cls._instance is None:
            LOG.info("Creating new BloombergEventHandler instance")
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
            return cls._instance

    def __init__(self, config_path: str):
        if self._initialized:
            LOG.debug("BloombergEventHandler already initialized, skipping initialization")
            return
            
        LOG.info(f"Initializing BloombergEventHandler with config: {config_path}")

        self.lock = threading.Lock()
        
        self._session = None
        self._sessionOptions = None
        self._subscription_list = bp.SubscriptionList()

        try:
            # Load Bloomberg API config
            with open(config_path, 'r') as f:
                self._config = json.load(f)
            LOG.debug(f"Loaded Bloomberg configuration: {pformat(self._config)}")

            self._sessionOptions = bp.SessionOptions()
            for i, host in enumerate(self._config['hosts']):
                self._sessionOptions.setServerAddress(host['addr'], host['port'], i)
            
            if 'appname' not in self._config or not self._config['appname']:
                raise ValueError("ApplicationName is required in the configuration")
            
            authOpts = bp.AuthOptions.createWithApp(appName=self._config['appname'])
            self._sessionOptions.setSessionIdentityOptions(authOpts)

            if "tlsInfo" in self._config:
                tlsInfo = self._config["tlsInfo"]
                pk12Blob = None
                pk7Blob = None
                with open(tlsInfo['pk12path'], 'rb') as pk12File:
                    pk12Blob = pk12File.read()
                with open(tlsInfo['pk7path'], 'rb') as pk7File:
                    pk7Blob = pk7File.read()

                self._sessionOptions.setTlsOptions(bp.TlsOptions.createFromBlobs(pk12Blob, tlsInfo['password'], pk7Blob))

            self._session = bp.Session(self._sessionOptions, self.eventHandler)

            self._initialized = True
            LOG.info("SubscriptionHandler initialization completed")

        except Exception as e:
            LOG.error(f"Error initializing SubscriptionHandler: {str(e)}", exc_info=True)
            raise

    def eventHandler(self, event: bp.Event, session: bp.Session) -> None:
        try:
            event_type = event.eventType()
            
            for msg in event:
                LOG.info(f"Received message: {msg}")
        
        except Exception as e:
            LOG.error(f"Error in eventHandler: {str(e)}", exc_info=True)


    def start(self) -> None:
        LOG.info("Starting SubscriptionHandler session")
        self._session.startAsync()

    def stop(self) -> None:
        LOG.info("Stopping SubscriptionHandler")
        try:
            self._session.stopAsync()
        except Exception as e:
            LOG.error(f"Error stopping SubscriptionHandler: {str(e)}", exc_info=True)


In [None]:
bpipeHandler = BloombergEventHandler("config/bpipe_config.local.json")
bpipeHandler.start()

In [5]:
bpipeHandler.stop()

In [7]:
bpipeHandler.stop()