In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
import warnings
import plotly.graph_objects as go

warnings.filterwarnings('ignore')

def calculate_atr(df, period=14):
    high_low = df['High'] - df['Low']
    high_close = np.abs(df['High'] - df['Close'].shift(1))
    low_close = np.abs(df['Low'] - df['Close'].shift(1))
    tr = np.maximum.reduce([high_low, high_close, low_close])
    tr = pd.Series(tr, index=df.index)
    atr = tr.rolling(window=period).mean()
    df['ATR'] = atr
    return df

def calculate_aroon(df, period=14):
    df['Aroon Up'] = 100 * df['High'].rolling(window=period).apply(lambda x: (x.idxmax() - x.index[0]).days / period, raw=False)
    df['Aroon Down'] = 100 * df['Low'].rolling(window=period).apply(lambda x: (x.idxmin() - x.index[0]).days / period, raw=False)
    return df

def chaikin_volatility(df, period=14):
    df['ADL'] = ((2 * df['Close'] - df['High'] - df['Low']) / (df['High'] - df['Low'])) * df['Volume']
    df['ADL'] = df['ADL'].cumsum()

    df['CV'] = df['ADL'].ewm(span=13, adjust=False).mean() - df['ADL'].rolling(window=period).mean()
    return df

def calculate_cv_spike(df, period=14, threshold_percentile=98):
    df = chaikin_volatility(df, period)
    rolling_percentile = df['CV'].rolling(window=period).apply(
        lambda x: np.percentile(x, threshold_percentile), raw=False)

    df['CV Spike'] = df['CV'] > rolling_percentile
    return df


def calculate_dpo(df, period=14):
    sma_values = df['Close'].rolling(window=period).mean()
    dpo_values = df['Close'] - sma_values
    df['DPO'] = dpo_values
    return df

def calculate_psar(df, af=0.02, max_af=0.2):
    df['PSAR'] = np.nan
    df['Trend'] = np.nan

    psar = df['Low'][0]
    ep = df['High'][0]
    af = af
    is_uptrend = True

    psar_values = []
    trend = []

    for i in range(len(df)):
        psar = psar + af * (ep - psar)
        if is_uptrend:
            if df['High'][i] > ep:
                ep = df['High'][i]
                af = min(af + 0.02, max_af)
            if df['Low'][i] < psar:
                is_uptrend = False
                psar = ep
                ep = df['Low'][i]
                af = 0.02
        else:
            if df['Low'][i] < ep:
                ep = df['Low'][i]
                af = min(af + 0.02, max_af)
            if df['High'][i] > psar:
                is_uptrend = True
                psar = ep
                ep = df['High'][i]
                af = 0.02

        psar_values.append(psar)
        trend.append(1 if is_uptrend else -1)

    df['PSAR'] = psar_values
    df['Trend'] = trend
    return df

In [None]:
!pip install plotly




In [None]:
def dynamic_atr_multiplier(df, atr_period=14, volatility_lookback=50):
    recent_atr = df['ATR'].tail(volatility_lookback)
    atr_mean = recent_atr.mean()
    atr_std = recent_atr.std()
    current_atr = df['ATR'].iloc[-1]

    volatility_z_score = (current_atr - atr_mean) / atr_std if atr_std != 0 else 0

    if volatility_z_score > 2:
        return 8.0
    elif volatility_z_score > 1:
        return 6.0
    elif volatility_z_score < -1:
        return 3.0
    else:
        return 5.0

In [None]:
def chaikin_long_short_strategy(df, dpo_threshold=-10):
    df = calculate_psar(df)
    df = calculate_cv_spike(df)
    df = calculate_atr(df)
    df = calculate_dpo(df)
    df= calculate_aroon(df)
    signals = []
    current_position = None
    entry_price = None

    taken = 0
    for i in range(len(df)):
        dynamic_mult= dynamic_atr_multiplier(df)
        taken = 0
        if pd.isna(df['CV Spike'][i]) or pd.isna(df['PSAR'][i]) or pd.isna(df['DPO'][i]):
            signals.append(0)
            continue

        # Entry Conditions

        # DPO is used to identify bearish signals only - strong downtrends only entering short

        short_conditions = (
            # 1. DPO crosses below zero
            (df['DPO'].shift(1) >=dpo_threshold) &
            (df['DPO'] < dpo_threshold)
        ).any()

        short_momentum =(
            # 2. Negative trend change in DPO
            (df['DPO'].shift(2) < df['DPO'].shift(1)) &
            (df['DPO'].shift(1) >= df['DPO'])
        ).any()

        if df['CV Spike'][i] == 1 and current_position is None:
            if df['Trend'][i] == 1 and df['PSAR'][i] < df['Close'][i]:
                signals.append(2)
                entry_price = df['Close'][i]
                taken = 1
                current_position='Long'
            else:
              if (df['Trend'][i] == -1 and df['PSAR'][i] > df['Close'][i] and short_conditions and short_momentum) or \
               (df['Close'][i] < df['Low'].shift(1).rolling(window=5).min().iloc[i] and df['Aroon Down'][i] > 50):
                signals.append(1)
                entry_price = df['Close'][i]
                taken = 1
                current_position='Short'

        # Exit Conditions
        if entry_price is not None:
            if current_position == 'Long':
                if (df['PSAR'][i] > df['Close'][i]) or (df['Close'][i] < entry_price - df['ATR'][i] * dynamic_mult):
                  if taken:
                    signals[-1] = -2
                  else:
                      signals.append(-2)
                  entry_price = None
                  current_position = None
                elif not taken:
                  signals.append(0)

            elif current_position == 'Short':
                if (df['PSAR'][i] < df['Close'][i]) or (df['Close'][i] > entry_price + df['ATR'][i] * dynamic_mult / 2) or \
                  (df['Close'][i] > df['High'].shift(1).rolling(window=5).max().iloc[i]):
                    if taken:
                      signals[-1] = -1
                    else:
                      signals.append(-1)
                    entry_price = None
                    current_position = None
                elif not taken:
                  signals.append(0)

        else:
            signals.append(0)

    return signals

