In [14]:
import trendline
import crypto_data
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

In [54]:
symbol = 'APT/USDT'
data = crypto_data.download_crypto_ohlc_data(symbols=[symbol])[symbol]

Initializing data download for: bybit, Timeframe: 1d, Periods: 365...
Data for APT/USDT:
  Start: 2024-01-16 00:00:00
  End: 2025-01-14 00:00:00
Last row: 
Timestamp    2025-01-14 00:00:00
Open                       8.477
High                       9.112
Low                         8.41
Close                      9.059
Volume                 763875.05
Name: 364, dtype: object

  Expected Interval: 1d, Actual Interval: 1 days 00:00:00
Finished downloading data for 1 symbols


In [79]:
def trendline_breakout_hl(data: pd.DataFrame, lookback: int, ema_period: int = 21):
    """
    Calculate trendline breakouts with specific pattern conditions:
    Long: price > EMA21 + negative resistance slope
    Short: price < EMA21 + positive resistance slope
    """
    # Calculate EMA
    data['EMA21'] = data['Close'].ewm(span=ema_period, adjust=False).mean()
    
    # Initialize arrays
    n = len(data)
    s_tl = np.full(n, np.nan)
    r_tl = np.full(n, np.nan)
    signals = np.zeros(n)
    r_slopes = np.full(n, np.nan)
    s_slopes = np.full(n, np.nan)
    
    for i in range(lookback, n):
        try:
            # Get window data
            window_close = data.iloc[i - lookback:i]['Close'].to_numpy()
            window_high = data.iloc[i - lookback:i]['High'].to_numpy()
            window_low = data.iloc[i - lookback:i]['Low'].to_numpy()
            
            # Calculate trendlines
            s_coefs, r_coefs = trendline.fit_trendlines_high_low(
                window_high, window_low, window_close
            )
            
            # Store slopes
            s_slopes[i] = s_coefs[0]
            r_slopes[i] = r_coefs[0]
            
            # Project trendlines to current bar
            s_val = s_coefs[1] + lookback * s_coefs[0]
            r_val = r_coefs[1] + lookback * r_coefs[0]
            
            s_tl[i] = s_val
            r_tl[i] = r_val
            
            # Get current price and EMA
            current_close = data.iloc[i]['Close']
            current_ema = data.iloc[i]['EMA21']

            # Average price condition
            window_average = window_close.mean()
            
            # Long condition: price > EMA + negative resistance slope
            if (window_average > current_ema) and (r_coefs[0] < 0) and (current_close > r_val):
                signals[i] = 1.0
                
            # Short condition: price < EMA + positive resistance slope    
            elif (window_average < current_ema) and (r_coefs[0] > 0) and (current_close < s_val):
                signals[i] = -1.0
                
            else:
                signals[i] = 0
                
        except Exception as e:
            signals[i] = signals[i - 1]
            continue
    
    return s_tl, r_tl, signals, data['EMA21'].to_numpy(), r_slopes, s_slopes

# Usage
s_tl, r_tl, signals, ema, r_slopes, s_slopes = trendline_breakout_hl(data, lookback=14)

In [82]:
def plot_channel_with_breakout(data, open_prices, high_prices, low_prices, close_prices, lookback: int = 7):
    # Calculate trendlines using your original algorithm
    s_tl, r_tl, signals, ema, r_slopes, s_slopes = trendline_breakout_hl(data, lookback)
    
    # Create the main figure
    fig = go.Figure()
    
    # Add candlestick chart
    fig.add_trace(
        go.Candlestick(
            x=np.arange(len(close_prices)),
            open=open_prices,
            high=high_prices,
            low=low_prices,
            close=close_prices,
            increasing_line_color='#26A69A',  # Green candles
            decreasing_line_color='#EF5350',  # Red candles
            name='Price'
        )
    )
    
    # Add support trendline
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(s_tl)),
            y=s_tl,
            mode='lines',
            name='Support',
            line=dict(color='white', width=1, dash='dot'),
            showlegend=True
        )
    )
    
    # Add resistance trendline
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(r_tl)),
            y=r_tl,
            mode='lines',
            name='Resistance',
            line=dict(color='white', width=1, dash='dot'),
            showlegend=True
        )
    )
    
    # Add EMA
    fig.add_trace(
        go.Scatter(
            x=np.arange(len(ema)),
            y=ema,
            mode='lines',
            name='EMA',
            showlegend=True
        )
    )
    
    # Find long and short signals directly
    long_idx = np.where(signals == 1)[0]
    long_values = close_prices[long_idx]
    
    short_idx = np.where(signals == -1)[0]
    short_values = close_prices[short_idx]
    
    # Add markers for long signals (up arrows)
    if len(long_idx) > 0:
        fig.add_trace(
            go.Scatter(
                x=long_idx,
                y=long_values,
                mode='markers',
                name='Long Signals',
                marker=dict(
                    size=12,
                    symbol='triangle-up',
                    color='lime',
                    line=dict(color='black', width=2)
                ),
                showlegend=True
            )
        )
    
    # Add markers for short signals (down arrows)
    if len(short_idx) > 0:
        fig.add_trace(
            go.Scatter(
                x=short_idx,
                y=short_values,
                mode='markers',
                name='Short Signals',
                marker=dict(
                    size=12,
                    symbol='triangle-down',
                    color='red',
                    line=dict(color='black', width=2)
                ),
                showlegend=True
            )
        )
    
    # Update layout
    fig.update_layout(
        title='Trendline Channel with Breakouts',
        xaxis_rangeslider_visible=False,
        height=800,
        yaxis=dict(
            gridcolor='rgba(128,128,128,0.1)',
            zerolinecolor='rgba(128,128,128,0.1)',
        ),
        xaxis=dict(
            gridcolor='rgba(128,128,128,0.1)',
            zerolinecolor='rgba(128,128,128,0.1)',
        ),
        showlegend=True,
        legend=dict(
            bgcolor='rgba(0,0,0,0.5)',
            bordercolor='rgba(255,255,255,0.2)'
        )
    )
    
    return fig

# Usage remains the same
fig = plot_channel_with_breakout(data,
    open_prices=data['Open'],
    high_prices=data['High'],
    low_prices=data['Low'],
    close_prices=data['Close']
)

fig

In [91]:
# Main loop test

long_tickers = []
short_tickers = []

#data = crypto_data.download_crypto_ohlc_data()

for ticker in data.columns.levels[0]:
    ticker_data = data[ticker].dropna(subset=['Close'])

    s_tl, r_tl, signals, ema, r_slopes, s_slopes = trendline_breakout_hl(ticker_data, 7)
    
    if signals[-1] == 1:
        long_tickers.append(ticker)
    elif signals[-1] == -1:
        short_tickers.append(ticker)

print(f"\nLong tickers: {long_tickers}\n Short tickers: {short_tickers}\n")    


Long tickers: ['COOK/USDT:USDT', 'DEXE/USDT:USDT', 'GRIFFAIN/USDT:USDT', 'SPX/USDT:USDT']
 Short tickers: []

