#Harsh Pandey

In [40]:
import pandas as pd
import numpy as np
from sklearn.model_selection import ParameterGrid
import plotly.graph_objects as go
import seaborn as sns
import matplotlib.pyplot as plt
from plotly.subplots import make_subplots
import plotly.io as pio
pio.renderers.default = 'notebook'
import warnings
warnings.filterwarnings("ignore", message="Boolean Series key will be reindexed to match DataFrame index")


In [None]:
data = pd.read_csv('eth_usdt_2021_2023.csv')
data['date'] = pd.to_datetime(data['time'])
data.set_index('date', inplace=True)

# Filter data for the specified date range
data = data[(data.index >= '2021-01-01') & (data.index <= '2023-12-30')]
data= data.drop(columns=['conversionType','conversionSymbol'])

In [88]:
class Strategy_Macd:
    def __init__(self, data):
        self.data = data

    def calculate_macd(self, ema_short_period, ema_long_period, signal_period):
        self.data['ema_short'] = self.data['close'].ewm(span=ema_short_period, adjust=False).mean()
        self.data['ema_long'] = self.data['close'].ewm(span=ema_long_period, adjust=False).mean()
        self.data['macd'] = self.data['ema_short'] - self.data['ema_long']
        self.data['signal'] = self.data['macd'].ewm(span=signal_period, adjust=False).mean()
        return self.data

    def apply_trading_strategy(self, trailing_stop_loss_pct, transaction_cost_pct):
        self.data['long_signal'] = np.where((self.data['macd'] > self.data['signal']) & (self.data['macd'].shift(1) <= self.data['signal'].shift(1)), 1, 0)
        self.data['short_signal'] = np.where((self.data['macd'] < self.data['signal']) & (self.data['macd'].shift(1) >= self.data['signal'].shift(1)), 1, 0)
        self.data['sell_signal'] = np.where((self.data['close'] < self.data['ema_short']) | (self.data['macd'] < self.data['signal']), 1, 0)

        self.data['position'] = 0
        position = 0
        highest_price = 0
        self.trade_days = []  # List to store tuples of buy and sell dates

        buy_date = None
        for i in range(1, len(self.data)):
            if self.data['long_signal'].iloc[i] == 1 and position == 0:
                position = 1
                highest_price = self.data['close'].iloc[i]
                self.data.loc[self.data.index[i], 'position'] = position
                self.data.loc[self.data.index[i], 'transaction_cost'] = transaction_cost_pct * self.data['close'].iloc[i]
                buy_date = self.data.index[i]  # Store buy date
            elif position == 1:
                highest_price = max(highest_price, self.data['close'].iloc[i])
                trailing_stop_loss_price = highest_price * (1 - trailing_stop_loss_pct)
                if self.data['close'].iloc[i] <= trailing_stop_loss_price or self.data['sell_signal'].iloc[i] == 1:
                    position = 0
                    self.data.loc[self.data.index[i], 'transaction_cost'] = transaction_cost_pct * self.data['close'].iloc[i]
                    sell_date = self.data.index[i]  # Store sell date
                    if buy_date:
                        self.trade_days.append((buy_date, sell_date))  # Add tuple of buy and sell date
                        buy_date = None
                self.data.loc[self.data.index[i], 'position'] = position

        if self.data['position'].iloc[-1] != 0:
            self.data.loc[self.data.index[-1], 'position'] = 0
            self.data.loc[self.data.index[-1], 'transaction_cost'] = transaction_cost_pct * self.data['close'].iloc[-1]
            sell_date = self.data.index[-1]  # Add last sell date
            if buy_date:
                self.trade_days.append((buy_date, sell_date))  # Add tuple of buy and sell date

        self.data['transaction_cost'] = self.data['transaction_cost'].fillna(0)
        self.data['strategy_returns'] = (self.data['close'].pct_change() - transaction_cost_pct) * self.data['position'].shift(1)

        return self.data, self.trade_days

    def best_parameters(self):
        param_grid = {
            'ema_short_period': [7,8, 10, 12, 14],
            'ema_long_period': [25,26, 28, 30, 32],
            'signal_period': [6,7, 9, 10, 11],
            'trailing_stop_loss_pct': [0.05, 0.06, 0.07, 0.08,0.09]
        }

        best_sharpe_ratio = -np.inf
        best_params = None
        results = []
        transaction_cost_pct = 0.001

        for params in ParameterGrid(param_grid):
            data_copy = self.data.copy()
            strategy = Strategy_Macd(data_copy)
            data_copy = strategy.calculate_macd(params['ema_short_period'], params['ema_long_period'], params['signal_period'])
            data_copy, _ = strategy.apply_trading_strategy(params['trailing_stop_loss_pct'], transaction_cost_pct)
            performance = strategy.calculate_performance()
            sharpe_ratio = performance['Sharpe Ratio']
            results.append({**params, 'Sharpe Ratio': sharpe_ratio})

            if sharpe_ratio > best_sharpe_ratio:
                best_sharpe_ratio = sharpe_ratio
                best_params = params

        return best_params


    def calculate_performance(self, initial_capital=1000):
        self.data['capital'] = initial_capital * (1 + self.data['strategy_returns']).cumprod()
        total_returns = self.data['capital'].iloc[-1]
        profit = total_returns - initial_capital
        gross_profit=0
        gross_loss=0
        # Calculate performance metrics
        closed_trades = self.data[self.data['position'].diff().abs() == 1]

        position_change_indices = closed_trades[closed_trades['position'].diff() == -1]
        buy_indices = closed_trades[self.data['position'].diff() == 1]
        # Initialize lists to store winning and losing trades
        winning_indices = []
        losing_indices = []
        for idx_pos in position_change_indices.index:
            # Get the integer index position
            idx_pos_int = position_change_indices.index.get_loc(idx_pos)

            # Ensure idx_pos_int is greater than 0 to avoid index out of bounds error
            if idx_pos_int >= 0:
                # Get the current and previous capital values
                current_capital = position_change_indices.iloc[idx_pos_int]['capital']
                previous_capital = buy_indices.iloc[idx_pos_int]['capital']

                # Check if the trade is winning or losing
                if current_capital > previous_capital:
                    winning_indices.append(idx_pos)
                    gross_profit+=current_capital - previous_capital
                else:
                    losing_indices.append(idx_pos)
                    gross_loss+=current_capital - previous_capital


        # Select the rows corresponding to winning and losing trades
        win_trades = position_change_indices.loc[winning_indices]
        lose_trades = position_change_indices.loc[losing_indices]
        self.win_trades=win_trades
        self.lose_trades=lose_trades
        win_rate = len(win_trades) / len(position_change_indices) if len(position_change_indices) > 0 else 0
        max_drawdown = (self.data['capital'].cummax() - self.data['capital']).max()

        # Calculate gross profit and loss
        avg_winning_trade = gross_profit / len(win_trades) if len(win_trades) > 0 else 0
        avg_losing_trade = gross_loss / len(lose_trades) if len(lose_trades) > 0 else 0
        largest_winning_trade = (position_change_indices['capital']-position_change_indices['capital'].shift(1)).max() if len(win_trades) > 0 else 0
        largest_losing_trade = lose_trades['strategy_returns'].min() * initial_capital if len(lose_trades) > 0 else 0
        buy_and_hold_return = self.data['close'].iloc[-1] / self.data['close'].iloc[0] - 1

        # Calculate average holding duration
        if len(closed_trades) % 2 == 0:
            avg_holding_duration = (closed_trades.index[1::2] - closed_trades.index[::2]).mean()
        else:
            avg_holding_duration = (closed_trades.index[1::2] - closed_trades.index[:-1:2]).mean()

        risk_free_rate = 0.01
        daily_return = self.data['strategy_returns'].mean()
        daily_volatility = self.data['strategy_returns'].std()
        sharpe_ratio = (daily_return - risk_free_rate) / daily_volatility * np.sqrt(252)
        downside_volatility = self.data[self.data['strategy_returns'] < 0]['strategy_returns'].std()
        sortino_ratio = (daily_return - risk_free_rate) / downside_volatility * np.sqrt(252)
        annualized_returns = ((total_returns/initial_capital)**(1/(len(self.data) / 365))) - 1

        performance = {
            "Annualized Returns": annualized_returns,
            "total_returns": total_returns,
            "Net Profit": profit,
            "win_rate": win_rate,
            "max_drawdown": max_drawdown,
            "gross_profit": gross_profit,
            "gross_loss": gross_loss,
            "avg_winning_trade": avg_winning_trade,
            "avg_losing_trade": avg_losing_trade,
            "largest_winning_trade": largest_winning_trade,
            "largest_losing_trade": largest_losing_trade,
            "buy_and_hold_return": buy_and_hold_return,
            "avg_holding_duration": avg_holding_duration,
            "Sharpe Ratio": sharpe_ratio,
            "sortino_ratio": sortino_ratio
        }

        return performance
    def plot_trades(self):
        fig = make_subplots(rows=2, cols=1, shared_xaxes=True, 
                            subplot_titles=('Optimized MACD Strategy Backtest', 'Capital Over Time'),
                            vertical_spacing=0.1)

        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['close'], mode='lines', name='Close Price'), row=1, col=1)
        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['macd'], mode='lines', name='MACD'), row=1, col=1)
        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['signal'], mode='lines', name='Signal', line=dict(dash='dot')),row=1, col=1)
 

        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['ema_short'], mode='lines', name='EMA short', line=dict(dash='dot')), row=1, col=1)
        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['ema_long'], mode='lines', name='EMA long', line=dict(dash='dot')), row=1, col=1)

        buy_signals = [trade[0] for trade in self.trade_days]
        sell_signals = [trade[1] for trade in self.trade_days]

        fig.add_trace(go.Scatter(x=buy_signals, y=self.data.loc[buy_signals, 'close'], mode='markers', name='Buy Signal',
                                 marker=dict(color='green', symbol='triangle-up', size=10)), row=1, col=1)
        fig.add_trace(go.Scatter(x=sell_signals, y=self.data.loc[sell_signals, 'close'], mode='markers', name='Sell Signal',
                                 marker=dict(color='red', symbol='triangle-down', size=10)), row=1, col=1)

        self.data['capital'] = 1000 * (1 + self.data['strategy_returns']).cumprod()
        fig.add_trace(go.Scatter(x=self.data.index, y=self.data['capital'], mode='lines', name='Capital'), row=2, col=1)

        fig.update_layout(template='plotly_dark', title_text='Optimized Trading Strategy Backtest', height=800)
        fig.show()

    def calculate_win_loss_ratio(self):
        wins = 0
        losses = 0

        for buy_date, sell_date in self.trade_days:
            if self.data.loc[sell_date, 'close'] > self.data.loc[buy_date, 'close']:
                wins += 1
            else:
                losses += 1

        win_loss_ratio = wins / losses if losses > 0 else float('inf')
        return win_loss_ratio
    def analysis(self):
        trailing_stop_loss_pct = self.best_parameters()['trailing_stop_loss_pct']
        ema_long_period=  self.best_parameters()['ema_long_period']
        ema_short_period=  self.best_parameters()['ema_short_period']
        signal_period =self.best_parameters()['signal_period']
        transaction_cost_pct = 0.001
        self.data=self.calculate_macd(ema_short_period,ema_long_period,signal_period)

        self.result, self.trade_days = self.apply_trading_strategy(trailing_stop_loss_pct, transaction_cost_pct)

        # Plot the trades
        self.plot_trades()

        # Calculate and print win/loss ratio
        win_loss_ratio = self.calculate_win_loss_ratio()
        print(f"Win/Loss Ratio: {win_loss_ratio:.2f}")
    def print_trades(self):
        print("Trade Days (Buy, Sell):")
        for trade in self.trade_days:
            print(trade)

In [90]:
processing= Strategy_Macd(data)
for i in processing.best_parameters():
    print(i," : ",processing.best_parameters()[i])

ema_long_period  :  30
ema_short_period  :  14
signal_period  :  6
trailing_stop_loss_pct  :  0.09


In [91]:
processing.analysis()

Win/Loss Ratio: 1.10


In [92]:
for i in processing.calculate_performance():
    print(i," : ",processing.calculate_performance()[i])

Annualized Returns  :  0.736234036397216
total_returns  :  5225.987132652385
Net Profit  :  4225.987132652385
win_rate  :  0.5227272727272727
max_drawdown  :  1405.390934654477
gross_profit  :  7488.598188027806
gross_loss  :  -3262.6110553754215
avg_winning_trade  :  325.5912255664263
avg_losing_trade  :  -155.3624312083534
largest_winning_trade  :  1197.4254724911261
largest_losing_trade  :  -126.81558694245932
buy_and_hold_return  :  2.1434802886066224
avg_holding_duration  :  7 days 15:16:21.818181818
Sharpe Ratio  :  -5.858182565113412
sortino_ratio  :  -5.790890272167039