In [None]:
def prepare_data(symbol):
    data = yf.download(symbol, start='2020-01-01', end='2022-12-31', interval='5d')
    data.columns = data.columns.droplevel(1)
    return data

eth_data = prepare_data('ETH-USD')

[*********************100%***********************]  1 of 1 completed


In [None]:
signals=chaikin_long_short_strategy(eth_data)

In [None]:
def calculate_performance_metrics(df, signals, initial_capital=10000):
    """
    Calculate comprehensive trading strategy performance metrics
    """
    if len(signals) != len(df):
        raise ValueError("Signals length must match DataFrame length")

    df['Signal'] = signals

    trades = []
    current_trade = None

    portfolio_values = [initial_capital]
    current_capital = initial_capital

    peak_capital = initial_capital
    max_drawdown = 0
    max_adverse_excursion = 0

    for i in range(len(df)):
        if df['Signal'].iloc[i] == 2:
            if current_trade is None:
                current_trade = {
                    'type': 'long',
                    'entry_price': df['Close'].iloc[i],
                    'entry_date': df.index[i],
                    'entry_signal_index': i
                }

        elif df['Signal'].iloc[i] == 1:
            if current_trade is None:
                current_trade = {
                    'type': 'short',
                    'entry_price': df['Close'].iloc[i],
                    'entry_date': df.index[i],
                    'entry_signal_index': i
                }

        elif df['Signal'].iloc[i] == -2 and current_trade and current_trade['type'] == 'long':
            exit_price = df['Close'].iloc[i]
            trade_return = (exit_price - current_trade['entry_price']) / current_trade['entry_price']

            # Received capital
            trade_capital = current_capital * (1 + trade_return)

            current_trade['exit_price'] = exit_price
            current_trade['exit_date'] = df.index[i]
            current_trade['return'] = trade_return
            trades.append(current_trade)

            # Update current capital and reset trade
            current_capital = trade_capital
            current_trade = None

        elif df['Signal'].iloc[i] == -1 and current_trade and current_trade['type'] == 'short':
            exit_price = df['Close'].iloc[i]
            trade_return = (current_trade['entry_price'] - exit_price) / current_trade['entry_price']

            # Received capital
            trade_capital = current_capital * (1 + trade_return)

            current_trade['exit_price'] = exit_price
            current_trade['exit_date'] = df.index[i]
            current_trade['return'] = trade_return
            trades.append(current_trade)

            current_capital = trade_capital
            current_trade = None

        portfolio_values.append(current_capital)

        if current_capital > peak_capital:
            peak_capital = current_capital

        current_drawdown = (peak_capital - current_capital) / peak_capital
        max_drawdown = max(max_drawdown, current_drawdown)

    trades_df = pd.DataFrame(trades)

    metrics = {
        'Total Trades': len(trades),
        'Win Rate (%)': calculate_win_rate(trades_df),
        'Net Profit (%) (Compounded)': calculate_net_profit(initial_capital, current_capital),
        'Sharpe Ratio': calculate_sharpe_ratio(np.diff(portfolio_values)),
        'Sortino Ratio': calculate_sortino_ratio(np.diff(portfolio_values)),
        'Risk-Reward Ratio': calculate_risk_reward_ratio(trades_df),
        'Max Drawdown (%)': max_drawdown * 100,
        'Max Adverse Excursion (%)': calculate_max_adverse_excursion(trades_df),
        'Max Trade Duration (days)': calculate_max_trade_duration(trades_df),
        'Average Trade Duration (days)': calculate_avg_trade_duration(trades_df),
    }

    return metrics

def calculate_win_rate(trades_df):
    wins = trades_df[trades_df['return'] > 0].shape[0]
    return (wins / len(trades_df)) * 100 if len(trades_df) > 0 else 0

def calculate_net_profit(initial_capital, final_capital):
    return (final_capital - initial_capital) / initial_capital * 100

