In [15]:
import pandas as pd
import numpy as np

btc = pd.read_csv('/Users/hatim/Desktop/Applied Forecasting/Final Project/Algo Trading/Data/btc.csv', index_col=0)
btc.index = pd.to_datetime(btc.index)
btc.columns = ['Open', 'High', 'Low', 'Close', 'Volume', 'Risk Free Rate']
btc=btc.iloc[-10000:]
btc

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Risk Free Rate
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2015-07-20,277.98,280.00,277.37,280.00,782.883420,0.02372
2015-07-21,279.96,281.27,276.85,277.32,4943.559434,0.02340
2015-07-22,277.33,278.54,275.01,277.89,4687.909383,0.02322
2015-07-23,277.96,279.75,276.28,277.39,5306.919575,0.02277
2015-07-24,277.23,291.52,276.43,289.12,7362.469083,0.02271
...,...,...,...,...,...,...
2025-04-15,84590.36,86491.40,83592.77,83629.78,6460.941442,0.04323
2025-04-16,83622.52,85526.40,83088.02,84028.72,8243.059013,0.04279
2025-04-17,84028.71,85494.94,83711.69,84961.97,5862.464055,0.04333
2025-04-18,84961.97,85150.94,84287.06,84466.47,1884.039112,0.04333


Parameters:
- Lookback_period = 20 (days)
- body_multiplier = 1.5
- back_candles = 50
- test_candles = 10

# Detect FVG

In [16]:
def detect_fvg(data, lookback_period=10, body_multiplier=1.5):
    """
    Detects Fair Value Gaps (FVGs) in historical price data.

    Parameters:
        data (DataFrame): DataFrame with columns ['open', 'high', 'low', 'close'].
        lookback_period (int): Number of candles to look back for average body size.
        body_multiplier (float): Multiplier to determine significant body size.

    Returns:
        list of tuples: Each tuple contains ('type', start, end, index).
    """
    fvg_list = [None, None]

    for i in range(2, len(data)):
        first_high = data['High'].iloc[i-2]
        first_low = data['Low'].iloc[i-2]
        middle_open = data['Open'].iloc[i-1]
        middle_close = data['Close'].iloc[i-1]
        third_low = data['Low'].iloc[i]
        third_high = data['High'].iloc[i]

        # Calculate the average absolute body size over the lookback period
        prev_bodies = (data['Close'].iloc[max(0, i-1-lookback_period):i-1] - 
                       data['Open'].iloc[max(0, i-1-lookback_period):i-1]).abs()
        avg_body_size = prev_bodies.mean()
        
        # Ensure avg_body_size is nonzero to avoid false positives
        avg_body_size = avg_body_size if avg_body_size > 0 else 0.001

        middle_body = abs(middle_close - middle_open)

        # Check for Bullish FVG
        if third_low > first_high and middle_body > avg_body_size * body_multiplier:
            fvg_list.append(('bullish', first_high, third_low, i))

        # Check for Bearish FVG
        elif third_high < first_low and middle_body > avg_body_size * body_multiplier:
            fvg_list.append(('bearish', first_low, third_high, i))
        else:
            fvg_list.append(None)

    return fvg_list



In [17]:
# need to play around with parameters
btc['FVG'] = detect_fvg(btc, lookback_period=10, body_multiplier=1.2)
print(len(btc[~btc['FVG'].isna()]))
btc

611


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Risk Free Rate,FVG
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2015-07-20,277.98,280.00,277.37,280.00,782.883420,0.02372,
2015-07-21,279.96,281.27,276.85,277.32,4943.559434,0.02340,
2015-07-22,277.33,278.54,275.01,277.89,4687.909383,0.02322,
2015-07-23,277.96,279.75,276.28,277.39,5306.919575,0.02277,
2015-07-24,277.23,291.52,276.43,289.12,7362.469083,0.02271,
...,...,...,...,...,...,...,...
2025-04-15,84590.36,86491.40,83592.77,83629.78,6460.941442,0.04323,
2025-04-16,83622.52,85526.40,83088.02,84028.72,8243.059013,0.04279,
2025-04-17,84028.71,85494.94,83711.69,84961.97,5862.464055,0.04333,
2025-04-18,84961.97,85150.94,84287.06,84466.47,1884.039112,0.04333,


