In [417]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

class StrategyBacktest:
    """Backtest a portfolio whose target weights come from a user function.

    Prices are loaded from a CSV file (first column = date index, remaining
    columns = one price series per ticker) and resampled to calendar-daily
    frequency with forward fill.  A *weight function* maps a price history
    ``DataFrame`` to one weight per asset column; the backtest lets the
    holdings drift with daily returns and, optionally, resets them to the
    target weights on calendar month ends.

    Attributes:
        df: Calendar-daily price table used for every computation.
        func: Weight function ``func(history_df) -> sequence of weights``.
        weights_history: Per-day weight vectors from the latest
            ``calculate_returns`` call.
        weights_df: ``weights_history`` as a DataFrame (dates x tickers);
            ``None`` until ``calculate_returns`` has run.
        results: Portfolio value series from the latest ``run`` call;
            ``None`` until ``run`` has been called.
    """

    # ``__init__`` resamples to calendar days, so one year is 365 rows.
    DAYS_PER_YEAR = 365
    # Starting account value of every backtest.
    INITIAL_CAPITAL = 100

    def __init__(self, dataset_path, func=None, start_date=None):
        """Load the price table and remember the weight function.

        Args:
            dataset_path: CSV path/buffer accepted by ``pandas.read_csv``.
            func: Optional weight function; may also be supplied to ``run``.
            start_date: Optional label; rows before it are dropped.
        """
        prices = pd.read_csv(dataset_path, index_col=0, parse_dates=True)
        prices = prices.asfreq('D', method='ffill')
        if start_date:
            prices = prices.loc[start_date:]

        self.df = prices
        self.func = func
        self.weights_history = []
        self.weights_df = None
        self.results = None

    def get_weight(self, df):
        """Return the weight function's output for ``df`` as a NumPy array.

        Raises:
            NotImplementedError: If no weight function has been provided.
        """
        if self.func is None:
            raise NotImplementedError("Weight function not provided")
        return np.array(self.func(df))

    def _target_weights(self, history):
        """Fetch target weights for ``history`` and check one weight per column."""
        weights = np.asarray(self.get_weight(history), dtype=float)
        expected = history.shape[1]
        # Without this check a length-1 vector would silently broadcast
        # across every asset and produce a wrong (but plausible) backtest.
        if weights.ndim != 1 or weights.shape[0] != expected:
            raise ValueError(
                f"Weight function returned shape {weights.shape}; "
                f"expected {expected} weights, one per asset column"
            )
        return weights

    def calculate_returns(self, tickers=None, rebalance=False, n=1):
        """Simulate the portfolio value path.

        The account starts at ``INITIAL_CAPITAL`` on row ``n`` of the price
        table, allocated with weights computed from the first ``n`` rows.
        From row ``n + 1`` on, each holding grows by that day's price change.
        When ``rebalance`` is true, on every calendar month end the holdings
        are reset to fresh target weights computed from the rows strictly
        before that day.

        Args:
            tickers: Column name or list of column names to trade; all
                columns when empty or ``None``.
            rebalance: Whether to rebalance on month ends.
            n: Number of leading rows reserved as initial history.

        Returns:
            DataFrame with a single ``'Portfolio'`` column indexed by the
            dates from row ``n`` onward.  Also sets ``self.weights_history``
            and ``self.weights_df``.

        Raises:
            ValueError: If ``n`` is outside ``[0, number of rows)`` or the
                weight function returns the wrong number of weights.
        """
        if isinstance(tickers, str):
            # A bare string would select a Series; keep the table 2-D.
            tickers = [tickers]
        if tickers is None or len(tickers) == 0:
            prices = self.df
        else:
            prices = self.df[list(tickers)]

        if not 0 <= n < len(prices):
            raise ValueError(
                f"n must satisfy 0 <= n < {len(prices)} (number of rows); got {n}"
            )

        # Daily gross returns (1 + r); row 0 is NaN but is never read
        # because the loop starts at n + 1 >= 1.
        growth = prices.pct_change().to_numpy() + 1.0

        weights = self._target_weights(prices.iloc[:n])
        holdings = weights * self.INITIAL_CAPITAL
        self.weights_history = [weights.copy()]
        portfolio_values = [float(holdings.sum())]

        for i in range(n + 1, len(prices)):
            holdings = holdings * growth[i]
            account = float(holdings.sum())
            weights = holdings / account  # drifted weights after today's move

            if rebalance and self.is_month_end(prices.index[i]):
                # Rebalance: replace drifted weights with fresh targets,
                # using only data available before today.
                weights = self._target_weights(prices.iloc[:i])
                holdings = weights * account
                # Targets that do not sum to 1 rescale the account value.
                account = float(holdings.sum())

            self.weights_history.append(weights.copy())
            portfolio_values.append(account)

        dates = prices.index[n:]
        self.weights_df = pd.DataFrame(
            np.vstack(self.weights_history), index=dates, columns=prices.columns
        )
        return pd.DataFrame(portfolio_values, index=dates, columns=['Portfolio'])

    def run(self, tickers=None, rebalance=False, func=None, benchmark='SPY', title=None, n=1):
        """Run the backtest, show the figure, and return a metrics table.

        Args:
            tickers: Forwarded to ``calculate_returns``.
            rebalance: Forwarded to ``calculate_returns``.
            func: Optional weight function replacing the stored one.
            benchmark: Column of ``self.df`` used as the benchmark.
            title: Optional figure title.
            n: Forwarded to ``calculate_returns``.

        Returns:
            DataFrame with rows ``'My Portfolio'`` and ``'Benchmark'`` and
            the metric columns of ``calculate_performance_metrics``.
        """
        if func is not None:
            self.func = func
        self.results = self.calculate_returns(tickers, rebalance, n)
        self.plot_results(benchmark, title)

        # Evaluate the benchmark over exactly the backtest window.
        first_day, last_day = self.results.index[0], self.results.index[-1]
        bench_prices = self.df[benchmark].loc[first_day:last_day]

        port = pd.DataFrame(self.calculate_performance_metrics(self.results), index=['My Portfolio'])
        bench = pd.DataFrame(self.calculate_performance_metrics(bench_prices), index=['Benchmark'])
        return pd.concat([port, bench], axis=0)

    def is_month_end(self, date):
        """Return True when ``date`` is the last calendar day of its month."""
        return pd.to_datetime(date).is_month_end

    def get_benchmark(self, benchmark, result):
        """Return the benchmark's cumulative growth over ``result``'s date span.

        The first element is NaN because it has no preceding price.
        """
        window = self.df[benchmark].loc[result.index[0]:result.index[-1]]
        return (window.pct_change() + 1).cumprod()

    def plot_results(self, benchmark, title=None):
        """Show a three-panel figure: performance, drawdown, and weights.

        Reads ``self.results`` and ``self.weights_df``, so it must be called
        after they have been populated (``run`` does this).
        """
        fig = make_subplots(
            rows=3, cols=1,
            subplot_titles=("Portfolio Performance", "Drawdown", "Portfolio Weights"),
            vertical_spacing=0.1,
        )

        bench_returns = self.get_benchmark(benchmark, self.results).to_frame(name=benchmark)
        cum_returns = (self.results.pct_change() + 1).cumprod()

        # Panel 1: cumulative growth of the portfolio vs. the benchmark.
        fig.add_trace(go.Scatter(x=cum_returns.index, y=cum_returns['Portfolio'], name='Cumulative Returns', line=dict(width=3)), row=1, col=1)
        fig.add_trace(go.Scatter(x=bench_returns.index, y=bench_returns[benchmark], name='Benchmark', line=dict(width=1)), row=1, col=1)

        # Panel 2: drawdown = relative distance below the running maximum.
        rolling_max = cum_returns.cummax()
        bench_rolling_max = bench_returns.cummax()
        drawdowns = (cum_returns - rolling_max) / rolling_max
        bench_drawdowns = (bench_returns - bench_rolling_max) / bench_rolling_max
        fig.add_trace(go.Scatter(x=drawdowns.index, y=drawdowns['Portfolio'], name='My_Drawdown', line=dict(color='orange', width=3)), row=2, col=1)
        fig.add_trace(go.Scatter(x=bench_drawdowns.index, y=bench_drawdowns[benchmark], name='Bench_Drawdown', line=dict(color='red', width=1)), row=2, col=1)

        # Panel 3: stacked area of the per-asset weights over time.
        for ticker, weight_series in self.weights_df.items():
            fig.add_trace(go.Scatter(x=self.weights_df.index, y=weight_series,
                                     stackgroup='one',
                                     name=ticker), row=3, col=1)

        fig.update_yaxes(title_text="Cumulative Returns", row=1, col=1)
        fig.update_yaxes(title_text="Drawdown", row=2, col=1)
        fig.update_yaxes(title_text="Weights", row=3, col=1)

        name = title if title is not None else 'Strategy Performance'
        fig.update_layout(height=900, title_text=name, showlegend=True)
        fig.show()

    def calculate_performance_metrics(self, df):
        """Compute summary statistics for one value series.

        Args:
            df: A Series, or a DataFrame with exactly one column, of
                calendar-daily portfolio values or prices.

        Returns:
            Dict of plain floats: ``cumulative_returns``, ``cagr``, ``vol``
            and ``mdd`` in percent, and ``sharpe_ratio`` (CAGR divided by
            annualized volatility, with no risk-free rate subtracted).

        Raises:
            ValueError: If ``df`` is a DataFrame with more than one column.
        """
        if isinstance(df, pd.DataFrame):
            if df.shape[1] != 1:
                raise ValueError(
                    f"Expected a single value column; got {df.shape[1]} columns"
                )
            # Reduce to a Series so every metric is a true scalar; calling
            # float() on a one-element Series is deprecated in pandas.
            values = df.iloc[:, 0]
        else:
            values = df

        total_growth = values.iloc[-1] / values.iloc[0]
        num_years = len(values) / self.DAYS_PER_YEAR
        cagr = total_growth ** (1 / num_years) - 1
        vol = values.pct_change().std() * np.sqrt(self.DAYS_PER_YEAR)
        mdd = (values / values.cummax() - 1).min()
        sharpe_ratio = cagr / vol

        return {
            'cumulative_returns': float(total_growth - 1) * 100,
            'cagr': float(cagr) * 100,
            'vol': float(vol) * 100,
            'mdd': float(mdd) * 100,
            'sharpe_ratio': float(sharpe_ratio),
        }