def calculate_sharpe_ratio(daily_returns):
    mean_return = np.mean(daily_returns)
    std_dev = np.std(daily_returns)
    return mean_return / std_dev if std_dev != 0 else 0

def calculate_sortino_ratio(daily_returns):
    downside_returns = [r for r in daily_returns if r < 0]
    downside_deviation = np.std(downside_returns)
    mean_return = np.mean(daily_returns)
    return mean_return / downside_deviation if downside_deviation != 0 else 0

def calculate_risk_reward_ratio(trades_df):
    avg_win = np.mean(trades_df[trades_df['return'] > 0]['return'])
    avg_loss = np.mean(trades_df[trades_df['return'] < 0]['return'])
    return avg_win / abs(avg_loss) if avg_loss != 0 else 0

def calculate_max_adverse_excursion(trades_df):
    return np.min(trades_df['return']) if len(trades_df) > 0 else 0

def calculate_max_trade_duration(trades_df):
    return (trades_df['exit_date'] - trades_df['entry_date']).max().days if len(trades_df) > 0 else 0

def calculate_avg_trade_duration(trades_df):
    return (trades_df['exit_date'] - trades_df['entry_date']).mean().days if len(trades_df) > 0 else 0


In [None]:
print(len(signals))
print(len(eth_data))

219
219


In [None]:
def plot_heikin_ashi_with_signals(eth_data, signals):
    heikin_ashi = eth_data.copy()
    heikin_ashi['HA-Close'] = (eth_data['Open'] + eth_data['High'] + eth_data['Low'] + eth_data['Close']) / 4
    heikin_ashi['HA-Open'] = (eth_data['Open'] + eth_data['Close']) / 2
    for i in range(1, len(heikin_ashi)):
        heikin_ashi['HA-Open'].iloc[i] = (heikin_ashi['HA-Open'].iloc[i - 1] + heikin_ashi['HA-Close'].iloc[i - 1]) / 2
    heikin_ashi['HA-High'] = heikin_ashi[['High', 'HA-Open', 'HA-Close']].max(axis=1)
    heikin_ashi['HA-Low'] = heikin_ashi[['Low', 'HA-Open', 'HA-Close']].min(axis=1)

    signal_dates = heikin_ashi.index[:len(signals)]

    long_entry_signals = [i for i, signal in enumerate(signals) if signal == 2]
    long_exit_signals = [i for i, signal in enumerate(signals) if signal == -2]
    short_signals = [i for i, signal in enumerate(signals) if signal == 1]
    short_exit_signals = [i for i, signal in enumerate(signals) if signal == -1]

    fig = go.Figure(data=[
        go.Candlestick(
            x=heikin_ashi.index,
            open=heikin_ashi['HA-Open'],
            high=heikin_ashi['HA-High'],
            low=heikin_ashi['HA-Low'],
            close=heikin_ashi['HA-Close'],
            increasing_line_color='green',
            decreasing_line_color='red'
        )
    ])

    fig.add_trace(go.Scatter(
        x=heikin_ashi.index[long_entry_signals],
        y=heikin_ashi['HA-Close'].iloc[long_entry_signals],
        mode='markers',
        marker=dict(color='darkblue', size=12, symbol='triangle-up'),
        name='Long Entry Signal'
    ))

    fig.add_trace(go.Scatter(
        x=heikin_ashi.index[long_exit_signals],
        y=heikin_ashi['HA-Close'].iloc[long_exit_signals],
        mode='markers',
        marker=dict(color='darkred', size=12, symbol='triangle-down'),
        name='Long Exit Signal'
    ))

    fig.add_trace(go.Scatter(
        x=heikin_ashi.index[short_signals],
        y=heikin_ashi['HA-Close'].iloc[short_signals],
        mode='markers',
        marker=dict(color='darkorange', size=12, symbol='triangle-left'),
        name='Short Signal'
    ))

    fig.add_trace(go.Scatter(
        x=heikin_ashi.index[short_exit_signals],
        y=heikin_ashi['HA-Close'].iloc[short_exit_signals],
        mode='markers',
        marker=dict(color='purple', size=12, symbol='triangle-right'),
        name='Short Exit Signal'
    ))

    fig.update_layout(
        title='Ethereum (ETH-USD) Heikin-Ashi Candlestick Chart with Trade Signals',
        xaxis_title='Date',
        yaxis_title='Price (USD)',
        xaxis_rangeslider_visible=False
    )

    fig.show()


In [None]:
plot_heikin_ashi_with_signals(eth_data,signals)

In [None]:
metrics=calculate_performance_metrics(eth_data,signals)
print(metrics)

{'Total Trades': 8, 'Win Rate (%)': 75.0, 'Net Profit (%) (Compounded)': 246.3750445806789, 'Sharpe Ratio': 0.14984647729987302, 'Sortino Ratio': 0.8850510656059303, 'Risk-Reward Ratio': 4.62318331684873, 'Max Drawdown (%)': 11.088923305254463, 'Max Adverse Excursion (%)': -0.0636311832500195, 'Max Trade Duration (days)': 80, 'Average Trade Duration (days)': 50}