# Visualise FVG

In [18]:
import plotly.graph_objects as go
from datetime import datetime

dfpl = btc[-100:]
# Create the figure
fig = go.Figure()

# Add candlestick chart
fig.add_trace(go.Candlestick(
    x=dfpl.index,
    open=dfpl["Open"],
    high=dfpl["High"],
    low=dfpl["Low"],
    close=dfpl["Close"],
    name="Candles"
))

# Add FVG zones
for _, row in dfpl.iterrows():
    if isinstance(row["FVG"], tuple):
        fvg_type, start, end, index = row["FVG"]
        color = "rgba(0,255,0,0.3)" if fvg_type == "bullish" else "rgba(255,0,0,0.3)"
        current_date = row.name
        time_delta = (dfpl.index[1] - dfpl.index[0]) * 2
        forward_delta = (dfpl.index[1] - dfpl.index[0]) * 50
        fig.add_shape(
            type="rect",
            x0=current_date - time_delta,
            x1=current_date + forward_delta,
            y0=start,
            y1=end,
            fillcolor=color,
            opacity=0.8,
            layer="below",
            line=dict(width=0),
        )

# Show the chart
fig.update_layout(width=1200, height=800,
                  xaxis=dict(showgrid=False),
                  yaxis=dict(showgrid=False),
                  plot_bgcolor='black',
                  paper_bgcolor='black')
fig.show()

Interpretation:
- Candlestick Chart: The main display shows Daily Bitcoin price movements with candlesticks - green candles represent price increases, while red candles show price decreases.
- Price Range: The y-axis shows the Bitcoin price.
- Colored Rectangles: Fair Value Gaps (FVGs)
    - Green rectangles: Bullish FVGs (where price moved up quickly, creating a gap)
    - Red rectangles: Bearish FVGs (where price moved down quickly, creating a gap)


Intuition:
- Green rectangles (Bullish FVGs): These indicate potential buying opportunities, especially if price returns to test this level from above AND the previous candle breaks above a key resistance level.
- Red rectangles (Bearish FVGs): These indicate potential selling opportunities in, especially if price returns to test this level from below AND the previous candle breaks below a key support level.


Note:
- FVG on its own is not enough. We need something else to reinforce these claims

# Key Levels

In [19]:
def detect_key_levels(df, current_candle, backcandles=50, test_candles=10):
    """
    Detects key support and resistance levels in a given backcandles window.
    
    A level is identified if a candle's high is the highest or its low is the lowest 
    compared to `test_candles` before and after it.

    Parameters:
        df (pd.DataFrame): DataFrame containing 'High' and 'Low' columns.
        current_candle (int): The index of the current candle (latest available candle).
        backcandles (int): Number of candles to look back.
        test_candles (int): Number of candles before and after each candle to check.

    Returns:
        dict: A dictionary with detected 'support' and 'resistance' levels.
    """
    key_levels = {"support": [], "resistance": []}

    # Define the last candle that can be tested to avoid lookahead bias
    last_testable_candle = current_candle - test_candles

    # Ensure we have enough data
    if last_testable_candle < backcandles + test_candles:
        return key_levels  # Not enough historical data

    # Iterate through the backcandles window
    for i in range(current_candle - backcandles, last_testable_candle):
        high = df['High'].iloc[i]
        low = df['Low'].iloc[i]

        # Get surrounding window of test_candles before and after
        before = df.iloc[max(0, i - test_candles):i]
        after = df.iloc[i + 1: min(len(df), i + test_candles + 1)]

        # Check if current high is the highest among before & after candles
        if high > before['High'].max() and high > after['High'].max():
            key_levels["resistance"].append((i, high))

        # Check if current low is the lowest among before & after candles
        if low < before['Low'].min() and low < after['Low'].min():
            key_levels["support"].append((i, low))

    return key_levels

