In [1]:
import os
import time
import sys
import pandas as pd
import numpy as np
from binance.client import Client
from typing import List, Dict, Callable, Tuple, Any
from dotenv import load_dotenv
load_dotenv()

api_key = os.getenv("BINANCE_API_KEY")
api_secret = os.getenv("BINANCE_SECRET_KEY")

if not api_key or not api_secret:
    raise RuntimeError("API keys not found. Set BINANCE_API_KEY and BINANCE_SECRET_KEY in your environment or .env file.")

In [2]:
client = Client(api_key, api_secret)

In [3]:
def get_klines_multi(client: Client, symbols: List[str], interval: str, limit: int = 500) -> Dict[str, pd.DataFrame]:
    """
    Fetch candlestick (kline) data from Binance for multiple symbols.

    Parameters:
        symbols (list of str): List of trading pair symbols, e.g. ['BTCUSDT', 'ETHUSDT'].
        interval (str): Kline interval, e.g. Client.KLINE_INTERVAL_15MINUTE, Client.KLINE_INTERVAL_1HOUR.
        limit (int): Optional. Number of candles to retrieve per symbol (default 500, max 1000).

    Returns:
        dict :Dictionary mapping each symbol to its corresponding pandas DataFrame.
    """
    data = {}

    start_time = time.time()

    for symbol in symbols:
        print(f"Fetching {symbol} ({interval}, limit={limit}) ...")
        klines = client.get_klines(symbol=symbol, interval=interval, limit=limit)
        
        cols = ["open_time","open","high","low","close","volume","close_time",
                "quote_asset_volume","num_trades","taker_buy_base_asset_volume",
                "taker_buy_quote_asset_volume","ignore"]
        
        df = pd.DataFrame(klines, columns=cols)
        df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
        df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
        numeric_cols = ["open","high","low","close","volume"]
        df[numeric_cols] = df[numeric_cols].astype(float)
        df = df[["open_time","open","high","low","close","volume","close_time","num_trades"]]
        
        data[symbol] = df

    end_time = time.time()

    elapsed_time = end_time-start_time
    print(f"Elapsed Time: {elapsed_time:.2f}")
    
    return data

In [4]:
symbols = ["BTCUSDT", "ETHUSDT", "SOLUSDT", "XRPUSDT", "DOGEUSDT"]

In [5]:
n_1d = 200
n_4h = n_1d * 6
n_1h = n_4h * 4
n_15m = n_1h * 4

data_1D = get_klines_multi(
    client=client,
    symbols=symbols,
    interval=Client.KLINE_INTERVAL_1DAY,
    limit=n_1d
)

data_4h = get_klines_multi(
    client=client,
    symbols=symbols,
    interval=Client.KLINE_INTERVAL_4HOUR,
    limit=n_4h
)

data_1h = get_klines_multi(
    client=client,
    symbols=symbols,
    interval=Client.KLINE_INTERVAL_1HOUR,
    limit=n_1h
)

data_15m = get_klines_multi(
    client=client,
    symbols=symbols,
    interval=Client.KLINE_INTERVAL_15MINUTE,
    limit=n_15m
)

Fetching BTCUSDT (1d, limit=200) ...
Fetching ETHUSDT (1d, limit=200) ...
Fetching SOLUSDT (1d, limit=200) ...
Fetching XRPUSDT (1d, limit=200) ...
Fetching DOGEUSDT (1d, limit=200) ...
Elapsed Time: 1.37
Fetching BTCUSDT (4h, limit=1200) ...
Fetching ETHUSDT (4h, limit=1200) ...
Fetching SOLUSDT (4h, limit=1200) ...
Fetching XRPUSDT (4h, limit=1200) ...
Fetching DOGEUSDT (4h, limit=1200) ...
Elapsed Time: 1.56
Fetching BTCUSDT (1h, limit=4800) ...
Fetching ETHUSDT (1h, limit=4800) ...
Fetching SOLUSDT (1h, limit=4800) ...
Fetching XRPUSDT (1h, limit=4800) ...
Fetching DOGEUSDT (1h, limit=4800) ...
Elapsed Time: 2.59
Fetching BTCUSDT (15m, limit=19200) ...
Fetching ETHUSDT (15m, limit=19200) ...
Fetching SOLUSDT (15m, limit=19200) ...
Fetching XRPUSDT (15m, limit=19200) ...
Fetching DOGEUSDT (15m, limit=19200) ...
Elapsed Time: 2.14