In [421]:
# Static strategies (fixed target weights)

def sixty_forty(df):
    """Return fixed 60/40 weights for the columns SPY, IEF (in that order).

    ``df`` is accepted only to match the weight-function signature and is
    not used.
    """
    first_asset_share, second_asset_share = 0.6, 0.4
    return [first_asset_share, second_asset_share]

def coffeehouse(df):
    """Return fixed weights for SPY, IVE, IWM, IWN, EFA, VNQ, AGG (in order).

    The first six assets get 10% each and the last one gets 40%.  ``df`` is
    accepted only to match the weight-function signature and is not used.
    """
    allocation = {
        'SPY': 0.1,
        'IVE': 0.1,
        'IWM': 0.1,
        'IWN': 0.1,
        'EFA': 0.1,
        'VNQ': 0.1,
        'AGG': 0.4,
    }
    return list(allocation.values())

def permanet(df):
    """Return equal 25% weights for SPY, TLT, GLD, BIL (in that order).

    ``df`` is accepted only to match the weight-function signature and is
    not used.
    """
    assets = ('SPY', 'TLT', 'GLD', 'BIL')
    equal_share = 1 / len(assets)
    return [equal_share] * len(assets)

def goldenbutterfly(df):
    """Return equal 20% weights for SPY, IWN, GLD, SHY, TLT (in that order).

    ``df`` is accepted only to match the weight-function signature and is
    not used.
    """
    asset_count = 5  # SPY, IWN, GLD, SHY, TLT
    return [0.2 for _ in range(asset_count)]