def fill_key_levels(df, backcandles=50, test_candles=10):
    """
    Adds a 'key_levels' column to the DataFrame where each row contains all
    key support and resistance levels detected up to that candle (including
    both the level value and the index of the candle that generated it).
    
    Parameters:
        df (pd.DataFrame): DataFrame containing 'High' and 'Low' columns.
        backcandles (int): Lookback window for detecting key levels.
        test_candles (int): Number of candles before/after for validation.

    Returns:
        pd.DataFrame: Updated DataFrame with the new 'key_levels' column.
    """
    df["key_levels"] = None  # Initialize the column
    
    from tqdm import tqdm
    for current_candle in tqdm(range(backcandles + test_candles, len(df))):
        # Detect key levels for the current candle
        key_levels = detect_key_levels(df, current_candle, backcandles, test_candles)

        # Collect support and resistance levels (with their indices) up to current_candle
        support_levels = [(idx, level) for (idx, level) in key_levels["support"] 
                          if idx < current_candle]
        resistance_levels = [(idx, level) for (idx, level) in key_levels["resistance"] 
                             if idx < current_candle]

        # Store the levels along with the originating candle index
        if support_levels or resistance_levels:
            df.at[current_candle, "key_levels"] = {
                "support": support_levels,
                "resistance": resistance_levels
            }
            
    return df

btc = fill_key_levels(btc.reset_index(), backcandles=50, test_candles=15)
btc[~btc['FVG'].isna()]

100%|██████████| 3497/3497 [00:07<00:00, 478.67it/s]


Unnamed: 0,Date,Open,High,Low,Close,Volume,Risk Free Rate,FVG,key_levels
5,2015-07-25,289.12,291.67,286.82,289.70,4102.452960,0.02271,"(bullish, 279.75, 286.82, 5)",
10,2015-07-30,290.26,291.56,286.56,288.49,5138.981549,0.02268,"(bearish, 293.65, 291.56, 10)",
13,2015-08-02,281.53,282.62,277.33,282.62,3361.086689,0.02205,"(bearish, 282.79, 282.62, 13)",
20,2015-08-09,262.02,268.80,260.97,267.63,5046.976875,0.02175,"(bearish, 277.05, 268.8, 20)",
30,2015-08-19,221.99,238.99,214.91,227.34,13329.690336,0.02129,"(bearish, 255.58, 238.99, 30)",
...,...,...,...,...,...,...,...,...,...
3535,2025-03-24,86092.95,88804.64,85533.06,87523.62,13107.503916,0.04331,"(bullish, 84517.75, 85533.06, 3535)","{'support': [(3486, 91178.01)], 'resistance': []}"
3540,2025-03-29,84381.81,84584.13,81608.99,82616.83,3194.243037,0.04255,"(bearish, 87108.87, 84584.13, 3540)","{'support': [(3522, 76555.0)], 'resistance': []}"
3541,2025-03-30,82624.06,83508.98,81538.88,82379.98,2472.723022,0.04255,"(bearish, 83533.4, 83508.98, 3541)","{'support': [(3522, 76555.0)], 'resistance': []}"
3549,2025-04-07,78370.15,81223.67,74420.69,79140.01,26706.529308,0.04155,"(bearish, 82346.61, 81223.67, 3549)","{'support': [(3522, 76555.0)], 'resistance': []}"


# Visualize Key Levels

In [20]:

def plot_fvg_and_key_levels(df, start_idx, end_idx, extension=30):
    """
    Plots candlesticks, FVG zones, and key levels (support/resistance) for a
    subset of a DataFrame from `start_idx` to `end_idx`.
    
    The FVG column is expected to have tuples of the form:
        (fvg_type, start_price, end_price, trigger_index)

    The key_levels column is expected to have dictionaries of the form:
        {
          "support": [(idx, price), (idx, price), ...],
          "resistance": [(idx, price), (idx, price), ...]
        }

    Parameters:
    -----------
    df : pd.DataFrame
        Must contain: "Open", "High", "Low", "Close", "FVG", "key_levels".
    start_idx : int
        Starting row index for plotting.
    end_idx : int
        Ending row index for plotting.
    extension : int
        How far (in x-axis units/index steps) to extend the FVG rectangles
        and key-level lines.
    
    Returns:
    --------
    fig : plotly.graph_objects.Figure
        A Plotly Figure with the candlesticks, FVG, and key-level lines.
    """
    
    # Slice the DataFrame to the desired plotting range
    dfpl = df.loc[start_idx:end_idx]

    # Create the figure
    fig = go.Figure()

    # -- 1) Add Candlestick Chart --
    fig.add_trace(go.Candlestick(
        x=dfpl.index,
        open=dfpl["Open"],
        high=dfpl["High"],
        low=dfpl["Low"],
        close=dfpl["Close"],
        name="Candles"
    ))

    # -- 2) Add FVG Zones --
    for i, row in dfpl.iterrows():
        # Check if "FVG" is a valid tuple: (fvg_type, start_price, end_price, trigger_index)
        if isinstance(row.get("FVG"), tuple):
            fvg_type, start_price, end_price, trigger_idx = row["FVG"]

            # Choose a fill color based on bullish vs. bearish
            if fvg_type == "bullish":
                color = "rgba(0, 255, 0, 0.3)"   # greenish
            else:
                color = "rgba(255, 0, 0, 0.3)"   # reddish

            fig.add_shape(
                type="rect",
                x0=trigger_idx, 
                x1=trigger_idx + extension,
                y0=start_price,
                y1=end_price,
                fillcolor=color,
                opacity=0.4,
                layer="below",
                line=dict(width=0),
            )

    # -- 3) Add Key Levels as Horizontal Lines --
    for i, row in dfpl.iterrows():
        key_levels = row.get("key_levels", None)
        if key_levels:
            # key_levels is a dict: {"support": [(idx, val), ...], "resistance": [(idx, val), ...]}
            support_levels = key_levels.get("support", [])
            resistance_levels = key_levels.get("resistance", [])

            # Plot support levels
            for (gen_idx, s_price) in support_levels:
                # We only draw the line if gen_idx is in (start_idx, end_idx)
                # You can decide to relax/omit this check if you want lines from outside the window.
                if start_idx <= gen_idx <= end_idx:
                    fig.add_shape(
                        type="line",
                        x0=gen_idx,
                        x1=gen_idx + extension,
                        y0=s_price,
                        y1=s_price,
                        line=dict(color="blue", width=2),
                        layer="below"
                    )

            # Plot resistance levels
            for (gen_idx, r_price) in resistance_levels:
                if start_idx <= gen_idx <= end_idx:
                    fig.add_shape(
                        type="line",
                        x0=gen_idx,
                        x1=gen_idx + extension,
                        y0=r_price,
                        y1=r_price,
                        line=dict(color="orange", width=2),
                        layer="below"
                    )

    # -- 4) Figure Aesthetics --
    fig.update_layout(
        width=1200,
        height=800,
        xaxis=dict(showgrid=False),
        yaxis=dict(showgrid=False),
        plot_bgcolor='black',
        paper_bgcolor='black'
    )
    return fig

fig = plot_fvg_and_key_levels(btc, start_idx=len(btc)-201, end_idx=len(btc)-1, extension=50)
fig.show()

# Detect the Signal

