# RESEARCH NOTEBOOK --> XGridT


In [None]:
# This is necessary to recognize the modules
import os
import sys
import warnings

warnings.filterwarnings("ignore")

root_path = os.path.abspath(os.path.join(os.getcwd(), "../.."))
sys.path.append(root_path)

In [None]:
import pandas as pd
import pandas_ta as ta  # noqa: F401

from core.data_sources import CLOBDataSource

# Initialize the data source
clob = CLOBDataSource()

In [None]:
# Define the parameters
exchange = "binance_perpetual"
trading_pair = "1000PEPE-USDT"
interval = "15m"
days = 60

In [None]:
candles = await clob.get_candles_last_days(connector_name=exchange, trading_pair=trading_pair, interval=interval, days=days)

In [None]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from core.features.candles.peak_analyzer import PeakAnalyzer

ema_short = 12
ema_medium = 47
ema_long = 100
donchian_channel_length = 50
natr_length = 100
natr_multiplier = 2.0
tp_default = 0.05
filter_limits_by_signal = False

analyzer = PeakAnalyzer(candles.data)
peaks = analyzer.get_peaks(prominence_percentage=0.1, distance=100)

In [None]:
import numpy as np
from hummingbot.core.data_type.common import TradeType

df = candles.data.copy()
high_peaks = peaks["high_peaks"]
low_peaks = peaks["low_peaks"]
df.loc[high_peaks[0], "high_peaks"] = high_peaks[1]
df.loc[low_peaks[0], "low_peaks"] = low_peaks[1]
df["high_peaks"].ffill(inplace=True)
df["low_peaks"].ffill(inplace=True)
df.ta.ema(length=ema_short, append=True)
df.ta.ema(length=ema_medium, append=True)
df.ta.ema(length=ema_long, append=True)
df.ta.donchian(lower_length=donchian_channel_length, upper_length=donchian_channel_length, append=True)
df.ta.natr(length=natr_length, append=True)

short_ema = df[f"EMA_{ema_short}"]
medium_ema = df[f"EMA_{ema_medium}"]
long_ema = df[f"EMA_{ema_long}"]
close = df["close"]
donchian_upper = df[f"DCU_{donchian_channel_length}_{donchian_channel_length}"]
donchian_lower = df[f"DCL_{donchian_channel_length}_{donchian_channel_length}"]
natr = df[f"NATR_{natr_length}"] / 100

# Compute limit upper and lower adding the Natr to the Donchian upper and lower
limit_upper = donchian_upper * (1 + natr * natr_multiplier)
limit_lower = donchian_lower * (1 - natr * natr_multiplier)


long_condition = (short_ema > medium_ema) & (short_ema > long_ema)
short_condition = (short_ema < medium_ema) & (short_ema < long_ema)

df["signal"] = 0
df.loc[long_condition, "signal"] = 1
df.loc[short_condition, "signal"] = -1


fig = candles.fig(width=1200)

# add individual peaks with yellow color without background and higher size
fig.add_trace(
    go.Scatter(x=high_peaks[0], y=high_peaks[1], mode="markers", marker=dict(color="yellow", size=4), name="High Peaks")
)
fig.add_trace(go.Scatter(x=low_peaks[0], y=low_peaks[1], mode="markers", marker=dict(color="yellow", size=4), name="Low Peaks"))


def get_unbounded_tp(row, tp_default, side, high_peaks, low_peaks, criteria="latest"):
    timestamp = row.name
    close = row["close"]
    if side == TradeType.BUY:
        previous_peaks_higher_than_price = [
            price_peak
            for price_timestamp, price_peak in zip(high_peaks[0], high_peaks[1])
            if price_timestamp < timestamp and price_peak > close
        ]
        if previous_peaks_higher_than_price:
            if criteria == "latest":
                return previous_peaks_higher_than_price[-1]
            elif criteria == "closest":
                return min(previous_peaks_higher_than_price, key=lambda x: abs(x - row["close"]))
        else:
            return close * (1 + tp_default)
    else:
        previous_peaks_lower_than_price = [
            price_peak
            for price_timestamp, price_peak in zip(low_peaks[0], low_peaks[1])
            if price_timestamp < timestamp and price_peak < close
        ]
        if previous_peaks_lower_than_price:
            if criteria == "latest":
                return previous_peaks_lower_than_price[-1]
            elif criteria == "closest":
                return min(previous_peaks_lower_than_price, key=lambda x: abs(x - row["close"]))
        else:
            return close * (1 - tp_default)


