In [1]:
import datetime as dt
import json
import pandas as pd # type: ignore
import numpy as np # type: ignore
import re
import os
import time
import sys
import logging
from google.cloud import bigquery
import warnings

sys.path.append(os.environ["HOME"]+"/trading/python/")
from lib import aplo # type: ignore
from lib import talos_utils # type: ignore
from lib import vault_utils # type: ignore
from lib import gsu # type: ignore
from lib import utils # type: ignore
from lib import keys_utils as keys
# from perps_monitor import main as perps_monitor_main # type: ignore

sys.path.append(os.environ["HOME"] + "/anchorage/source/python/lib/quant_lib/anchoragequantlib")
#import google_sheet_utility as aql_google_sheet_utility # type: ignore
#import utils as aql_utils # type: ignore


#logging.getLogger().setLevel(logging.INFO)
pd.set_option('display.max_rows', 50)
pd.set_option('display.max_columns', None)
pd.options.display.float_format = '{:,.8f}'.format
warnings.filterwarnings("ignore", category=UserWarning)

In [2]:
os.environ["GCP_PROJECT_VAR"] = ""
os.environ["GOOGLE_SHEET_KEY"] = "projects/698304263300/secrets/trading_gsheet_auth_token/versions/1"

# os.environ["OKX_APIKEY"] = keys.okx_bp_read_api_key
# os.environ["OKX_SECRET"] = keys.okx_bp_read_secret
# os.environ["OKX_PASSPHRASE"] = keys.okx_bp_read_passphrase

# os.environ["BYBIT_APIKEY"] = keys.bybit_read_api_key
# os.environ["BYBIT_SECRET"] = keys.bybit_read_api_secret


talos_wl_api = keys.talos_whitelabel_api_key()
talos_wl_secret_api = keys.talos_whitelabel_api_secret()
host_wl = keys.talos_whitelabel_host()

talos_api = keys.talos_principal_api_key()
talos_secret = keys.talos_principal_api_secret()
host = keys.talos_principal_host()

api_key_str = keys.ads_api_key()
signing_key_str = keys.ads_signing_key()

aplo_key = keys.aplo_read_key()

In [3]:
# Set this to True if you want to see the logs when running the code
debug = True
if debug:
    logging.basicConfig(
        format="%(levelname)s (%(asctime)s): %(message)s (Line:%(lineno)d in %(funcName)s, %(filename)s))",
        datefmt="%Y/%m/%d %I:%M:%S %p",
        level=logging.INFO,
    )

In [4]:
talos = talos_utils.Talos(talos_api, talos_secret, host)
talos_wl = talos_utils.Talos(talos_wl_api, talos_wl_secret_api, host_wl)
avu = vault_utils.AnchorageVaultUtility(
    api_key_str,
    signing_key_str
)
aplo_client = aplo.AploClient(aplo_key, debug=debug)

google_sheet_key = utils.read_secret(os.environ.get("GOOGLE_SHEET_KEY"))
gsheet_key = json.loads(google_sheet_key)
worksheet_name = "A1 Risk Dashboard"
trading_worksheet_name = "A1 - Trading"
gsu_risk_dash = gsu.GoogleSheetUtility(gsheet_key, worksheet_name)
gsu_trading = gsu.GoogleSheetUtility(gsheet_key, trading_worksheet_name)
name = re.search(r'/[^/]+/([^_]+)_', os.environ["HOME"]).group(1).capitalize()
n_of_days = 7

trade_sheet_names = {
    "counterparty": "Counterparty Trades",
    "dealer": "Dealer Trades",
    "exchange": "Exchange Trades",
    "perp": "Perps Trades",
}