In [6]:
def get_tqdm():
    """Return tqdm that works for notebook or terminal."""
    try:
        # If running inside Jupyter/IPython kernel
        if "ipykernel" in sys.modules:
            from tqdm.notebook import tqdm
        else:
            from tqdm import tqdm
    except ImportError:
        # fallback in case notebook extra is not installed
        from tqdm import tqdm
    return tqdm

def apply_to_dict(stock_dict: Dict[str, pd.DataFrame], function: Callable[..., pd.DataFrame], **kwargs) -> Dict[str, pd.DataFrame]:
    """
    Apply a transformation function to each DataFrame in a dictionary.

    Args:
        stock_dict (Dict[str, pd.DataFrame]): Mapping of symbol â†’ DataFrame.
        function (Callable[..., pd.DataFrame]): Function to apply to each DataFrame.
        **kwargs: Additional keyword arguments for the function.

    Returns:
        Dict[str, pd.DataFrame]: Updated mapping with transformed DataFrames.
    """
    tqdm = get_tqdm()

    new_stock_dict: Dict[str, pd.DataFrame] = {}
    for symbol, df in tqdm(stock_dict.items(), desc="Processing symbols", total=len(stock_dict)):
        df_copy = df.copy()
        try:
            new_stock_dict[symbol] = function(df_copy, **kwargs)
        except Exception as e:
            raise RuntimeError(f"Error processing {symbol}: {e}")
    return new_stock_dict

def calculate_ma(df: pd.DataFrame, period: int, source: str, method: str) -> pd.DataFrame:
    """
    Calculate a moving average (EMA or SMA) and add it as a new column.

    Args:
        df (pd.DataFrame): DataFrame containing a source column (e.g., 'Close').
        period (int): Lookback period for the moving average.
        source (str, optional): Column to calculate MA on. Defaults to "Close".
        method (str, optional): Type of moving average ("EMA" or "SMA"). Defaults to "EMA".

    Returns:
        pd.DataFrame: DataFrame with the new MA column added.

    Raises:
        ValueError: If the source column is missing, if period <= 0, or if method is invalid.
        TypeError: If period is not an integer.
    """
    try:
        if method.upper() == "EMA":
            ma_series = df[source].ewm(span=period, adjust=False).mean()
            ma_series.iloc[:period - 1] = pd.NA
        else:  # SMA
            ma_series = df[source].rolling(window=period).mean()

        col_name = f"{method.upper()}_{source}_{period}"
        df[col_name] = ma_series
        return df
    except Exception as e:
        raise RuntimeError(f"Failed to calculate {method.upper()} on column '{source}' with period {period}: {e}")

