In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt

In [2]:
ticker = 'TQQQ'
start_date='2015-04-01'
df=yf.download(ticker, start=start_date)

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed


## 양방향 MAC Event_Driven 백테스트 with 수수료

In [20]:
def mac_long_short(df, sw, lw, verbose=True):
    fee_rate = 0.001
    short_window = sw # 단기
    long_window = lw # 장기
    data = df.copy()
    
    #지수가중이동평균선 계산
    data['Short_MA'] = data[('Close', ticker)].ewm(span=short_window, adjust=False).mean()
    data['Long_MA'] = data[('Close', ticker)].ewm(span=long_window, adjust=False).mean()
    data=data[long_window:].copy() #초기데이터 제거
    
    
    #포지션과 시그널 계산
    data['Position']=np.where(data['Short_MA']>data['Long_MA'], 1, -1) 
    data['Signal']=data['Position'].diff().fillna(0)
    
    prices=data[('Close', ticker)].values
    signals=data['Signal'].values
    
    cash_init = 1000000
    cash = cash_init
    asset=np.zeros(len(data))
    asset[0]=cash
    pos = 0
    
    for i in range(1, len(data)):
        if pos == 0: #start 지점
            if signals[i] == 1: # 골든 크로스 -> 롱 진입
                pos = 1
                long_entry_price=prices[i]
                long_num = int(cash/(long_entry_price*(1+fee_rate)))
                cash-=long_entry_price*long_num*(1+fee_rate)

            elif signals[i] == -1: # 데드 크로스 -> 숏 진입
                pos = -1
                short_entry_price = prices[i]
                short_num = -int(cash/(short_entry_price*(1+fee_rate))) # 공매도 수량 short_num: 음수
                cash -= short_entry_price*(-short_num)*fee_rate # 공매도는 빌려서 파는 것이므로, 주가*num만큼 계좌 차감이 아니라 거래 수수료만 차감 됨.

        elif pos == 1: # Long 보유 중
            if signals[i] == -2: # 데드 크로스 -> 롱 청산, 숏 진입
                pos = -1
                cash += prices[i]*long_num*(1-fee_rate)

                short_entry_price = prices[i]
                short_num = -int(cash/(short_entry_price*(1+fee_rate)))
                cash -= short_entry_price*(-short_num)*fee_rate
        
        elif pos== -1: # Short 보유 중
            if signals[i] == 2: # 골든 크로스 -> 숏 청산, 롱 진입
                pos = 1
                cash += (prices[i]-short_entry_price)*short_num + prices[i]*short_num*fee_rate # 숏으로 인한 손실 + 수수료 
                # cash += prices[i]*short_num*(1 + fee_rate) - short_entry_price*short_num*
                
                long_entry_price = prices[i]
                long_num = int(cash/(long_entry_price*(1+fee_rate)))
                cash -= long_entry_price*long_num*(1+fee_rate)
    
        # 자산 가치 갱신 기록
        if pos == 0:
            asset[i]=cash
        elif pos == 1:
            asset[i]=cash+prices[i]*long_num
        elif pos == -1: ##
            asset[i] = cash + (prices[i]-short_entry_price)*short_num

    data['Long_Entry'] = np.where(data['Signal'] == 2, data[('Close', ticker)], np.nan)
    data['Long_Exit'] = np.where(data['Signal'] == -2, data[('Close', ticker)], np.nan)

    '''
    #시각화를 위한 매수, 매도 시점의 가격 칼럼 계산
    data['Buy_Price'] = np.where(signals==1, prices, np.nan)
    data['Sell_Price'] = np.where(signals==-1, prices, np.nan)
    '''
    
    # 누적 수익률 계산 출력
    data['Cumulative_Return']=np.array(asset)/cash_init
    final_cum_return=data['Cumulative_Return'].iloc[-1]-1

    if verbose: 
        print(f'Final cumulative return of the strategy: {100*final_cum_return: .2f}%')

    return data, final_cum_return

## Optimizer