market_translation = {
    "b2c2/b-2-c-2-a-1": {"type": "Dealer", "full_name": "B2C2 Overseas, Ltd."},
    "wintermute/wintermute-trading-rfq": {
        "type": "Dealer",
        "full_name": "Wintermute Trading LTD - API",
    },
    "okex/okx": {"type": "Exchange", "full_name": "OKX (Aux Cayes FinTech Co. Ltd.)"},
    "kraken/kraken-a-1": {
        "type": "Exchange",
        "full_name": "Kraken (Payward Trading Ltd.)",
    },
    "coinbase/coinbase-hrp": {"type": "Exchange", "full_name": "HRP Coinbase"},
    "bybit/bybit-a-1": {
        "type": "Exchange",
        "full_name": "Bybit (Bybit Fintech Limited)",
    },
    "gate_io/gate-io-a-1": {"type": "exchange", "full_name": "Gate.io"},
    "galaxy/galaxy-trading-rfq-a-1": {
        "type": "Dealer",
        "full_name": "Galaxy Trading Asia Limited",
    },
    "cumberland/cumberland-trading-rfq": {
        "type": "Dealer",
        "full_name": "Cumberland International Trading Ltd.",
    },
    "fireblocks": {"type": "Custodian", "full_name": "A1 Fireblocks"},
    "nonco/nonco-a-1": {
        "type": "Dealer",
        "full_name": "Nonco",
    },
    "ads": {"type": "Custodian", "full_name": "A1 ADS"},
    "aplo": {"type": "Aggregator", "full_name": "APLO"},
    "coinbase/coinbase-hold": {
        "type": "Hold",
        "full_name": "Coinbase Hold",
    },
    "cumberland/cumberland-_trading-&-rfq_": {
        "type": "Hold",
        "full_name": "Cumberland Hold",
    },
    "wintermute/wintermute-_trading-&-rfq_": {
        "type": "Hold",
        "full_name": "Wintermute Hold",
    },
    "janestreet/jane-street-_trading-&-rfq_": {
        "type": "Hold",
        "full_name": "Jane Street Hold",
    },
}

INFO (2025/03/12 03:57:27 PM): Debugging initialized for AploClient (Line:29 in _setup_debugging, aplo.py))


In [5]:
def get_fireblocks_balances():

    client = bigquery.Client()

    query = """
    WITH LatestEntries AS (
    SELECT 
        ID,
        MAX(created_at) AS latest_created_at
    FROM 
        `production-191601.brokerage.fireblocks_vault_balances`
    GROUP BY 
        ID
    )

    SELECT 
    a.ID,
    a.created_at,
    a.available
    FROM 
    `production-191601.brokerage.fireblocks_vault_balances` a
    JOIN 
    LatestEntries b ON a.ID = b.ID AND a.created_at = b.latest_created_at;
    """

    query_job = client.query(query)
    df = query_job.result().to_dataframe()

    return df

def get_aplo_balances():
    aplo_balances = pd.DataFrame(aplo_client.get_balances())
    aplo_balances = aplo_balances[["assetId", "quantity","notional"]]
    return aplo_balances

def get_ads_balances():    
    data = avu.query_all_vaults()
    df = pd.json_normalize(data, record_path=['assets'])
    df = df[["assetType", "totalBalance.quantity"]]   

    return df