In [25]:
def detect_break_signal(df):
    """
    Detects if the current candle carries an FVG signal and,
    at the same time, the previous candle has crossed a key level
    in the expected direction (up for bullish, down for bearish).

    - If FVG is bullish and previous candle crosses ABOVE a level -> signal = 2
    - If FVG is bearish and previous candle crosses BELOW a level -> signal = 1
    - Otherwise -> signal = 0

    The 'FVG' column is expected to have tuples like:
        (fvg_type, lower_price, upper_price, trigger_index)
      where fvg_type is either "bullish" or "bearish".

    The 'key_levels' column is expected to be a dictionary with:
        {
            'support': [(level_candle_idx, level_price), ...],
            'resistance': [(level_candle_idx, level_price), ...]
        }
    """

    # Initialize the new signal column to 0
    df["break_signal"] = 0

    # We start at 1 because we compare candle i with its previous candle (i-1)
    for i in range(1, len(df)):
        fvg = df.loc[i, "FVG"]
        key_levels = df.loc[i, "key_levels"]

        # We only proceed if there's an FVG tuple and some key_levels dict
        if isinstance(fvg, tuple) and isinstance(key_levels, dict):
            fvg_type = fvg[0]  # "bullish" or "bearish"

            # Previous candle's OHLC
            prev_open = df.loc[i-1, "Open"]
            prev_close = df.loc[i-1, "Close"]

            # -----------------------
            # 1) Bullish FVG check
            # -----------------------
            if fvg_type == "bullish":
                # Typically you'd check crossing a "resistance" level
                # crossing means the previous candle goes from below -> above
                resistance_levels = key_levels.get("resistance", [])
                
                for (lvl_idx, lvl_price) in resistance_levels:
                    # Condition: previously below, ended above
                    # simplest check is: prev_open < lvl_price < prev_close
                    if prev_open < lvl_price and prev_close > lvl_price:
                        df.loc[i, "break_signal"] = 1
                        break  # No need to check more levels

            # -----------------------
            # 2) Bearish FVG check
            # -----------------------
            elif fvg_type == "bearish":
                # Typically you'd check crossing a "support" level
                support_levels = key_levels.get("support", [])
                
                for (lvl_idx, lvl_price) in support_levels:
                    # Condition: previously above, ended below
                    # simplest check is: prev_open > lvl_price and prev_close < lvl_price
                    if prev_open > lvl_price and prev_close < lvl_price:
                        df.loc[i, "break_signal"] = -1
                        break  # No need to check more levels

    return df

btc = detect_break_signal(btc)
btc[btc["break_signal"]!=0]
# btc.iloc[2303:2305]

Unnamed: 0,Date,Open,High,Low,Close,Volume,Risk Free Rate,FVG,key_levels,break_signal,pointpos,buy_pct,sell_pct
86,2015-10-14,250.98,255.0,250.25,253.27,7587.526674,0.01981,"(bullish, 249.14, 250.25, 86)","{'support': [(36, 198.02), (64, 224.45)], 'res...",1,255.0001,0.0,0.201782
271,2016-04-16,430.82,434.89,429.89,433.39,4133.985987,0.01752,"(bullish, 428.0, 429.89, 271)","{'support': [(229, 381.09)], 'resistance': [(2...",1,434.8901,0.0,0.201766
314,2016-05-29,523.35,549.99,491.01,525.22,9957.527243,0.01851,"(bullish, 476.75, 491.01, 314)","{'support': [], 'resistance': [(280, 472.38)]}",1,549.9901,0.0,0.211964
379,2016-08-02,604.87,613.38,483.0,537.47,14839.34616,0.01537,"(bearish, 621.37, 613.38, 379)","{'support': [(339, 544.13), (355, 608.0)], 're...",-1,,0.0,0.0
380,2016-08-03,538.8,576.0,519.3,573.36,20292.859717,0.01542,"(bearish, 599.0, 576.0, 380)","{'support': [(339, 544.13), (355, 608.0)], 're...",-1,,0.0,0.0
450,2016-10-12,642.3,643.58,631.91,635.79,4334.738703,0.01778,"(bullish, 619.2, 631.91, 450)","{'support': [(409, 568.12)], 'resistance': [(4...",1,643.5801,0.0,0.208211
461,2016-10-23,653.03,655.8,645.33,651.04,3075.739567,0.0174,"(bullish, 633.41, 645.33, 461)","{'support': [], 'resistance': [(416, 631.3)]}",1,655.8001,0.0,0.207528
727,2017-07-16,1964.33,2043.94,1758.2,1911.78,21740.89274,0.02319,"(bearish, 2150.0, 2043.94, 727)","{'support': [(696, 2050.0)], 'resistance': [(6...",-1,,0.0,0.0
748,2017-08-06,3243.5,3278.42,3150.0,3222.22,8666.200782,0.02267,"(bullish, 2877.79, 3150.0, 748)","{'support': [(727, 1758.2)], 'resistance': [(7...",1,3278.4201,0.0,0.237836
816,2017-10-13,5440.0,5867.0,5390.0,5624.8,26597.975259,0.0228,"(bullish, 4869.97, 5390.0, 816)","{'support': [(788, 2975.01)], 'resistance': [(...",1,5867.0001,0.0,0.242713


