In [1]:
import datetime
import pandas as pd
import numpy as np
import yfinance as yf


# for plotting purposes
from datetime import datetime
from plotly.subplots import make_subplots
import plotly.graph_objects as go

In [2]:
# Equivalent timeframe mappings
frame_M1 = '1m'
frame_M15 = '15m'
frame_M30 = '30m'
frame_H1 = '1h'
frame_H4 = '4h'
frame_D1 = '1d'
frame_W1 = '1wk'


# Matching your MT5 asset list with Yahoo tickers
assets = {
    'EURUSD': 'EURUSD=X',
    'USDCHF': 'USDCHF=X',
    'GBPUSD': 'GBPUSD=X',
    'USDCAD': 'USDCAD=X',
    'BTCUSD': 'BTC-USD',
    'ETHUSD': 'ETH-USD',
    'XAUUSD': 'XAUUSD=X',
    'XAGUSD': 'XAGUSD=X',
    'SP500m': '^GSPC',
    'UK100': '^FTSE'
}

In [None]:
# SUPPORT AND RESISTANCE

def get_quotes(time_frame,  period='2yrs', asset='EURUSD'):
    try:
        symbol = assets[asset]
        df = yf.download(
            symbol,
            period=period,
            interval=time_frame,
        )

        if df.empty:
            print(f"No data for {asset}. Check ticker symbol or timeframe.")
            return pd.DataFrame()

        df.reset_index(inplace=True)
        return df

    except Exception as e:
        print(f"Error fetching data for {asset}: {e}")
        return pd.DataFrame()


def mass_import(asset_index, time_frame):

    asset_list = list(assets.keys())
    asset = asset_list[asset_index]

    frame_M15 = '15m'
    frame_M30 = '30m'
    frame_H1 = '1h'
    frame_H4 = '4h'
    frame_D1 = '1d'
    frame_W1 = '1wk'

    if time_frame == '1WK':
        data = get_quotes(frame_W1, '10y', asset=asset)
    elif time_frame == 'D1':
        data = get_quotes(frame_D1, '5y', asset=asset)
    elif time_frame == '4H':
        data = get_quotes(frame_H4, '2y', asset=asset)
    elif time_frame == '1H':
        data = get_quotes(frame_H1, '2y', asset=asset)
    elif time_frame == 'M30':
        data = get_quotes(frame_M30, '60d', asset=asset)
    elif time_frame == 'M15':
        data = get_quotes(frame_M15, '60d', asset=asset)
    else:
        data = get_quotes(frame_D1, 'D1', asset=asset)

    if data.empty:
        print(f"No data found for {asset} on timeframe {time_frame}")
        return np.array([])

    return data


def detect_key_levels(df, current_candle, backcandles=50, test_candles=10):
    """
    Detects key support and resistance levels in a given backcandles window.
    
    A level is identified if a candle's high is the highest or its low is the lowest 
    compared to `test_candles` before and after it.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'High' and 'Low' columns.
        current_candle (int): The index of the current candle (latest available candle).
        backcandles (int): Number of candles to look back.
        test_candles (int): Number of candles before and after each candle to check.

    Returns:
        dict: A dictionary with detected 'support' and 'resistance' levels.
    """
    key_levels = {"support": [], "resistance": []}

    # Define the last candle that can be tested to avoid lookahead bias
    last_testable_candle = current_candle - test_candles

    # Ensure we have enough data
    if last_testable_candle < backcandles + test_candles:
        return key_levels  # Not enough historical data

    # Iterate through the backcandles window
    for i in range(current_candle - backcandles, last_testable_candle):
        high = df['High'].iloc[i]
        low = df['Low'].iloc[i]

        # Get surrounding window of test_candles before and after
        before = df.iloc[max(0, i - test_candles):i]
        after = df.iloc[i + 1: min(len(df), i + test_candles + 1)]

        # Check if current high is the highest among before & after candles
        if high > before['High'].max() and high > after['High'].max():
            key_levels["resistance"].append((i, high))

        # Check if current low is the lowest among before & after candles
        if low < before['Low'].min() and low < after['Low'].min():
            key_levels["support"].append((i, low))

    return key_levels


