In [3]:
import pandas as pd
import mysql.connector
import numpy as np
import matplotlib.pyplot as plt
import math
import re
from itertools import combinations
from datetime import datetime, timedelta

# Connect to Database

In [4]:

# MySQL Configuration - Replace these with your actual credentials
DB_NAME = "currencies"
USER = "parsa"
PASSWORD = "Administrator$"
HOST = "mqr-db.mysql.database.azure.com"

# Connect to MySQL
connection = mysql.connector.connect(
    host=HOST,
    user=USER,
    password=PASSWORD,
    database=DB_NAME
)
cursor = connection.cursor()

In [47]:
# Dict that stores fx pairs to its dataframe 
fx_tables_cache:dict[tuple[str, str], pd.DataFrame] = dict()

# Triangular Abr on FX 

In [50]:
pattern = r'^([A-Z]{6})_\d{4}_\d{1,2}$'

def get_forex_data(table_name: str, start_date: str = None, end_date: str = None) -> pd.DataFrame:
    # Default to one month ago if start_date is not provided
    if start_date is None:
        start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d %H:%M:%S")

    # Default to now if end_date is not provided
    if end_date is None:
        end_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Extract currency pair from table name
    pattern = r"([A-Z]{6})"
    match = re.match(pattern, table_name)
    if not match:
        raise ValueError(f"Invalid table name format: {table_name}")
    
    currency_pair = match.group(1)
    currency1, currency2 = currency_pair[:3], currency_pair[3:]

    # Check cache first
    if (currency1, currency2, start_date, end_date) in fx_tables_cache:
        return fx_tables_cache[(currency1, currency2, start_date, end_date)]

    # SQL query with date filtering
    query = f"""
        SELECT timestamp, bid, ask 
        FROM {table_name} 
        WHERE timestamp BETWEEN '{start_date}' AND '{end_date}'
        ORDER BY timestamp
    """
    
    cursor.execute(query)
    rows = cursor.fetchall()
    
    # Convert to DataFrame
    df = pd.DataFrame(rows, columns=["timestamp", f"bid_{currency1}_{currency2}", f"ask_{currency1}_{currency2}"])
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    
    # Create reverse bid/ask prices
    df[f"bid_{currency2}_{currency1}"] = df[f"ask_{currency1}_{currency2}"].replace(0, float("nan"))
    df[f"ask_{currency2}_{currency1}"] = df[f"bid_{currency1}_{currency2}"].replace(0, float("nan"))
    df[f"bid_{currency2}_{currency1}"] = (1 / df[f"bid_{currency2}_{currency1}"]).astype(float).round(5)
    df[f"ask_{currency2}_{currency1}"] = (1 / df[f"ask_{currency2}_{currency1}"]).astype(float).round(5)

    # Cache the result
    fx_tables_cache[(currency1, currency2, start_date, end_date)] = df

    return df

def getForexTables() -> tuple[dict[str, tuple[str, bool]], set[tuple[str, str]]]:
    cursor.execute("SHOW TABLES")
    all_tables = [table[0] for table in cursor.fetchall()]
    # Match tables that follow CURRENCYCURRENCY_YEAR_MONTH format
    forex_tables = {}
    currency_pairs = set()
    for table in all_tables:
        match = re.match(pattern, table)
        if match:
            currency_pair = match.group(1)
            currency1, currency2 = currency_pair[:3], currency_pair[3:]
            forex_tables[f"{currency1}{currency2}"] = (table, False)
            forex_tables[f"{currency2}{currency1}"] = (table, True)
            currency_pairs.add((currency1, currency2))
            currency_pairs.add((currency2, currency1))

    return (forex_tables, currency_pairs)