def get_open_orders_wl():
    talos_counterparties_wl = talos_wl.get_orders(statuses="New,PartiallyFilled")

    talos_counterparties_wl = pd.DataFrame(talos_counterparties_wl)
    if talos_counterparties_wl.empty:
        return pd.DataFrame(columns=['OrderID', 'Symbol', 'Side', 'Strategy', 'OrderQty', 'Currency', 'Price', 'AvgPxAllIn', 'EndTime', 'CumQty', 'SubmitTime', 'StartTime', 'DecisionStatus', 'RequestSource'])
    df = talos_counterparties_wl .sort_values(by='EndTime', ascending=True).reset_index(drop=True)
    df["EndTime"] = pd.to_datetime(df["EndTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "EndTime" in df.columns else ""
    df["StartTime"] = pd.to_datetime(df["StartTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "StartTime" in df.columns else ""
    df["SubmitTime"] = pd.to_datetime(df["SubmitTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "SubmitTime" in df.columns else ""
    df["Price"] = df["Price"] if "Price" in df.columns else ""
    df["AvgPxAllIn"] = df["AvgPxAllIn"] if "AvgPxAllIn" in df.columns else ""

    df = df[['OrderID', 'Symbol', 'Side', 'Strategy', 'OrderQty', 'Currency', 'Price', 'AvgPxAllIn', 'EndTime', 'CumQty', 'SubmitTime', 'StartTime', 'DecisionStatus', 'RequestSource']]

    return df
def get_open_orders():
    talos_counterparties = talos.get_orders(statuses="New,PartiallyFilled")

    talos_counterparties = pd.DataFrame(talos_counterparties)
    if talos_counterparties.empty:
        return pd.DataFrame(columns=['OrderID', 'Symbol', 'Side', 'Strategy', 'OrderQty', 'Currency', 'Price', 'AvgPxAllIn', 'EndTime', 'CumQty', 'SubmitTime', 'StartTime', 'DecisionStatus', 'RequestSource'])
    df = talos_counterparties.sort_values(by='Timestamp', ascending=True).reset_index(drop=True)
    df["EndTime"] = pd.to_datetime(df["EndTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "EndTime" in df.columns else ""
    df["SubmitTime"] = pd.to_datetime(df["SubmitTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "SubmitTime" in df.columns else ""
    df["StartTime"] = pd.to_datetime(df["StartTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "StartTime" in df.columns else ""
    df["Price"] = df["Price"] if "Price" in df.columns else ""    
    df["AvgPxAllIn"] = df["AvgPxAllIn"] if "AvgPxAllIn" in df.columns else ""

    return df

def get_completed_orders(start_date):
    talos_completed_wl = talos_wl.get_orders(start_date = start_date, statuses="DoneForDay")

    talos_completed_wl = pd.DataFrame(talos_completed_wl)
    df = talos_completed_wl.sort_values(by='Timestamp', ascending=False).reset_index(drop=True)
    df = df[df['SubAccount']=="A1-Intl"]
    df["EndTime"] = pd.to_datetime(df["EndTime"]).dt.strftime("%Y-%m-%d %H:%M:%S") if "EndTime" in df.columns else ""
    df["StartTime"] = pd.to_datetime(df["StartTime"]).dt.strftime("%Y-%m-%d %H:%M:%S")
    df = df[['OrderID', 'Symbol', 'Side', 'Strategy', 'OrderQty', 'Currency', 'Price', 'AvgPxAllIn', 'EndTime', 'CumQty', 'SubmitTime', 'StartTime', 'RequestSource']]
    return df

# def get_aplo_orders():
#     aplo_orders = pd.json_normalize(aplo_client.get_orders())
#     df = aplo_orders[['token', 'status', 'instrument', 'classification', 'subType', 'requestedVolume', 'assetId', 'requestedPrice', 'time', 'parameters.lifespan',]]
#     return df

def get_talos_balances():
    markets = "kraken,okex,coinbase,wintermute,cumberland,janestreet,bybit,b2c2,nonco,galaxy,gate_io"
    talos_balances_wl = talos_wl.get_balances(markets=markets)
    talos_balances_wl = pd.DataFrame(talos_balances_wl)
    columns_to_drop = ['Status','MarketAccountID','ReconStatus']
    talos_balances_wl = talos_balances_wl.drop(columns=columns_to_drop)
    talos_balances_wl['AvailableAmount'] = pd.to_numeric(talos_balances_wl['AvailableAmount'], errors='coerce')
    talos_balances_wl = talos_balances_wl[["Currency", "Market", "Account", "Amount", "AvailableAmount", "AvailableMargin", "OutstandingBuy", "OutstandingSell"]]

    return talos_balances_wl

def get_balances():
    return {
        'fireblocks': get_fireblocks_balances(),
        'aplo': get_aplo_balances(),
        'ads': get_ads_balances(),
        'talos': get_talos_balances()
    }

def process_talos_balances(df):
    df = df.groupby(["Account", "Currency"])[["AvailableAmount"]].sum().reset_index()
    df["Type"] = np.where(df["Account"].str.contains('wintermute|cumberland|janestreet|b2c2|galaxy|nonco', regex=True), "Dealer", "Exchange")
    return df

def process_aplo_balances(df):
    df["Account"] = "aplo"
    df["Type"] = "Aggregator"
    return df.rename(columns={"assetId":"Currency", "quantity":"AvailableAmount", "notional":"Notional"})

def process_ads_balances(df):
    df["Account"] = "ads"
    df["Type"] = "Custodian"
    return df.rename(columns={"assetType":"Currency", "totalBalance.quantity":"AvailableAmount"})

def process_fireblocks_balances(df):
    df["Account"] = "fireblocks"
    df["Type"] = "Custodian"
    df = df.rename(columns={"available":"AvailableAmount"})
    df["Currency"] = df["ID"].str.split("_").str[0]
    return df[["Currency", "AvailableAmount", "Type", "Account"]]

def aggregated_balances(talos_balances_wl, aplo_balances, ads_balances, fireblocks_balances):
    #balances = get_balances()
    balances = {
        'talos': talos_balances_wl,
        'aplo': aplo_balances,
        'ads': ads_balances,
        'fireblocks': fireblocks_balances,
    }
    
    processed_balances = {
        'talos': process_talos_balances(balances['talos']),
        'aplo': process_aplo_balances(balances['aplo']),
        'ads': process_ads_balances(balances['ads']),
        'fireblocks': process_fireblocks_balances(balances['fireblocks'])
    }

    balances_df = pd.concat(processed_balances.values())
    
    balances_df["Full Name"] = balances_df["Account"].apply(lambda x: market_translation.get(x, {}).get("full_name", x))
    balances_df["Type"] = balances_df["Account"].apply(lambda x: market_translation.get(x, {}).get("type", x))
    balances_df = balances_df[["Full Name", "Currency","AvailableAmount", "Type"]]

    return balances_df

def get_trades_sheets(sheet_names):

    trades_sheets = {key: gsu_trading._get_current_sheet_df(name, 0) for key, name in sheet_names.items()}

    for key in trades_sheets:
        trades_sheets[key] = trades_sheets[key][trades_sheets[key]['Side'] != ""]

    return trades_sheets

def orders_to_book(trades_sheets, aplo_trades, talos_trades_wl):

    # NOT WORKING FOR NOW
    booked_orders = pd.concat([sheet.iloc[:,1] for sheet in trades_sheets.values()], axis=0).to_frame()
    booked_orders = booked_orders.loc[~(booked_orders == "").any(axis=1)]
    aplo_completed_orders = aplo_trades.query("status == 'open'")['token'].copy()
    talos_completed_orders = talos_trades_wl['OrderID'].copy()

    all_orders = pd.concat([booked_orders, aplo_completed_orders, talos_completed_orders], axis=0)

    #all_orders = all_orders.drop_duplicates(keep=False)
    return all_orders 
    



In [6]:
max_retries = 1
retries = 0

while retries < max_retries:
    try:

        utc_now = dt.datetime.now(dt.timezone.utc)
        utc_now_formatted = utc_now.strftime("%Y-%m-%d %H:%M:%S")
        days_ago = utc_now - dt.timedelta(days=n_of_days)
        start_date = days_ago.strftime("%Y-%m-%dT%H:%M:%S.000000Z")

        # logging.info("Fetching Perps Data")
        # try:
        #     perps_monitor_main.run(debug=debug)
        # except Exception as e:
        #     logging.error(f"Error: {e}")
        #     logging.error("Error fetching Perps data")
        #     pass

        logging.info("Fetchng Talos Data")
        talos_balances_wl = get_talos_balances()

        talos_tab_name = "talosData"
        gsu_risk_dash.dump_current_sheet(talos_tab_name, talos_balances_wl)

        logging.info("Fetching Fireblocks Balances in BigQuery")
        fb_balance_tab_name = "fireblocks_balances"
        fireblocks_balances = get_fireblocks_balances()
        gsu_risk_dash.dump_current_sheet(fb_balance_tab_name, fireblocks_balances)

        logging.info("Fetching Aplo Balances")
        aplo_balance_tab_name = "aplo_balances"
        aplo_balances = get_aplo_balances()
        gsu_risk_dash.dump_current_sheet(aplo_balance_tab_name, aplo_balances)

        logging.info("Fetching ADS Balances")
        ads_balance_tab_name = "ads_balances"
        ads_balances = get_ads_balances()
        gsu_risk_dash.dump_current_sheet(ads_balance_tab_name, ads_balances)

        logging.info("Fetching Aggregated Balances")
        aggregated_balances_tab_name = "Aggregated Balances"
        aggregated_balances_df = aggregated_balances(talos_balances_wl, aplo_balances, ads_balances, fireblocks_balances)
        gsu_risk_dash.dump_current_sheet(aggregated_balances_tab_name, aggregated_balances_df)

        logging.info("Fetching Talos Open Orders")
        open_orders_tab_name = "Open_Orders"
        talos_open_orders = get_open_orders()
        gsu_risk_dash.dump_current_sheet(open_orders_tab_name, talos_open_orders)

        logging.info("Fetching Talos WL Open Orders")
        open_order_wl_tab_name = "Open_Orders_wl"
        wl_open_orders = get_open_orders_wl()
        gsu_risk_dash.dump_current_sheet(open_order_wl_tab_name, wl_open_orders)

        logging.info("Fetching Talos Completed Orders")
        completed_orders_tab_name = "completed_orders"
        wl_complete_orders = get_completed_orders(start_date)
        gsu_risk_dash.dump_current_sheet(completed_orders_tab_name, wl_complete_orders)

        # logging.info("Fetching Aplo Orders")
        # aplo_orders_tab_name = "aplo_orders"
        # aplo_open_orders = get_aplo_orders()
        # gsu_risk_dash.dump_current_sheet(aplo_orders_tab_name, aplo_open_orders)

        logging.info("Fetching Trades from spreadsheet")
        trades = get_trades_sheets(trade_sheet_names)
        for key, value in trades.items():
            gsu_risk_dash.dump_current_sheet(trade_sheet_names[key], value)

        logging.info("Updating timestamp")
        gsu_risk_dash.append_to_worksheet_at_location("dash", "C1", [[utc_now_formatted]])

        logging.info("Updating runner")
        gsu_risk_dash.append_to_worksheet_at_location("dash", "E1", [[name]])
        gsu_risk_dash.append_to_worksheet_at_location("dash", "F1", [[""]])
        if not debug:
            print("Code complete, sleeping for 60s... Zzzzz...")
        logging.info("Code complete, sleeping for 60s... Zzzzz...")
        print("-----------------------------------------------")
        time.sleep(60)
        logging.info("What a nice sleep, waking up!")
        retries = 0
    except Exception as e:
        logging.error(f"Error: {e}")
        retries += 1
        error_msg = f"Code stopped at {utc_now_formatted} UTC, please check. Attempt {retries}/{max_retries}. Error: {e}"
        logging.error(f"Attempt {retries}/{max_retries} failed. Error: {e}")
        gsu_risk_dash.append_to_worksheet_at_location("dash", "F1", [[error_msg]])
        if retries >= max_retries:
            break

INFO (2025/03/12 03:57:30 PM): Fetchng Talos Data (Line:20 in <module>, 2023372639.py))
INFO (2025/03/12 03:57:31 PM): Fetching Fireblocks Balances in BigQuery (Line:26 in <module>, 2023372639.py))
INFO (2025/03/12 03:57:35 PM): Fetching Aplo Balances (Line:31 in <module>, 2023372639.py))
INFO (2025/03/12 03:57:36 PM): Fetching ADS Balances (Line:36 in <module>, 2023372639.py))
INFO (2025/03/12 03:57:36 PM): getting vaults path: https://api.anchorage.com/v2/vaults?limit=100 (Line:38 in _query_vaults, vault_utils.py))


<Response [200]>


INFO (2025/03/12 03:57:38 PM): Fetching Aggregated Balances (Line:41 in <module>, 2023372639.py))
INFO (2025/03/12 03:57:50 PM): Fetching Talos Open Orders (Line:46 in <module>, 2023372639.py))
INFO (2025/03/12 03:58:12 PM): Fetching Talos WL Open Orders (Line:51 in <module>, 2023372639.py))
INFO (2025/03/12 03:58:33 PM): Fetching Talos Completed Orders (Line:56 in <module>, 2023372639.py))
INFO (2025/03/12 03:58:45 PM): Fetching Trades from spreadsheet (Line:66 in <module>, 2023372639.py))
INFO (2025/03/12 03:59:30 PM): Updating timestamp (Line:71 in <module>, 2023372639.py))
INFO (2025/03/12 03:59:31 PM): Updating runner (Line:74 in <module>, 2023372639.py))
INFO (2025/03/12 03:59:31 PM): Code complete, sleeping for 60s... Zzzzz... (Line:79 in <module>, 2023372639.py))


-----------------------------------------------


KeyboardInterrupt: 