def fill_key_levels(df, backcandles=50, test_candles=10):
    """
    Adds a 'key_levels' column to the DataFrame where each row contains all
    key support and resistance levels detected up to that candle (including
    both the level value and the index of the candle that generated it).
    
    Parameters:
        df (pd.DataFrame): DataFrame containing 'High' and 'Low' columns.
        backcandles (int): Lookback window for detecting key levels.
        test_candles (int): Number of candles before/after for validation.

    Returns:
        pd.DataFrame: Updated DataFrame with the new 'key_levels' column.
    """
    df["key_levels"] = None  # Initialize the column

    from tqdm import tqdm
    for current_candle in tqdm(range(backcandles + test_candles, len(df))):
        # Detect key levels for the current candle
        key_levels = detect_key_levels(
            df, current_candle, backcandles, test_candles)

        # Collect support and resistance levels (with their indices) up to current_candle
        support_levels = [(idx, level) for (idx, level) in key_levels["support"]
                          if idx < current_candle]
        resistance_levels = [(idx, level) for (idx, level) in key_levels["resistance"]
                             if idx < current_candle]

        # Store the levels along with the originating candle index
        if support_levels or resistance_levels:
            df.at[current_candle, "key_levels"] = {
                "support": support_levels,
                "resistance": resistance_levels
            }

    return df