# Apply the function to create the TP_LONG column
df["SL_LONG"] = donchian_lower
df["SL_SHORT"] = donchian_upper
df["TP_LONG"] = df.apply(
    lambda x: x.high_peaks
    if pd.notna(x.high_peaks) and x.high_peaks > x.high
    else get_unbounded_tp(x, tp_default, TradeType.BUY, high_peaks, low_peaks),
    axis=1,
)
df["TP_SHORT"] = df.apply(
    lambda x: x.low_peaks
    if pd.notna(x.low_peaks) and x.low_peaks < x.low
    else get_unbounded_tp(x, tp_default, TradeType.SELL, high_peaks, low_peaks),
    axis=1,
)
df["LIMIT_LONG"] = limit_lower
df["LIMIT_SHORT"] = limit_upper


if filter_limits_by_signal:
    # If the signal is 1, remove the value of TP SHORT, SL_SHORT and LIMIT_SHORT
    df.loc[df["signal"] == 1, "TP_SHORT"] = np.nan
    df.loc[df["signal"] == 1, "SL_SHORT"] = np.nan
    df.loc[df["signal"] == 1, "LIMIT_SHORT"] = np.nan
    # If the signal is -1, remove the value of TP LONG, SL_LONG and LIMIT_LONG
    df.loc[df["signal"] == -1, "TP_LONG"] = np.nan
    df.loc[df["signal"] == -1, "SL_LONG"] = np.nan
    df.loc[df["signal"] == -1, "LIMIT_LONG"] = np.nan
    # if signal is 0, remove the value of TP_LONG, TP_SHORT, SL_LONG, SL_SHORT, LIMIT_LONG and LIMIT_SHORT
    df.loc[df["signal"] == 0, "TP_LONG"] = np.nan
    df.loc[df["signal"] == 0, "TP_SHORT"] = np.nan
    df.loc[df["signal"] == 0, "SL_LONG"] = np.nan
    df.loc[df["signal"] == 0, "SL_SHORT"] = np.nan
    df.loc[df["signal"] == 0, "LIMIT_LONG"] = np.nan
    df.loc[df["signal"] == 0, "LIMIT_SHORT"] = np.nan


fig.add_trace(go.Scatter(x=df.index, y=short_ema, line=dict(color="white", width=2), name="Fast EMA"))
fig.add_trace(go.Scatter(x=df.index, y=medium_ema, line=dict(color="blue", width=2), name="Medium EMA"))
fig.add_trace(go.Scatter(x=df.index, y=long_ema, line=dict(color="violet", width=2), name="Slow EMA"))

# Add TP LONG and TP SHORT
fig.add_trace(go.Scatter(x=df.index, y=df["TP_LONG"], line=dict(color="green", width=2), name="TP LONG"))
fig.add_trace(go.Scatter(x=df.index, y=df["TP_SHORT"], line=dict(color="green", width=2), name="TP SHORT"))


fig.add_trace(go.Scatter(x=df.index, y=df["SL_LONG"], line=dict(color="red", width=2), name="SL LONG"))
fig.add_trace(go.Scatter(x=df.index, y=df["SL_SHORT"], line=dict(color="red", width=2), name="SL SHORT"))
# add limit upper and lower with white dashed line
fig.add_trace(go.Scatter(x=df.index, y=df["LIMIT_LONG"], line=dict(color="white", width=2, dash="dash"), name="LIMIT LONG"))
fig.add_trace(go.Scatter(x=df.index, y=df["LIMIT_SHORT"], line=dict(color="white", width=2, dash="dash"), name="LIMIT SHORT"))
fig.show()

In [None]:
df

In [None]:
import plotly.graph_objects as go

# Add candlestick
fig = candles.fig(width=1200)

# Add EMAs
ema_fast = f"EMA_{ema_short}"
ema_med = f"EMA_{ema_medium}"
ema_slow = f"EMA_{ema_long}"