def mark_crossovers(df: pd.DataFrame, short_ma_params: Tuple, long_ma_params: Tuple) -> pd.DataFrame:
    """
    Detect moving average crossovers (Bullish or Bearish).

    Args:
        df (pd.DataFrame): DataFrame containing the required MA columns.
        short_ma_params (Tuple[int, str, str]): (period, source, method) for the short MA.
        long_ma_params (Tuple[int, str, str]): (period, source, method) for the long MA.

    Returns:
        pd.DataFrame: DataFrame with an added crossover signal column.

    Raises:
        ValueError: If the required MA columns are missing or periods are invalid.
    """
    try:
        short_p, short_s, short_m = short_ma_params
        long_p, long_s, long_m = long_ma_params

        if short_p >= long_p:
            raise ValueError(
                f"Short MA must have smaller period than Long MA.\nFound Short MA Period: {short_p}, Long MA Period: {long_p}"
            )

        short_col = f"{short_m}_{short_s}_{short_p}"
        long_col = f"{long_m}_{long_s}_{long_p}"
        crossover_col = f"{short_col}_{long_col}_Crossover"

        if short_col not in df.columns or long_col not in df.columns:
            raise ValueError(
                f"Required columns '{short_col}' and/or '{long_col}' not found in DataFrame."
            )

        cross_up = (df[short_col] > df[long_col]) & (df[short_col].shift(1) <= df[long_col].shift(1))
        cross_down = (df[short_col] < df[long_col]) & (df[short_col].shift(1) >= df[long_col].shift(1))

        df[crossover_col] = "No"
        df.loc[cross_up, crossover_col] = "Bullish"
        df.loc[cross_down, crossover_col] = "Bearish"

        return df
    except Exception as e:
        raise RuntimeError(f"Failed to calculate MA Crossover: {e}")  