def getTriangularTriplets(pairs: set[tuple[str, str]]):
    G:dict[str, set[str]] = dict()
    for c1, c2 in pairs:
        if c2 in G: G[c2].add(c1)
        else:       G[c2] = {c1}
        if c1 in G: G[c1].add(c2)
        else:       G[c1] = {c2}

    triplets:list[tuple[str, str, str]] = []
    visited:set[str] = set()

    def findTriplets(start_currency:str, cur_currency:str, l:list, depth:int):    
        if depth == 3: 
            if cur_currency == start_currency:
                triplets.append(tuple(l[:-1]))
            else:
                return
        
        if cur_currency in visited:
            return

        visited.add(cur_currency)

        for neighbor in G[cur_currency]:
            l.append(neighbor)
            findTriplets(start_currency, neighbor, l, depth + 1)
            l.pop(-1)
        
        visited.remove(cur_currency)

    
    for currency in G:
        findTriplets(currency, currency, [currency], 0)
    
    return triplets

def join_forex_data(start_c: str, mid_c: str, end_c: str):
    fx_tables, pairs = getForexTables()
    start_c = start_c.strip()
    mid_c = mid_c.strip()
    end_c = end_c.strip()

    # Get forex tables and their reverse flags
    table1, reverse1 = fx_tables[start_c + mid_c]
    table2, reverse2 = fx_tables[mid_c + end_c]
    table3, reverse3 = fx_tables[end_c + start_c]

    # Convert tables to DataFrames
    df1 = get_forex_data(table1, reverse1)
    df2 = get_forex_data(table2, reverse2)
    df3 = get_forex_data(table3, reverse3)

    # Merge the dataframes on 'timestamp'
    merged_df = df1.merge(df2, on='timestamp', how='inner') \
                   .merge(df3, on='timestamp', how='inner')

    return merged_df


In [51]:
usd_cad = get_forex_data('USDCAD_2024_12', '2024-12-27', '2024-12-30')
usd_cad

Unnamed: 0,timestamp,bid_USD_CAD,ask_USD_CAD,bid_CAD_USD,ask_CAD_USD
0,2024-12-27 00:00:00.602,1.44097,1.44106,0.69393,0.69398
1,2024-12-27 00:00:01.024,1.44097,1.44107,0.69393,0.69398
2,2024-12-27 00:00:02.118,1.44098,1.44107,0.69393,0.69397
3,2024-12-27 00:00:04.274,1.44099,1.44109,0.69392,0.69397
4,2024-12-27 00:00:04.603,1.44098,1.44107,0.69393,0.69397
...,...,...,...,...,...
81311,2024-12-29 23:59:16.980,1.44025,1.44038,0.69426,0.69432
81312,2024-12-29 23:59:32.047,1.44025,1.44037,0.69427,0.69432
81313,2024-12-29 23:59:34.781,1.44026,1.44034,0.69428,0.69432
81314,2024-12-29 23:59:34.984,1.44026,1.44037,0.69427,0.69432


In [10]:
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, coint

In [14]:
# make a function that does inner joins for multiple dataframes on the timestamp columns
def join_forex(*dfs: pd.DataFrame) -> pd.DataFrame:
    merged_df = dfs[0]
    for i in range(1, len(dfs)):
        merged_df = merged_df.merge(dfs[i], on='timestamp', how='inner')
    return merged_df

In [12]:
def is_stationary(series, p_value_threshold=0.05):
    result = adfuller(series.dropna())  # Drop NaN values before testing
    return result[1] < p_value_threshold  # Returns True if p-value < threshold