def ivy(df):
    """Return equal 20% weights for SPY, EFA, AGG, DBC, VNQ (in that order).

    ``df`` is accepted only to match the weight-function signature and is
    not used.
    """
    allocation = dict.fromkeys(('SPY', 'EFA', 'AGG', 'DBC', 'VNQ'), 0.2)
    return list(allocation.values())

def allseasons(df):
    """Return fixed weights for SPY, TLT, IEF, GLD, DBC (in that order).

    ``df`` is accepted only to match the weight-function signature and is
    not used.
    """
    allocation = {
        'SPY': 0.3,
        'TLT': 0.4,
        'IEF': 0.15,
        'GLD': 0.075,
        'DBC': 0.075,
    }
    return list(allocation.values())

In [420]:
# Backtest the fixed 60/40 SPY/IEF allocation on prices from 2020-01-01 onward.
# rebalance=False means the initial weights are set once and then drift with prices.
sf = StrategyBacktest('dataset.csv', sixty_forty, start_date='2020-01-01')
# `run` shows the performance figure and returns the portfolio-vs-benchmark metrics
# table (benchmark defaults to 'SPY'); as the cell's last expression it is displayed.
sf.run(tickers=['SPY', 'IEF'], rebalance=False, title='60/40 Portfolio', n=1)

Unnamed: 0,cumulative_returns,cagr,vol,mdd,sharpe_ratio
My Portfolio,33.399776,7.244101,13.515429,-21.986968,0.535987
Benchmark,61.962421,12.414399,22.355007,-33.717257,0.55533