def flatten_yf_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes the 'Ticker' row level from a Yahoo Finance DataFrame
    that has multi-index columns like ('EURUSD=X', 'Open').

    Returns a DataFrame with simple column names like 'Open', 'High', etc.
    """
    # If columns are MultiIndex (e.g. ('EURUSD=X', 'Open')), flatten them
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.droplevel(1)
    return df

def detect_fvg(data, lookback_period=14, body_multiplier=1.5, min_gap=0.2):
    """
    Detects Fair Value Gaps (FVGs) in historical price data.

    Rules Added:
    - Middle candle body must be > avg body * body_multiplier
    - The FVG gap must be > 10% of the middle candle body
    """

    fvg_list = [None, None]

    for i in range(2, len(data)):
        first_high = data['High'].iloc[i-2]
        first_low = data['Low'].iloc[i-2]
        middle_open = data['Open'].iloc[i-1]
        middle_close = data['Close'].iloc[i-1]
        third_low = data['Low'].iloc[i]
        third_high = data['High'].iloc[i]

        # Middle candle body
        middle_body = abs(middle_close - middle_open)

        # Avg body over lookback
        prev_bodies = (data['Close'].iloc[max(0, i-1-lookback_period):i-1] -
                       data['Open'].iloc[max(0, i-1-lookback_period):i-1]).abs()
        avg_body_size = prev_bodies.mean()
        avg_body_size = avg_body_size if avg_body_size > 0 else 0.001

        # Minimum required FVGgap size 
        min_gap = middle_body * 0.20

        # -----------------------------
        # Check Bullish FVG
        # -----------------------------
        if third_low > first_high:
            gap_size = third_low - first_high

            if (middle_body > avg_body_size * body_multiplier
                    and gap_size > min_gap):

                fvg_list.append(('bullish', first_high, third_low, i))
                # print("bullish", first_high, third_low, i)
                continue

        # -----------------------------
        # Check Bearish FVG
        # -----------------------------
        if third_high < first_low:
            gap_size = first_low - third_high

            if (middle_body > avg_body_size * body_multiplier
                    and gap_size > min_gap):

                fvg_list.append(('bearish', first_low, third_high, i))
                # print("bearish", first_low, third_high, i)
                continue

        # No FVG
        fvg_list.append(None)

    return fvg_list

In [4]:
dataF = mass_import(0, '1H')
dataF = dataF.reset_index()            # Move Datetime from index to column
dataF.rename(columns={'Date': 'Gmt time'}, inplace=True)


try:
    my_data = dataF[['Datetime', 'Open', 'High', 'Low', 'Close', 'Volume']]
except:
    my_data = dataF[['Gmt time', 'Open', 'High', 'Low', 'Close', 'Volume']]


my_data = flatten_yf_columns(my_data)
my_data

  df = yf.download(
[*********************100%***********************]  1 of 1 completed


Price,Datetime,Open,High,Low,Close,Volume
0,2023-11-20 09:00:00+00:00,1.092299,1.094331,1.092299,1.093972,0
1,2023-11-20 10:00:00+00:00,1.093733,1.093853,1.092538,1.093016,0
2,2023-11-20 11:00:00+00:00,1.092896,1.093135,1.092419,1.092538,0
3,2023-11-20 12:00:00+00:00,1.092538,1.093255,1.092419,1.093255,0
4,2023-11-20 13:00:00+00:00,1.093374,1.093494,1.092777,1.093016,0
...,...,...,...,...,...,...
12376,2025-11-20 05:00:00+00:00,1.152206,1.152206,1.151808,1.151941,0
12377,2025-11-20 06:00:00+00:00,1.151808,1.152605,1.151808,1.152339,0
12378,2025-11-20 07:00:00+00:00,1.152074,1.152472,1.151941,1.152206,0
12379,2025-11-20 08:00:00+00:00,1.152339,1.152605,1.151543,1.152472,0


In [5]:

my_data['FVG'] = detect_fvg(my_data)
my_data = fill_key_levels(my_data, backcandles=50, test_candles=10)
my_data.tail(20)

100%|██████████| 12321/12321 [00:26<00:00, 467.93it/s]


Price,Datetime,Open,High,Low,Close,Volume,FVG,key_levels
12361,2025-11-19 14:00:00+00:00,1.157675,1.157675,1.155936,1.155936,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12362,2025-11-19 15:00:00+00:00,1.156069,1.156069,1.154468,1.155001,0,"(bearish, 1.1574074029922485, 1.15606939792633...","{'support': [(12340, 1.1575413942337036)], 're..."
12363,2025-11-19 16:00:00+00:00,1.155001,1.155001,1.153802,1.154335,0,"(bearish, 1.1559356451034546, 1.15500116348266...","{'support': [(12340, 1.1575413942337036)], 're..."
12364,2025-11-19 17:00:00+00:00,1.154468,1.154601,1.153004,1.153669,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12365,2025-11-19 18:00:00+00:00,1.153536,1.153935,1.15327,1.153403,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12366,2025-11-19 19:00:00+00:00,1.153136,1.15327,1.152206,1.152339,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12367,2025-11-19 20:00:00+00:00,1.152472,1.153136,1.152472,1.153004,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12368,2025-11-19 21:00:00+00:00,1.152871,1.154201,1.152738,1.154201,0,,"{'support': [(12340, 1.1575413942337036)], 're..."
12369,2025-11-19 22:00:00+00:00,1.154335,1.154335,1.153935,1.154068,0,"(bullish, 1.1531364917755127, 1.15393495559692...","{'support': [(12340, 1.1575413942337036)], 're..."
12370,2025-11-19 23:00:00+00:00,1.154335,1.154468,1.154068,1.154201,0,,"{'support': [(12340, 1.1575413942337036)], 're..."


In [6]:
def plot_fvg_and_key_levels(df, start_idx, end_idx, extension=20):
    """
    Plots candlesticks, FVG zones, and key levels (support/resistance) for a
    subset of a DataFrame from `start_idx` to `end_idx`.
    
    The FVG column is expected to have tuples of the form:
        (fvg_type, start_price, end_price, trigger_index)

    The key_levels column is expected to have dictionaries of the form:
        {
          "support": [(idx, price), (idx, price), ...],
          "resistance": [(idx, price), (idx, price), ...]
        }

    Parameters:
    -----------
    df : pd.DataFrame
        Must contain: "Open", "High", "Low", "Close", "FVG", "key_levels".
    start_idx : int
        Starting row index for plotting.
    end_idx : int
        Ending row index for plotting.
    extension : int
        How far (in x-axis units/index steps) to extend the FVG rectangles
        and key-level lines.
    
    Returns:
    --------
    fig : plotly.graph_objects.Figure
        A Plotly Figure with the candlesticks, FVG, and key-level lines.
    """

    # Slice the DataFrame to the desired plotting range
    dfpl = df.loc[start_idx:end_idx]

    # Create the figure
    fig = go.Figure()

    # -- 1) Add Candlestick Chart --
    fig.add_trace(go.Candlestick(
        x=dfpl.index,
        open=dfpl["Open"],
        high=dfpl["High"],
        low=dfpl["Low"],
        close=dfpl["Close"],
        name="Candles"
    ))

    # -- 2) Add FVG Zones --
    for i, row in dfpl.iterrows():
        # Check if "FVG" is a valid tuple: (fvg_type, start_price, end_price, trigger_index)
        if isinstance(row.get("FVG"), tuple):
            fvg_type, start_price, end_price, trigger_idx = row["FVG"]

            # Choose a fill color based on bullish vs. bearish
            if fvg_type == "bullish":
                color = "rgba(0, 255, 0, 0.3)"   # greenish
            else:
                color = "rgba(255, 0, 0, 0.3)"   # reddish

            fig.add_shape(
                type="rect",
                x0=trigger_idx,
                x1=trigger_idx + extension,
                y0=start_price,
                y1=end_price,
                fillcolor=color,
                opacity=0.4,
                layer="below",
                line=dict(width=0),
            )

    # -- 3) Add Key Levels as Horizontal Lines --
    for i, row in dfpl.iterrows():
        key_levels = row.get("key_levels", None)
        if key_levels:
            # key_levels is a dict: {"support": [(idx, val), ...], "resistance": [(idx, val), ...]}
            support_levels = key_levels.get("support", [])
            resistance_levels = key_levels.get("resistance", [])

            # Plot support levels
            for (gen_idx, s_price) in support_levels:
                # We only draw the line if gen_idx is in (start_idx, end_idx)
                # You can decide to relax/omit this check if you want lines from outside the window.
                if start_idx <= gen_idx <= end_idx:
                    fig.add_shape(
                        type="line",
                        x0=gen_idx,
                        x1=gen_idx + extension,
                        y0=s_price,
                        y1=s_price,
                        line=dict(color="blue", width=2),
                        layer="below"
                    )

            # Plot resistance levels
            for (gen_idx, r_price) in resistance_levels:
                if start_idx <= gen_idx <= end_idx:
                    fig.add_shape(
                        type="line",
                        x0=gen_idx,
                        x1=gen_idx + extension,
                        y0=r_price,
                        y1=r_price,
                        line=dict(color="orange", width=2),
                        layer="below"
                    )

    # -- 4) Figure Aesthetics --
    fig.update_layout(
        width=1200,
        height=800,
        xaxis=dict(showgrid=False),
        yaxis=dict(showgrid=False),
        plot_bgcolor='black',
        paper_bgcolor='black'
    )

    fig.show()
    return fig


fig = plot_fvg_and_key_levels(
    my_data, start_idx=12174, end_idx=12374, extension=20)

In [7]:
def detect_break_signal(df):
    """
    Detects if the current candle carries an FVG signal and,
    at the same time, the previous candle has crossed a key level
    in the expected direction (up for bullish, down for bearish).

    - If FVG is bullish and previous candle crosses ABOVE a level -> signal = 2
    - If FVG is bearish and previous candle crosses BELOW a level -> signal = 1
    - Otherwise -> signal = 0

    The 'FVG' column is expected to have tuples like:
        (fvg_type, lower_price, upper_price, trigger_index)
      where fvg_type is either "bullish" or "bearish".

    The 'key_levels' column is expected to be a dictionary with:
        {
            'support': [(level_candle_idx, level_price), ...],
            'resistance': [(level_candle_idx, level_price), ...]
        }
    """

    # Initialize the new signal column to 0
    df["break_signal"] = 0

    # We start at 1 because we compare candle i with its previous candle (i-1)
    for i in range(1, len(df)):
        fvg = df.loc[i, "FVG"]
        key_levels = df.loc[i, "key_levels"]

        # We only proceed if there's an FVG tuple and some key_levels dict
        if isinstance(fvg, tuple) and isinstance(key_levels, dict):
            fvg_type = fvg[0]  # "bullish" or "bearish"

            # Previous candle's OHLC
            prev_open = df.loc[i-1, "Open"]
            prev_close = df.loc[i-1, "Close"]

            # -----------------------
            # 1) Bullish FVG check
            # -----------------------
            if fvg_type == "bullish":
                # Typically you'd check crossing a "resistance" level
                # crossing means the previous candle goes from below -> above
                resistance_levels = key_levels.get("resistance", [])

                for (lvl_idx, lvl_price) in resistance_levels:
                    # Condition: previously below, ended above
                    # simplest check is: prev_open < lvl_price < prev_close
                    if prev_open < lvl_price and prev_close > lvl_price:
                        df.loc[i, "break_signal"] = 2
                        break  # No need to check more levels

            # -----------------------
            # 2) Bearish FVG check
            # -----------------------
            elif fvg_type == "bearish":
                # Typically you'd check crossing a "support" level
                support_levels = key_levels.get("support", [])

                for (lvl_idx, lvl_price) in support_levels:
                    # Condition: previously above, ended below
                    # simplest check is: prev_open > lvl_price and prev_close < lvl_price
                    if prev_open > lvl_price and prev_close < lvl_price:
                        df.loc[i, "break_signal"] = 1
                        break  # No need to check more levels

    return df


my_data = detect_break_signal(my_data)
# my_data[my_data["break_signal"] != 0]