In [23]:
def mac_long_short_optimizer(input_df):
    #장단기 설정
    short_window = list(range(5, 22))  # 5~21
    long_window = list(range(22, 51))  # 22~50

    ret_list=[] #각 파라미터 조합과 수익률을 저장할 리스트

    #장단기 및 스탑로스 모든 조합에 대해 테스트
    for x1, x2 in [(a,b) for a in short_window for b in long_window]:
        df = input_df.copy()
        _, ret = mac_long_short(df, x1, x2, verbose=False)
        ret_list.append((x1, x2, ret))


    # 여러 개의 최적해를 '모두' 찾고 최적값은 중위값을 선택
    max_ror = max(ret_list, key=lambda x:x[2])[2]  # ret 값 기준으로 최대인 튜플 찾고, 거기서 ret값만 저장
    max_tups=[tup for tup in ret_list if tup[2] == max_ror] # 맥스 ret값을 만족하는 모든 tup 찾아서 리스트에 저장
    
    params1=[tup[0] for tup in max_tups] 
    params2=[tup[1] for tup in max_tups]
    
    opt_param1 = int(np.median(params1))
    opt_param2 = int(np.median(params2))
    
    optimal_df = pd.DataFrame(max_tups, columns=['short_window', 'long_window', 'ror'])  #리스트를 데이터프레임화. ror:파라미터 조합상 최종 수익률

    print(f'Max Tuples: {max_tups}')
    print(f'Optimal Parameters:{opt_param1}, {opt_param2}, ' f'Optimized Return:{100*max_ror: .2f}%')

    return (opt_param1, opt_param2), optimal_df # return optimal_params, optimal_df랑 동일

## tear_sheet

In [10]:
def tear_sheet(data):
    # 수수료
    fee_rate = 0.001
    
    # 투자기간 
    trading_period = len(data)/252
    print(f'Trading Period: {trading_period:.1f} years')

    # 누적 수익률
    buy_and_hold = data[('Close', ticker)].iloc[-1]*(1-fee_rate)/data[('Close', ticker)].iloc[0]*(1+fee_rate) - 1 # 그냥 사서 들고 있었을 때의 누적 수익률
    final_cum_return = data['Cumulative_Return'].iloc[-1] - 1 # 추세추종전략 시행시의 누적 수익률
    print(f'Final cumulative return of the strategy: {100*final_cum_return:.2f}%, Buy&Hold:{100*buy_and_hold:.2f}%')

    # 연평균 수익률 CAGR
    CAGR_benchmark = (buy_and_hold+1)**(1/trading_period)-1
    CAGR_strategy = (data['Cumulative_Return'].iloc[-1])**(1/trading_period)-1 
    # data['Cumulative_Return']이 자주 나오는데, 
    # 만약 데이터가 매우 크고 반복 계산이 많다면 CR=data[...].values로 따로 뽑아서 numpy 배열로 작업하는 게 훨씬 빠르긴 함
    # 아니면 편의상 CR = data['Cumulative_Return']로 놓고 쓸 수도 있고 뭐
    print(f'Strategy CAGR: {100*CAGR_strategy: .2f}%, Benchmark CAGR: {100*CAGR_benchmark:.2f}%')

    # 샤프지수
    risk_free_rate = 0.003
    strategy_daily_return = data['Cumulative_Return'].pct_change().fillna(0) # 일일 수익률 싹 다 계산
    mean_return = strategy_daily_return.mean()*252 # 일일 수익률 평균에 252를 곱해서 연 평균 수익률로 다시 환산
    std_return=strategy_daily_return.std()*np.sqrt(252) # 동일
    sharp_ratio=(mean_return - risk_free_rate) / std_return
    print(f'Sharpe Ratio: {sharp_ratio:.2f}')

    # 최대 낙폭 MDD(Maximum DrawDown)

    # 전략의 MDD
    data['Cumulative_Max']=data['Cumulative_Return'].cummax() # 누적 수익률의 최고값을 계속 추적해서 끌고 감. [1.2, 0.9, 1.3, 0.8] >> [1.2, 1.2, 1.3, 1.3]
    data['Drawdown'] = data['Cumulative_Return'] / data ['Cumulative_Max'] - 1 # 최고 수익률 대비 현재 수익률 (낙폭)
    max_drawdown = data['Drawdown'].min() # 낙폭 중 가장 깊은 순간 캐치

    # 벤치마크 MDD
    cumulative_returns = (1 + data[('Close', ticker)].pct_change()).cumprod() # 하루하루 주가 변화량을 적어놓고, 거기에 1을 더한 상태에서, 앞에서부터 복리로 누적곱 계산)
    running_max = cumulative_returns.cummax()
    drawdown = cumulative_returns/running_max - 1 # 당시 주가가 최고점 대비 얼마나 하락했는지 (현재 수익률 / 최고 수익률 - 1)
    mdd_benchmark = drawdown.min() # 낙폭 중 가장 깊은 순간 캐치

    print(f'Strategy MDD: {100*max_drawdown:.2f}%, Benchmark MDD: {100*mdd_benchmark:.2f}%')

    ## 승률
    buy_signals = data[data['Signal'] == 2].index #매수 '날짜' 추출한 리스트
    sell_signals = data[data['Signal'] == -2].index #매도 '날짜' 추출한 리스트
    
    long_returns = [] 
    long_holding_periods = [] 
    short_returns = [] 
    short_holing_periods = [] 

    all_signals = sorted(list(buy_signals) + list(sell_signals)) 
    
    for buy_date in buy_signals:
        sell_dates = sell_signals[sell_signals > buy_date] # pandas 인덱스 슬라이싱. 매수 날짜보다 나중에 오는 매도 날짜 리스트
        if not sell_dates.empty: # 매수 이후에 매도가 있으면 실행
            sell_date = sell_dates[0] # 매수 이후 첫 번째 매도 날짜
            buy_price = data.loc[buy_date, ('Close', ticker)] # 매수 때의 종가
            sell_price = data.loc[sell_date, ('Close', ticker)] #매도 때의 종가
            return_pct = (sell_price*(1-fee_rate))/(buy_price*(1+fee_rate)) - 1 # 수익률
            returns.append(return_pct) # 수익률 리스트에 추가
            holding_period = np.busday_count(buy_date.date(), sell_date.date()) # np.busday_count(start, end)는 영업일 기준 일수 계산. buy_date.date()는 날짜에서  
            holding_periods.append(holding_period) # 보유일 수 (=보유기간) 리스트에 추가

    long_profitable_trades = len([r for r in returns if r > 0]) # 수익 난 거래 일 수 변수에 저장
    long_total_trades = len(long_returns) # 전체 거래 일 수
    
    short_profitable_trades = len([r for r in returns if r > 0])
    short_total_trades = len(short_returns) # 전체 거래 일 수
    
    long_win_rate = long_profitable_trades / total_trades if long_total_trades > 0 else 0
    short_win_rate = short_profitable_trades / total_trades if short_total_trades > 0 else 0
    
    print(f'Number of Long Trades:{long_total_trades}, \
            Number of Short Trades:{short_total_trades}, \
            Long Win Rate:{100*long_win_rate:.2f}%, \
            Long Win Rate:{100*short_win_rate:.2f}%')

    ## 평균 보유 기간
    if long_holding_periods: # 리스트가 비어 있지 않으면(True이면)
        avg_long_holding_period = np.mean(long_holding_periods)
    else:
        avg_long_holding_period = 0

    if short_holding_periods: # 리스트가 비어 있지 않으면(True이면)
        avg_short_holding_period = np.mean(short_holding_periods)
    else:
        avg_short_holding_period = 0
    print(f'Average Long Holding Period:{avg_long_holding_period:.1f}days, \
            Average Short Holding Period: {avg_short_holding_period:.1f}days')

