## Reading Bloomberg Data
We'll be reading market data and company information from Bloomberg Data License. For that purpose, we created a wrapper on top of the Data License API (see the `bloomberg` package).

In [0]:
%pip install -r requirements.txt
dbutils.library.restartPython()

### Read portfolio

In [0]:
lakehouse_catalog = 'financial_services'
lakehouse_database = 'investment_analytics'
bloomberg_entity_table = 'bloomberg_entity'
bloomberg_history_table = 'bloomberg_prices'

In [0]:
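import json

# Investment portfolio: a list of positions, each with a `ticker` key
with open('portfolio.json', 'r') as f:
    portfolio = json.load(f)

# Bloomberg Data License API credentials
with open('bloomberg_credentials.json', 'r') as f:
    credentials = json.load(f)

# Bloomberg Data License catalog identifier (distinct from the Unity Catalog `lakehouse_catalog` above)
catalog = '40372'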

In [0]:
def build_symbol(symbol):
    return {
        "@type": "Identifier",
        "identifierValue": symbol,
        "identifierType": "TICKER"
    }

symbols = [build_symbol(rec['ticker']) for rec in portfolio]
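`build_symbol` only reads the `ticker` key of each portfolio record. The sketch below runs the same comprehension on a hypothetical two-position portfolio to show the entries that end up in the `universe` section of the request payloads. The `AAPL US Equity` / `MSFT US Equity` tickers and the `quantity` field are illustrative assumptions, not the actual content of `portfolio.json`, and the `sample_` prefixes avoid overwriting the notebook's real variables.

```python
import json

def build_symbol(symbol):
    # Same shape as the notebook helper: one Data License identifier per ticker
    return {
        "@type": "Identifier",
        "identifierValue": symbol,
        "identifierType": "TICKER",
    }

# Hypothetical portfolio.json content; only the "ticker" key is read
sample_portfolio = json.loads("""
[
  {"ticker": "AAPL US Equity", "quantity": 100},
  {"ticker": "MSFT US Equity", "quantity": 50}
]
""")

sample_symbols = [build_symbol(rec["ticker"]) for rec in sample_portfolio]
sample_universe = {"@type": "Universe", "contains": sample_symbols}
print(json.dumps(sample_universe, indent=2))
```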

In [0]:
import os

# Landing folder for raw Bloomberg files (assumes the `landing` volume already exists)
output_dir = f'/Volumes/{lakehouse_catalog}/{lakehouse_database}/landing/bloomberg'
os.makedirs(output_dir, exist_ok=True)

### Download data from Bloomberg Data License

`TODO`: Make sure to whitelist the serverless compute IP range on the Bloomberg `DATA<GO>` portal.

In [0]:
import uuid
from dateutil.relativedelta import relativedelta
from bloomberg.service import DataLicenseSvc, generate_unique_identifier

service = DataLicenseSvc(credentials)

#### Entity Request

In [0]:
request_name = "EntityRequest" + str(uuid.uuid1())[:6]
request_payload = {
    '@type': 'EntityRequest',
    'identifier': generate_unique_identifier(),
    'name': request_name,
    'description': 'Some description',
    'universe': {
        '@type': 'Universe',
        'contains': symbols
    },
    'fieldList': {
        '@type': 'EntityFieldList',
        'contains': [
            {'mnemonic': 'ID_BB_COMPANY'},
            {'mnemonic': 'LEGAL_ENTITY_IDENTIFIER'},
            {'mnemonic': 'LONG_COMP_NAME'},
            {'mnemonic': 'CIE_DES'},
            {'mnemonic': 'CNTRY_OF_DOMICILE'},
            {'mnemonic': 'INDUSTRY_GROUP'},
            {'mnemonic': 'INDUSTRY_SUBGROUP'},
            {'mnemonic': 'BS_SH_OUT'}
        ],
    },
    'trigger': {
        "@type": "SubmitTrigger",
    },
    'formatting': {
        '@type': 'MediaType',
        'outputMediaType': 'application/json',
    },
}

entityRequestIdentifier = service.submit_data_request(catalog=catalog, json_request=request_payload)
entityRequestFile = service.await_response(identifier=entityRequestIdentifier)
service.download_file(bloomberg_file=entityRequestFile, output_dir=output_dir)
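The same submit, await, and download sequence is repeated for every request in this notebook. Below is a minimal sketch of a helper that factors it out, assuming only the three `DataLicenseSvc` methods and keyword arguments used above. `run_request` and the `_StubService` test double are hypothetical; the stub exists only so the call order can be exercised without Bloomberg access.

```python
def run_request(service, catalog, payload, output_dir):
    """Submit a Data License request, wait for it, and download the result.

    Hypothetical wrapper around the three DataLicenseSvc calls used in this
    notebook; it assumes the same method names and keyword arguments.
    """
    identifier = service.submit_data_request(catalog=catalog, json_request=payload)
    response_file = service.await_response(identifier=identifier)
    service.download_file(bloomberg_file=response_file, output_dir=output_dir)
    return response_file


class _StubService:
    """Stand-in for DataLicenseSvc that records calls instead of hitting Bloomberg."""

    def __init__(self):
        self.calls = []

    def submit_data_request(self, catalog, json_request):
        self.calls.append("submit")
        return "request-id"

    def await_response(self, identifier):
        self.calls.append("await")
        return {"identifier": identifier}

    def download_file(self, bloomberg_file, output_dir):
        self.calls.append("download")


stub = _StubService()
result = run_request(stub, "12345", {"@type": "EntityRequest"}, "/tmp/landing")
print(stub.calls)  # ['submit', 'await', 'download']
```

With the real service, the entity cell would reduce to `entityRequestFile = run_request(service, catalog, request_payload, output_dir)`.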

In [0]:
from pyspark.sql import functions as F

@F.udf('string')
def get_ticker(identifier):
    # This example portfolio holds US equities only, so every ticker gets a '-US' suffix
    if identifier is None:
        return None
    return '{}-US'.format(identifier.split(' ')[0])
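Here is a plain-Python version of the same normalization that can be tested without Spark. It includes a `None` guard so a missing identifier yields a null `TICKER` rather than a failed task. It assumes identifiers look like `AAPL US Equity` (ticker, exchange code, yellow key), which is what the `split(' ')[0]` logic implies but the source does not confirm. `normalize_ticker` and its `suffix` parameter are hypothetical.

```python
from typing import Optional


def normalize_ticker(identifier: Optional[str], suffix: str = "US") -> Optional[str]:
    """Turn a Bloomberg identifier such as 'AAPL US Equity' into 'AAPL-US'.

    Mirrors the get_ticker UDF: keep the first space-separated token and
    append a market suffix. Returns None for a missing identifier.
    """
    if identifier is None:
        return None
    root = identifier.split(" ")[0]
    return f"{root}-{suffix}"


print(normalize_ticker("AAPL US Equity"))  # AAPL-US
print(normalize_ticker(None))              # None
```

If the Python UDF overhead matters, the same result can be expressed with built-in column functions, e.g. `F.concat(F.split(F.col('IDENTIFIER'), ' ').getItem(0), F.lit('-US'))`, which also returns null for a null identifier.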

In [0]:
df = (
    spark.read.format('json')
    .load(entityRequestFile.output_dir)
    .withColumn('TICKER', get_ticker(F.col('IDENTIFIER')))
    .drop('IDENTIFIER')
)
display(df)

In [0]:
_ = sql(f"""
  CREATE TABLE IF NOT EXISTS {lakehouse_catalog}.{lakehouse_database}.{bloomberg_entity_table} (
    BS_SH_OUT DOUBLE COMMENT 'Number of shares outstanding as reported on the company\\'s balance sheet.',
    CIE_DES STRING COMMENT 'Bloomberg description of the company\\'s business.',
    CNTRY_OF_DOMICILE STRING COMMENT 'Country where the entity is legally domiciled, which can affect regulatory and tax considerations.',
    DL_REQUEST_ID STRING COMMENT 'Identifier of the Bloomberg Data License request that produced the record, useful for tracking data ingestion.',
    DL_REQUEST_NAME STRING COMMENT 'Name of the Bloomberg Data License request that produced the record.',
    DL_SNAPSHOT_START_TIME STRING COMMENT 'Start time of the Data License snapshot, which gives the temporal context of the data.',
    DL_SNAPSHOT_TZ STRING COMMENT 'Time zone of the snapshot start time.',
    ID_BB_COMPANY BIGINT COMMENT 'Bloomberg company identifier, useful for linking related data across tables.',
    INDUSTRY_GROUP STRING COMMENT 'Broad industry group of the company, useful for market analysis and benchmarking.',
    INDUSTRY_SUBGROUP STRING COMMENT 'More granular industry subgroup within the industry group.',
    LEGAL_ENTITY_IDENTIFIER STRING COMMENT 'ISO 17442 Legal Entity Identifier (LEI), used for compliance and regulatory reporting.',
    LONG_COMP_NAME STRING COMMENT 'Full name of the company.',
    RC BIGINT COMMENT 'Bloomberg Data License return code for the requested security.',
    TICKER STRING COMMENT 'Ticker derived from the Bloomberg identifier in the SYMBOL-US form (e.g. AAPL-US).'
  )
  USING delta
  COMMENT 'The table contains company reference data retrieved from Bloomberg Data License, including the Bloomberg company identifier, legal entity identifier, industry classification, country of domicile, and shares outstanding. Possible use cases include analyzing company fundamentals, comparing industry trends, and joining with the price history table on TICKER.'
  """)

In [0]:
df.write.mode('overwrite').saveAsTable(f'{lakehouse_catalog}.{lakehouse_database}.{bloomberg_entity_table}')
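The two `CREATE TABLE` statements in this notebook share the same shape, and both need single quotes in comments escaped. Below is a minimal sketch of a hypothetical `build_create_table` helper that generates that DDL from a column dictionary. It assumes Spark SQL's backslash escaping inside single-quoted literals, which is what the `\\'` sequences in the cell above rely on.

```python
def build_create_table(table_name, columns, table_comment):
    """Build a CREATE TABLE IF NOT EXISTS statement for a Delta table.

    `columns` maps column name -> (SQL type, comment). Backslashes and single
    quotes in comments are backslash-escaped for Spark SQL string literals.
    """
    def quote(text):
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    column_lines = ",\n".join(
        f"    {name} {sql_type} COMMENT {quote(comment)}"
        for name, (sql_type, comment) in columns.items()
    )
    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} (\n"
        f"{column_lines}\n"
        ")\n"
        "USING delta\n"
        f"COMMENT {quote(table_comment)}"
    )


ddl = build_create_table(
    "financial_services.investment_analytics.bloomberg_entity",
    {
        "LONG_COMP_NAME": ("STRING", "The company's full name."),
        "TICKER": ("STRING", "Ticker derived from the Bloomberg identifier."),
    },
    "Company reference data from Bloomberg Data License.",
)
print(ddl)
```

In the notebook, the generated statement would be passed to `sql(ddl)`, the same way the inline statements are executed.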

#### History Request

In [0]:
import datetime

request_name = "HistoryRequest" + str(uuid.uuid1())[:6]

# Request the previous calendar month: from the 1st of last month to the 1st of this month
today = datetime.date.today()
first = today.replace(day=1)
last_month = first - relativedelta(months=1)
request_payload = {
    '@type': 'HistoryRequest',
    'identifier': generate_unique_identifier(),
    'name': request_name,
    'description': 'My favorite history request',
    'universe': {
        '@type': 'Universe',
        'contains': symbols,
    },
    'fieldList': {
        '@type': 'HistoryFieldList',
        'contains': [
            # Returns the ask price received from the current pricing source. The ask price will always come from the date and time in Time Date of Last Update (PR024, LAST_UPDATE)/Date Of Last Update (PR371, LAST_UPDATE_DT). If there is no ask at that time but there was a valid bid then NA will be returned. If there is no bid and no ask at that time then the value will match Last Price (PR005, PX_LAST).
            {'mnemonic': 'PX_ASK'},
            # Returns the bid price received from the current pricing source. The bid price will always come from the date and time in Time Date of Last Update (PR024, LAST_UPDATE)/Date Of Last Update (PR371, LAST_UPDATE_DT). If there is no bid at that time but there was a valid ask then NA will be returned. If there is no bid and no ask at that time then the value will match Last Price (PR005, PX_LAST).
            {'mnemonic': 'PX_BID'},
            # Total number of shares traded on a security on the current day. If the security has not traded, then it is the total number of shares from the last day the security traded. If an exchange sends official closing price without a volume, the return will be '0'. If no closing price data is sent by the exchange, the return will reflect the last data received from the exchange. The pricing source in use must be set up to show volume, otherwise the field will return a blank. Expressed in units.
            {'mnemonic': 'PX_VOLUME'},
            # Price at which the security first traded on the current day. If the market is closed, it is the first price of the last day the market was open.
            {'mnemonic': 'PX_OPEN'},
            # Highest price the security reached during the current trading day. If the market is closed then it is the highest price the security reached on the last day the market was open.
            {'mnemonic': 'PX_HIGH'},
            # Lowest price the security reached during the current trading day. If the market is closed then it is the lowest price the security reached on the last day the market was open.
            {'mnemonic': 'PX_LOW'},
            # Returns the last price provided by the exchange. For securities that trade Monday through Friday, this field will be populated only if such information has been provided by the exchange in the past 30 trading days. For initial public offerings (IPO), the day before the first actual trading day may return the IPO price. For all other securities, this field will be populated only if such information was provided by the exchange in the last 30 calendar days. This applies to common stocks, receipts, warrants, and real estate investment trusts (REITs).
            {'mnemonic': 'PX_LAST'},
        ],
    },
    'trigger': {
        '@type': 'SubmitTrigger'
    },
    'runtimeOptions': {
        '@type': 'HistoryRuntimeOptions',
        'dateRange': {
            '@type': 'IntervalDateRange',
            'startDate': last_month.strftime("%Y-%m-%d"),
            'endDate': first.strftime("%Y-%m-%d")
        },
    },
    'formatting': {
        '@type': 'MediaType',
        'outputMediaType': 'application/json',
    },
}

historyRequestIdentifier = service.submit_data_request(catalog=catalog, json_request=request_payload)
historyRequestFile = service.await_response(identifier=historyRequestIdentifier)
service.download_file(bloomberg_file=historyRequestFile, output_dir=output_dir)
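The history request covers the previous calendar month. As a sketch, the same bounds can be computed with the standard library alone. This is an alternative to `relativedelta`, not what the cell above uses: step back one day from the 1st of the current month to land in the previous month, then snap to its 1st. `previous_month_range` is a hypothetical helper, and the fixed dates make the output reproducible.

```python
import datetime


def previous_month_range(today):
    """Return (first day of previous month, first day of current month)."""
    first = today.replace(day=1)
    # The day before the 1st is always in the previous month; snap it to day 1
    last_month = (first - datetime.timedelta(days=1)).replace(day=1)
    return last_month, first


start, end = previous_month_range(datetime.date(2024, 3, 15))
print(start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"))  # 2024-02-01 2024-03-01

# January rolls back into December of the previous year
print(previous_month_range(datetime.date(2024, 1, 10)))
```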

In [0]:
df = (
    spark.read.format('json')
    .load(historyRequestFile.output_dir)
    .withColumn('TICKER', get_ticker(F.col('IDENTIFIER')))
    .drop('IDENTIFIER')
)
display(df)

In [0]:
_ = sql(f"""
  CREATE TABLE IF NOT EXISTS {lakehouse_catalog}.{lakehouse_database}.{bloomberg_history_table} (
    DATE STRING COMMENT 'Pricing date of the record.',
    DL_REQUEST_ID STRING COMMENT 'Identifier of the Bloomberg Data License request that produced the record, useful for tracking data ingestion.',
    DL_REQUEST_NAME STRING COMMENT 'Name of the Bloomberg Data License request that produced the record.',
    DL_SNAPSHOT_START_TIME STRING COMMENT 'Start time of the Data License snapshot, which gives the temporal context of the data.',
    DL_SNAPSHOT_TZ STRING COMMENT 'Time zone of the snapshot start time.',
    PX_ASK DOUBLE COMMENT 'Ask price: the lowest price a seller is willing to accept.',
    PX_BID DOUBLE COMMENT 'Bid price: the highest price a buyer is willing to pay.',
    PX_HIGH DOUBLE COMMENT 'Highest price reached by the security on the given DATE.',
    PX_LAST DOUBLE COMMENT 'Last price of the security on the given DATE, as provided by the exchange.',
    PX_LOW DOUBLE COMMENT 'Lowest price reached by the security on the given DATE, useful for understanding volatility.',
    PX_OPEN DOUBLE COMMENT 'Price at which the security first traded on the given DATE.',
    PX_VOLUME BIGINT COMMENT 'Total number of shares traded on the given DATE, indicating market activity and liquidity.',
    RC BIGINT COMMENT 'Bloomberg Data License return code for the requested security.',
    TICKER STRING COMMENT 'Ticker derived from the Bloomberg identifier in the SYMBOL-US form (e.g. AAPL-US).'
  )
  USING delta
  COMMENT 'The table contains price and volume history for the portfolio securities, retrieved from Bloomberg Data License. It includes the pricing date, request identifiers, and open, high, low, last, ask, and bid prices. This data can be used for analyzing market trends, evaluating security performance, and comparing prices over time.'
  """)

In [0]:
df.write.mode('overwrite').saveAsTable(f'{lakehouse_catalog}.{lakehouse_database}.{bloomberg_history_table}')

## Create a Genie space
This is a simplistic example built on a very small subset of data, but it is a good exercise to showcase Genie being used as a tool in a multi-agent system. Unfortunately, MCP servers are currently only available in the Playground and are not accessible from an agent notebook (hence Databricks Apps). Instead, we use the Genie API, which we wrap behind a LangChain `@tool` decorator.