def add_indicators(df: pd.DataFrame, indicators_config: Dict[str, Any] = None) -> pd.DataFrame:
    """
    Clean OHLCV data and compute technical indicators based on a config dictionary.

    Args:
        df (pd.DataFrame): Symbol DataFrame with OHLCV columns.
        indicators_config (Dict[str, Any], optional): Dictionary with indicator names as keys and parameters as values.

    Returns:
        pd.DataFrame: DataFrame with added indicators.
    """
    if indicators_config is None:
        raise ValueError("indicators_config must be provided as a dictionary.")

    # Ensure numeric OHLCV
    for col in ["open", "high", "low", "close", "volume"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Moving averages
    for period, source, method in indicators_config.get("moving_averages", []):
        df = calculate_ma(df, period=period, source=source, method=method)
        ma_col = f"{method}_{source}_{period}"
        df[f"Above_{ma_col}"] = df["close"] > df[ma_col]

    # Crossover signals
    for short_params, long_params in indicators_config.get("crossovers", []):
        df = mark_crossovers(df, short_params, long_params)
   
    return df

In [7]:
default_indicators = {
  "moving_averages": [
    [8, "close", "EMA"],
    [20, "open", "EMA"],
    # [34, "close", "EMA"],
    # [50, "close", "EMA"],
    # [200, "close", "EMA"]
  ],
  "crossovers": [
    [[8, "close", "EMA"], [20, "open", "EMA"]]
  ],
}

In [8]:
data_1D_labeled = apply_to_dict(
    data_1D, 
    add_indicators, 
    indicators_config=default_indicators
)

data_4h_labeled = apply_to_dict(
    data_4h, 
    add_indicators, 
    indicators_config=default_indicators
)

data_1h_labeled = apply_to_dict(
    data_1h, 
    add_indicators, 
    indicators_config=default_indicators
)

data_15m_labeled = apply_to_dict(
    data_15m, 
    add_indicators, 
    indicators_config=default_indicators
)

Processing symbols:   0%|          | 0/5 [00:00<?, ?it/s]

Processing symbols:   0%|          | 0/5 [00:00<?, ?it/s]

Processing symbols:   0%|          | 0/5 [00:00<?, ?it/s]

Processing symbols:   0%|          | 0/5 [00:00<?, ?it/s]

In [21]:
def required_price_for_crossover(
    df,
    ema_fast_col="EMA_close_8",
    ema_slow_col="EMA_open_20",
    period_fast=8,
    offset=1e-6
):
    ema_fast_latest = df[ema_fast_col].iloc[-2]
    ema_slow_latest = df[ema_slow_col].iloc[-1]
    ema_slow_prev = df[ema_slow_col].iloc[-2]

    threshold = ema_slow_latest
    alpha = 2 / (period_fast + 1)

    current_price = df["close"].iloc[-1]

    if ema_fast_latest < ema_slow_prev:
        desired_ema = threshold + offset
        direction = "above"
    else:
        desired_ema = threshold - offset
        direction = "below"

    needed_price = (desired_ema - (1 - alpha) * ema_fast_latest) / alpha
    percent_change = (needed_price - current_price) / current_price * 100

    results = {
        "needed_price": needed_price,
        "percent_change": percent_change,
        "direction": direction
    }

    return results

def get_required_price_for_crossover(df: pd.DataFrame, ema_fast_col: str ="EMA_close_8", ema_slow_col: str ="EMA_open_20", period_fast: int =8, offset: float =1e-6):
    """
    Calculate price that is needed for crossover in the current candle.

    Args:
        df (pd.DataFrame): Symbol DataFrame with OHLCV columns.
        ema_fast_col (str): Columnt to take fast EMA from.
        ema_slow_col (str): Columnt to take slow EMA from.
        period_fast (int): Length of the fast period for required price calculation.
        offset (float): Small bias to estimate for cross up or below, not just equalizing of moving averages.

    Returns:
        float: Required price change for crossover.
    """
    ema_fast_latest = df[ema_fast_col].iloc[-2]
    ema_slow_latest = df[ema_slow_col].iloc[-1]
    ema_slow_prev = df[ema_slow_col].iloc[-2]
    print(f'EMA Fast Latest: {ema_fast_latest}, EMA Slow Latest: {ema_slow_latest}, EMA Slow Previous: {ema_slow_prev}')

    threshold = ema_slow_latest
    alpha = 2 / (period_fast + 1)

    print(f'Threshold: {threshold}')

    current_price = df["open"].iloc[-1]

    print(f'Current Price: {current_price}')

    if ema_fast_latest < ema_slow_prev:
        desired_ema = threshold + offset
    else:
        desired_ema = threshold - offset

    needed_price = (desired_ema - (1 - alpha) * ema_fast_latest) / alpha
    required_percent_change = (needed_price - current_price) / current_price * 100

    return needed_price, required_percent_change

In [93]:
needed_change = apply_to_dict(data_15m_labeled, required_price_for_crossover)

Processing symbols:   0%|          | 0/5 [00:00<?, ?it/s]

In [19]:
test_df = data_4h_labeled['BTCUSDT']
test_df = test_df.iloc[:-1]
test_df.tail()

Unnamed: 0,open_time,open,high,low,close,volume,close_time,num_trades,EMA_close_8,Above_EMA_close_8,EMA_open_20,Above_EMA_open_20,EMA_close_8_EMA_open_20_Crossover
994,2025-11-08 20:00:00,101989.65,102482.2,101957.94,102312.94,925.26159,2025-11-08 23:59:59.999,260009,102145.287248,True,102620.613761,False,No
995,2025-11-09 00:00:00,102312.95,102337.89,101400.0,101662.05,2156.33613,2025-11-09 03:59:59.999,616651,102037.901193,False,102591.312451,False,No
996,2025-11-09 04:00:00,101662.05,102130.61,101635.96,101944.59,1101.66332,2025-11-09 07:59:59.999,363847,102017.165372,False,102502.811265,False,No
997,2025-11-09 08:00:00,101944.6,102307.68,101500.0,102239.41,1825.56978,2025-11-09 11:59:59.999,353586,102066.553067,True,102449.648287,False,No
998,2025-11-09 12:00:00,102239.41,104168.0,102050.64,103762.17,4687.64884,2025-11-09 15:59:59.999,722447,102443.35683,True,102429.625593,True,Bullish


In [22]:
get_required_price_for_crossover(test_df)

EMA Fast Latest: 102066.55306747206, EMA Slow Latest: 102429.62559340763, EMA Slow Previous: 102449.64828745055
Threshold: 102429.62559340763
Current Price: 102239.41


(np.float64(103700.3794386821), np.float64(1.4289689647877388))