## 구동

In [24]:
optimal_params, optimal_df = mac_long_short_optimizer(df)
data, ret = mac_long_short(df, optimal_params[0], optimal_params[1])

Max Tuples: [(5, 22, np.float64(0.0)), (5, 23, np.float64(0.0)), (5, 24, np.float64(0.0)), (5, 25, np.float64(0.0)), (5, 26, np.float64(0.0)), (5, 27, np.float64(0.0)), (5, 28, np.float64(0.0)), (5, 29, np.float64(0.0)), (5, 30, np.float64(0.0)), (5, 31, np.float64(0.0)), (5, 32, np.float64(0.0)), (5, 33, np.float64(0.0)), (5, 34, np.float64(0.0)), (5, 35, np.float64(0.0)), (5, 36, np.float64(0.0)), (5, 37, np.float64(0.0)), (5, 38, np.float64(0.0)), (5, 39, np.float64(0.0)), (5, 40, np.float64(0.0)), (5, 41, np.float64(0.0)), (5, 42, np.float64(0.0)), (5, 43, np.float64(0.0)), (5, 44, np.float64(0.0)), (5, 45, np.float64(0.0)), (5, 46, np.float64(0.0)), (5, 47, np.float64(0.0)), (5, 48, np.float64(0.0)), (5, 49, np.float64(0.0)), (5, 50, np.float64(0.0)), (6, 22, np.float64(0.0)), (6, 23, np.float64(0.0)), (6, 24, np.float64(0.0)), (6, 25, np.float64(0.0)), (6, 26, np.float64(0.0)), (6, 27, np.float64(0.0)), (6, 28, np.float64(0.0)), (6, 29, np.float64(0.0)), (6, 30, np.float64(0.0)),

In [None]:
tear_sheet(data)