# Plot the Point position of the signal

In [27]:
def pointpos(x):
    if x['break_signal']==-1:
        return x['Low']-1e-4
    elif x['break_signal']==1:
        return x['High']+1e-4
    else:
        return np.nan

btc['pointpos'] = btc.apply(lambda row: pointpos(row), axis=1)


strt = 2230
end = 2274
fig = plot_fvg_and_key_levels(btc, start_idx=strt, end_idx=end, extension=5)
fig.add_scatter(x=btc.index[strt:end], y=btc['pointpos'][strt:end], mode="markers",
                marker=dict(size=8, color="MediumPurple"),
                name="pivot")
fig.show()

# Position Sizes

In [23]:
def calculate_position_percentages(df):
    """
    Add columns for buy_pct and sell_pct based on FVG signals and characteristics.
    
    Parameters:
    df (DataFrame): DataFrame with 'break_signal' and 'FVG' columns
    
    Returns:
    DataFrame: DataFrame with added 'buy_pct' and 'sell_pct' columns
    """
    # Initialize columns with zeros
    df['buy_pct'] = 0.0
    df['sell_pct'] = 0.0
    
    # Loop through each row to calculate position percentages
    for i in df.index:
        signal = df.loc[i, 'break_signal']
        fvg = df.loc[i, 'FVG']
        
        base_pct = 0.2  # Base percentage (20%)
        
        if signal == 2:  # Buy signal
            if isinstance(fvg, tuple):
                # Extract FVG information
                fvg_type, start, end, idx = fvg
                
                # Calculate FVG size as percentage
                fvg_size = abs(end - start) / start
                
                # Scale position size based on FVG size
                # Larger FVG = stronger signal = larger position
                adjusted_pct = base_pct * (1 + 2 * fvg_size)  # Adjust scaling factor as needed
                
                # Cap the position size
                adjusted_pct = min(adjusted_pct, 0.5)  # Maximum 50% of cash
                
                df.loc[i, 'buy_pct'] = adjusted_pct
            else:
                df.loc[i, 'buy_pct'] = base_pct
                
        elif signal == 1:  # Sell signal
            if isinstance(fvg, tuple):
                # Extract FVG information
                fvg_type, start, end, idx = fvg
                
                # Calculate FVG size as percentage
                fvg_size = abs(end - start) / start
                
                # Scale position size based on FVG size
                adjusted_pct = base_pct * (1 + 2 * fvg_size)
                
                # Cap the position size
                adjusted_pct = min(adjusted_pct, 0.5)  # Maximum 50% of holdings
                
                df.loc[i, 'sell_pct'] = adjusted_pct
            else:
                df.loc[i, 'sell_pct'] = base_pct
    
    return df

btc = calculate_position_percentages(btc)
btc.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Risk Free Rate,FVG,key_levels,break_signal,pointpos,buy_pct,sell_pct
0,2015-07-20,277.98,280.0,277.37,280.0,782.88342,0.02372,,,0,,0.0,0.0
1,2015-07-21,279.96,281.27,276.85,277.32,4943.559434,0.0234,,,0,,0.0,0.0
2,2015-07-22,277.33,278.54,275.01,277.89,4687.909383,0.02322,,,0,,0.0,0.0
3,2015-07-23,277.96,279.75,276.28,277.39,5306.919575,0.02277,,,0,,0.0,0.0
4,2015-07-24,277.23,291.52,276.43,289.12,7362.469083,0.02271,,,0,,0.0,0.0