def find_cointegrated_pairs(*commodities, p_value_threshold=0.05):
    cointegrated_pairs = []
    
    # Merge all commodity dataframes using join_forex function
    merged_data = join_forex(*commodities)
    
    # Extract all currency pairs
    currency_columns = [col for col in merged_data.columns if col != 'timestamp']
    
    # Iterate over all possible pairs of currency mid-prices
    for (asset1, asset2) in combinations(currency_columns, 2):
        series1 = merged_data[asset1]
        series2 = merged_data[asset2]

        # Run the Engle-Granger cointegration test
        _, p_value, _ = coint(series1, series2)

        if p_value < p_value_threshold:  # If cointegrated
            # Run OLS regression to find hedge ratio (beta)
            X = sm.add_constant(series1)
            model = sm.OLS(series2, X).fit()
            hedge_ratio = model.params[1]

            # Create the spread (linear combination)
            spread = series2 - hedge_ratio * series1

            # Check if the spread is stationary
            if is_stationary(spread):
                cointegrated_pairs.append((asset1, asset2, hedge_ratio, p_value))
                print(f"Cointegrated Pair Found: {asset1} - {asset2}, Hedge Ratio: {hedge_ratio:.3f}, p-value: {p_value:.5f}")
    
    # Convert results into a DataFrame
    return pd.DataFrame(cointegrated_pairs, columns=["Asset1", "Asset2", "Hedge Ratio", "P-Value"])

In [52]:
usd_cad = get_forex_data('USDCAD_2024_12', '2024-12-27', '2024-12-30')
usd_jpy = get_forex_data('USDJPY_2024_12', '2024-12-27', '2024-12-30')
joined = join_forex(usd_cad, usd_jpy)
joined.head()

Unnamed: 0,timestamp,bid_USD_CAD,ask_USD_CAD,bid_CAD_USD,ask_CAD_USD,bid_USD_JPY,ask_USD_JPY,bid_JPY_USD,ask_JPY_USD
0,2024-12-27 00:00:08.181,1.44098,1.44107,0.69393,0.69397,157.757,157.775,0.00634,0.00634
1,2024-12-27 00:30:05.611,1.44105,1.44115,0.69389,0.69394,157.726,157.745,0.00634,0.00634
2,2024-12-27 01:00:30.827,1.44102,1.44112,0.6939,0.69395,157.913,157.927,0.00633,0.00633
3,2024-12-27 01:09:46.672,1.44125,1.44134,0.6938,0.69384,157.814,157.825,0.00634,0.00634
4,2024-12-27 01:10:46.059,1.4412,1.4413,0.69382,0.69387,157.784,157.803,0.00634,0.00634


In [None]:
``

In [54]:
# Find cointegrated pairs
cointegrated_pairs = find_cointegrated_pairs(usd_cad, usd_jpy)
cointegrated_pairs.head()

ValueError: Pandas data cast to numpy dtype of object. Check input data with np.asarray(data).

In [94]:
get_forex_data('CADJPY_2025_01', reverse=True)

Unnamed: 0,timestamp,bid_JPY_CAD,ask_JPY_CAD
0,2025-01-01 22:00:41.792,0.009172881293743177669537778512,0.009120925226654991882376548277
1,2025-01-01 22:00:50.262,0.009172881293743177669537778512,0.009121008418690770451581126809
2,2025-01-01 22:01:01.033,0.009172713013327952008365514268,0.009121091612244153380276551498
3,2025-01-01 22:01:11.018,0.009172628875435699871583195744,0.009121174807315182195466776121
4,2025-01-01 22:01:31.035,0.009172544739086964896671283514,0.009121258003903898425670868526
...,...,...,...
9890067,2025-01-31 21:59:56.039,0.009569561139926122987999770331,0.009272739074395185593872574020
9890068,2025-01-31 21:59:56.086,0.009377784029633797533642800206,0.009351912466099317310389974750
9890069,2025-01-31 21:59:56.179,0.009377871973291820620064894874,0.009361192241443870291320302554
9890070,2025-01-31 21:59:56.383,0.009377608147265958344664609845,0.009361104610344020594430142757


In [95]:
fx_tables, pairs = getForexTables()
fx_tables, pairs