fig.add_trace(go.Scatter(x=candles_df.index, y=candles_df[ema_fast], line=dict(color="#00FF00", width=2), name="Fast EMA"))
fig.add_trace(go.Scatter(x=candles_df.index, y=candles_df[ema_med], line=dict(color="#FFA500", width=2), name="Medium EMA"))
fig.add_trace(go.Scatter(x=candles_df.index, y=candles_df[ema_slow], line=dict(color="#0000FF", width=2), name="Slow EMA"))

# Add support and resistance TP
fig.add_trace(go.Scatter(x=candles_df.index, y=candles_df["support_tp"], line=dict(color="#00FF00", width=2), name="Support TP"))
fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df["resistance_tp"], line=dict(color="#00FF00", width=2), name="Resistance TP")
)

# Add support and resistance SL
fig.add_trace(go.Scatter(x=candles_df.index, y=candles_df["support_sl"], line=dict(color="#FF0000", width=2), name="Support SL"))
fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df["resistance_sl"], line=dict(color="#FF0000", width=2), name="Resistance SL")
)


# Show the plot
fig.show()

In [None]:
# Generate signal


short_ema = candles_df[f"EMA_{ema_short}"]
medium_ema = candles_df[f"EMA_{ema_medium}"]
long_ema = candles_df[f"EMA_{ema_long}"]
close = candles_df["close"]


long_condition = (short_ema > medium_ema) & (medium_ema > long_ema) & (short_ema > long_ema)
short_condition = (short_ema < medium_ema) & (medium_ema < long_ema) & (short_ema < long_ema)

candles_df["signal"] = 0
candles_df.loc[long_condition, "signal"] = 1
candles_df.loc[short_condition, "signal"] = -1

In [None]:
import plotly.graph_objects as go

fig = make_subplots(
    rows=3,
    cols=1,
    shared_xaxes=True,
    vertical_spacing=0.02,
    subplot_titles=("OHLC with BB", "MACD", "Signal"),
    row_heights=[0.6, 0.2, 0.2],
)

# Add candlestick
fig.add_trace(
    go.Candlestick(
        x=candles_df.index,
        open=candles_df["open"],
        high=candles_df["high"],
        low=candles_df["low"],
        close=candles_df["close"],
        name="Candlesticks",
    ),
    row=1,
    col=1,
)


# Add EMAs
ema_fast = f"EMA_{ema_short}"
ema_med = f"EMA_{ema_medium}"
ema_slow = f"EMA_{ema_long}"

fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df[ema_fast], line=dict(color="#00FF00", width=2), name="Fast EMA"), row=1, col=1
)
fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df[ema_med], line=dict(color="#FFA500", width=2), name="Medium EMA"), row=1, col=1
)
fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df[ema_slow], line=dict(color="#0000FF", width=2), name="Slow EMA"), row=1, col=1
)


# Add the signal line
fig.add_trace(
    go.Scatter(x=candles_df.index, y=candles_df["signal"], mode="lines", name="Signal", line=dict(color="white")), row=3, col=1
)

# Update layout for dark theme
fig.update_layout(
    title=f"{exchange} - {trading_pair} - {interval}",
    width=1500,
    height=1000,
    font=dict(color="#e1e1e1"),
    plot_bgcolor="#1e1e1e",
    paper_bgcolor="#1e1e1e",
    xaxis_rangeslider_visible=False,
    legend=dict(bgcolor="rgba(0,0,0,0)"),
    yaxis=dict(title="Price"),
    yaxis2=dict(title="MACD", showgrid=False),
    yaxis3=dict(title="Signal", showgrid=False),
    showlegend=False,
)

# Update axes
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="#323232", zeroline=False)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="#323232", zeroline=False)

# Show the plot
fig.show()

# CONCLUSION

In this notebook, we have implemented a strategy combining the MACD (Moving Average Convergence Divergence) indicator with Bollinger Bands. We've visualized these indicators along with the price data and generated signals based on their interactions. This approach provides a solid foundation for our trading strategy.
 
## Key components of our strategy include:
 1. MACD for trend identification
 2. Bollinger Bands for volatility measurement and potential reversal points
 3. A signal line derived from the combination of these indicators
 
 The next step is to backtest this strategy to evaluate its profitability and robustness. For this purpose, we have created a controller file named `macd_bb.py` in this folder. This file implements the logic we've developed here, allowing us to conduct comprehensive backtests in the subsequent notebook.