({'AUDJPY': ('AUDJPY_2025_01', False),
  'JPYAUD': ('AUDJPY_2025_01', True),
  'AUDNZD': ('AUDNZD_2025_01', False),
  'NZDAUD': ('AUDNZD_2025_01', True),
  'AUDUSD': ('AUDUSD_2025_01', False),
  'USDAUD': ('AUDUSD_2025_01', True),
  'CADJPY': ('CADJPY_2025_01', False),
  'JPYCAD': ('CADJPY_2025_01', True),
  'CHFJPY': ('CHFJPY_2025_01', False),
  'JPYCHF': ('CHFJPY_2025_01', True),
  'EURCHF': ('EURCHF_2025_01', False),
  'CHFEUR': ('EURCHF_2025_01', True),
  'EURGBP': ('EURGBP_2025_01', False),
  'GBPEUR': ('EURGBP_2025_01', True),
  'EURJPY': ('EURJPY_2025_01', False),
  'JPYEUR': ('EURJPY_2025_01', True),
  'EURPLN': ('EURPLN_2025_01', False),
  'PLNEUR': ('EURPLN_2025_01', True),
  'EURUSD': ('EURUSD_2025_01', False),
  'USDEUR': ('EURUSD_2025_01', True),
  'GBPJPY': ('GBPJPY_2025_01', False),
  'JPYGBP': ('GBPJPY_2025_01', True),
  'GBPUSD': ('GBPUSD_2025_01', False),
  'USDGBP': ('GBPUSD_2025_01', True),
  'NZDUSD': ('NZDUSD_2025_01', False),
  'USDNZD': ('NZDUSD_2025_01', True),

In [96]:
fx_triplets = getTriangularTriplets(pairs)

In [97]:
fx1, fx2, fx3 = fx_triplets[0]
fxdata = join_forex_data(fx1, fx2, fx3)
fxdata

OperationalError: 2013 (HY000): Lost connection to MySQL server during query

In [108]:
def check_triangular_arbitrage(df: pd.DataFrame, start_c: str, mid_c: str, end_c: str):
    """
    Checks for triangular arbitrage opportunities at each timestamp.

    Parameters:
    - df (pd.DataFrame): The merged forex data containing bid and ask prices.
    - start_c (str): The starting currency.
    - mid_c (str): The intermediate currency.
    - end_c (str): The final currency.

    Returns:
    - pd.DataFrame: A DataFrame containing timestamps and a boolean flag for arbitrage existence.
    """

    # Define bid and ask columns
    bid_AB = f"bid_{start_c}_{mid_c}"
    ask_AB = f"ask_{start_c}_{mid_c}"
    bid_BC = f"bid_{mid_c}_{end_c}"
    ask_BC = f"ask_{mid_c}_{end_c}"
    bid_CA = f"bid_{end_c}_{start_c}"
    ask_CA = f"ask_{end_c}_{start_c}"

    # Ensure required columns exist
    required_cols = {bid_AB, ask_AB, bid_BC, ask_BC, bid_CA, ask_CA}
    if not required_cols.issubset(df.columns):
        raise ValueError("Missing required bid/ask columns in the DataFrame.")

    # Check for arbitrage: Using bid prices for conversions
    df["arbitrage_ratio"] = df[ask_AB] * df[ask_BC] * df[bid_CA]

    # Arbitrage exists if the ratio is greater than 1
    df["arbitrage_opportunity"] = df["arbitrage_ratio"] > 1.0

    return df


In [109]:
fx1, fx2, fx3 = fx_triplets[0]
abr_opportunities = check_triangular_arbitrage(fxdata, fx1, fx2, fx3)
abr_opportunities

Unnamed: 0,timestamp,bid_JPY_CHF,ask_JPY_CHF,bid_CHF_USD,ask_CHF_USD,bid_USD_JPY,ask_USD_JPY,arbitrage_ratio,arbitrage_opportunity
0,2025-01-01 22:20:42.953,0.005776440210955596504098384330,0.005755660692290868068745611309,1.104057410985371239304443831,1.101600625709155402800268791,157.15100,157.36600,0.9964063952919816780367731509,False
1,2025-01-01 23:00:00.576,0.005771905825584549762486075277,0.005753475098959771702108073276,1.103911157230066124278318081,1.102511521245397014398800467,157.14400,157.44000,0.9968072269089263349827689207,False
2,2025-01-01 23:07:40.165,0.005769142013199796926201135367,0.005767245505873939547732607429,1.102244169128345311053304528,1.102110541687331239323304127,157.27500,157.30900,0.9996622438268847108278457410,False
3,2025-01-01 23:17:31.577,0.005768709366653398635123363850,0.005767045946055052220601041528,1.102232019840176357123174428,1.102086249269867859858712543,157.29400,157.31700,0.9997263795792051577495621701,False
4,2025-01-01 23:17:31.686,0.005768642811405760566711469793,0.005767112464460169437764205840,1.102232019840176357123174428,1.102074103462716833079856290,157.30200,157.32600,0.9997777390779095559734456429,False
...,...,...,...,...,...,...,...,...,...
1100813,2025-01-31 21:59:01.548,0.005872990703055717062799889588,0.005870508327316062297834369478,1.097502085253961982527766803,1.097116777109755562382059946,155.17500,155.23100,0.9994252530952626136244879573,False
1100814,2025-01-31 21:59:02.345,0.005874474234556007237352256973,0.005870060344220338585080654629,1.097477995566188897912596852,1.097116777109755562382059946,155.15600,155.22400,0.9992266234781353990288037579,False
1100815,2025-01-31 21:59:30.006,0.005872852738217589193950961680,0.005869991429812512473731788352,1.097598454581375949422663213,1.097152888254978331230456964,155.16900,155.22400,0.9993315049345304164095538611,False
1100816,2025-01-31 21:59:40.600,0.005872852738217589193950961680,0.005869991429812512473731788352,1.097586407489929644711279896,1.097152888254978331230456964,155.16000,155.22900,0.9992735424320691594977500472,False


In [111]:
abr_opportunities[abr_opportunities["arbitrage_ratio"] > 1.001]

Unnamed: 0,timestamp,bid_JPY_CHF,ask_JPY_CHF,bid_CHF_USD,ask_CHF_USD,bid_USD_JPY,ask_USD_JPY,arbitrage_ratio,arbitrage_opportunity
119899,2025-01-06 14:16:32.859,0.005770074087751286726521568537,0.005768642811405760566711469793,1.105693214360743468117336164,1.105607641960021227666725632,156.97400,156.98700,1.001157501191411484166555655,True
119988,2025-01-06 14:16:42.828,0.005772172357066582008138763023,0.005770307153449778131689949856,1.105717666051150499231526222,1.105558749391942687834431522,157.00400,157.03000,1.001593446601765539499235935,True
120008,2025-01-06 14:16:43.843,0.005773372053415238238197784180,0.005771039768235042907680676828,1.105864398907405973879482898,1.105693214360743468117336164,157.03700,157.05700,1.002053020294254191233456560,True
120037,2025-01-06 14:16:45.953,0.005771539384984763136023640225,0.005769841040879323774630008943,1.105901088206670795364062638,1.105754345614578265292582600,156.89600,156.91700,1.001000685512188047839742894,True
120076,2025-01-06 14:16:47.859,0.005771372836456607933329100993,0.005769241863926661397425764280,1.106060103306013648781674796,1.105913318514094865244462139,157.00900,157.04100,1.001761604694890878812273231,True
...,...,...,...,...,...,...,...,...,...
121406,2025-01-06 14:17:58.109,0.005771572695843890501722814450,0.005769641301400291943849850855,1.104106170849388877234434863,1.104033032668337436656104751,157.14800,157.17200,1.001013051031115049507177719,True
121710,2025-01-06 14:18:09.390,0.005772638846395852936252749219,0.005771139684664927629908354302,1.103789308696755963221740234,1.103728394516677336041146995,157.22200,157.25100,1.001468095077800988752402022,True
121814,2025-01-06 14:18:14.264,0.005771606007087532176703489513,0.005770307153449778131689949856,1.103545692309390070295860600,1.103375224812702055588043826,157.23000,157.25200,1.001054157779246191309387313,True
316947,2025-01-10 13:31:46.530,0.005782419132868426835050711816,0.005780881468806363594320862045,1.091190816538088015451261962,1.091083664295378169598044778,158.70200,158.72700,1.001001015648875654809397993,True


# FX Arb with slippage model

In [100]:
# --- Distribution Tracker to update running mean and variance ---
class DistributionTracker:
    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.M2 = 0.0

    def update(self, value: float):
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        delta2 = value - self.mean
        self.M2 += delta * delta2

    def variance(self) -> float:
        if self.count < 2:
            return 0.0
        return self.M2 / (self.count - 1)

    def std(self) -> float:
        return math.sqrt(self.variance())


# --- RateState for each currency pair ---
class RateState:
    def __init__(self, currency_pair: tuple):
        self.currency_pair = currency_pair  # e.g. ('USD', 'EUR')
        self.bid = None
        self.ask = None
        self.last_update = None
        # Two trackers: one for the bid change per second and one for ask change.
        self.bid_tracker = DistributionTracker()
        self.ask_tracker = DistributionTracker()

    def update(self, new_bid: float, new_ask: float, timestamp: datetime):
        if self.last_update is None:
            # First update: initialize values.
            self.bid = new_bid
            self.ask = new_ask
            self.last_update = timestamp
        else:
            dt = (timestamp - self.last_update).total_seconds()
            if dt > 0:
                bid_ratio = (new_bid - self.bid) / dt
                ask_ratio = (new_ask - self.ask) / dt
                self.bid_tracker.update(bid_ratio)
                self.ask_tracker.update(ask_ratio)
            # Update the stored prices and timestamp.
            self.bid = new_bid
            self.ask = new_ask
            self.last_update = timestamp

    def get_effective_bid(self) -> float:
        if self.bid_tracker.count > 0:
            # Sample a random adjustment from a normal distribution with mean and std of bid changes.
            adjustment = np.random.normal(loc=self.bid_tracker.mean, scale=self.bid_tracker.std())
        else:
            adjustment = 0.0
        return self.bid - adjustment

    def get_effective_ask(self) -> float:
        if self.ask_tracker.count > 0:
            # Sample a random adjustment from a normal distribution with mean and std of ask changes.
            adjustment = np.random.normal(loc=self.ask_tracker.mean, scale=self.ask_tracker.std())
        else:
            adjustment = 0.0
        return self.ask + adjustment


# --- MarketState to hold all currency pairs ---
class MarketState:
    def __init__(self):
        # Maps a currency pair tuple to its RateState.
        self.rates = {}

    def update_rate(self, currency_pair: tuple, new_bid: float, new_ask: float, timestamp: datetime):
        if currency_pair not in self.rates:
            self.rates[currency_pair] = RateState(currency_pair)
        self.rates[currency_pair].update(new_bid, new_ask, timestamp)

    def get_rate(self, currency_pair: tuple):
        return self.rates.get(currency_pair, None)


# --- Triangular Arbitrage Detector ---
class TriangularArbitrageDetector:
    def __init__(self, currency_triplet: tuple):
        """
        currency_triplet: A tuple like ('USD', 'EUR', 'GBP') defining a cycle:
          A -> B, B -> C, C -> A.
        """
        self.currencies = currency_triplet
        # Define the three currency pairs needed.
        self.pairs = {
            'AB': (currency_triplet[0], currency_triplet[1]),
            'BC': (currency_triplet[1], currency_triplet[2]),
            'CA': (currency_triplet[2], currency_triplet[0]),
        }
        self.market = MarketState()
        self.arb_opportunities = []  # to store arbitrage opportunities for plotting

    def process_tick(self, currency_pair: tuple, bid: float, ask: float, timestamp: datetime):
        # Update the market state with the new tick.
        self.market.update_rate(currency_pair, bid, ask, timestamp)
        # Only check arbitrage if all three pairs have been updated.
        if all(self.market.get_rate(pair) is not None for pair in self.pairs.values()):
            self.check_arbitrage(timestamp)

    def check_arbitrage(self, timestamp: datetime):
        # Retrieve the latest RateStates for each leg.
        rate_AB = self.market.get_rate(self.pairs['AB'])
        rate_BC = self.market.get_rate(self.pairs['BC'])
        rate_CA = self.market.get_rate(self.pairs['CA'])
        
        # Use the slippage-adjusted prices.
        effective_ask_AB = rate_AB.get_effective_ask()  # A -> B: buying B costs effective ask
        effective_ask_BC = rate_BC.get_effective_ask()  # B -> C: buying C costs effective ask
        effective_bid_CA = rate_CA.get_effective_bid()    # C -> A: selling C yields effective bid
        
        # Compute conversion product starting with 1 unit of currency A.
        # Conversion:
        #   A -> B: get 1 / effective_ask_AB units of B,
        #   B -> C: get 1 / effective_ask_BC units of C,
        #   C -> A: receive effective_bid_CA units of A.
        conversion_product = (1 / effective_ask_AB) * (1 / effective_ask_BC) * effective_bid_CA

        # Set a threshold (e.g., 1.0001) to account for fees/transaction costs.
        threshold = 1.0001
        if conversion_product > threshold:
            opportunity = {
                'timestamp': timestamp,
                'currencies': self.currencies,
                'raw_rates': {
                    'AB': (rate_AB.bid, rate_AB.ask),
                    'BC': (rate_BC.bid, rate_BC.ask),
                    'CA': (rate_CA.bid, rate_CA.ask)
                },
                'effective_rates': {
                    'AB': effective_ask_AB,
                    'BC': effective_ask_BC,
                    'CA': effective_bid_CA
                },
                'conversion_product': conversion_product
            }
            self.arb_opportunities.append(opportunity)
            print(f"Arbitrage opportunity at {timestamp}: product = {conversion_product:.5f}")


# --- Simulation function ---
def simulate_forex_arbitrage(fx_data_dict: dict, currency_triplet: tuple):
    """
    fx_data_dict: Dictionary mapping a currency pair tuple (e.g. ('USD','EUR'))
                  to a DataFrame with tick data having columns: 'timestamp', 'bid', 'ask'.
    currency_triplet: Tuple of three currencies defining the arbitrage cycle,
                      e.g. ('USD', 'EUR', 'GBP').
    """
    detector = TriangularArbitrageDetector(currency_triplet)
    
    # Gather all ticks from the dataframes into one list.
    # Each tick is a tuple: (timestamp, currency_pair, bid, ask)
    ticks = []
    for pair, df in fx_data_dict.items():
        for _, row in df.iterrows():
            ticks.append((row['timestamp'], pair, row['bid'], row['ask']))
    # Sort ticks by timestamp (this merges out-of-sync ticks)
    ticks.sort(key=lambda x: x[0])
    
    # Process each tick in order.
    for timestamp, currency_pair, bid, ask in ticks:
        detector.process_tick(currency_pair, bid, ask, timestamp)
    
    return detector.arb_opportunities


# --- Example usage ---
# Assume you have three DataFrames: df_AB, df_BC, df_CA corresponding to pairs:
#   ('USD','EUR'), ('EUR','GBP'), and ('GBP','USD').
#
# fx_data_dict = {
#     ('USD', 'EUR'): df_AB,
#     ('EUR', 'GBP'): df_BC,
#     ('GBP', 'USD'): df_CA,
# }
#
# opportunities = simulate_forex_arbitrage(fx_data_dict, ('USD', 'EUR', 'GBP'))
# Now, "opportunities" is a list of arbitrage opportunities that can be easily plotted.
