In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import math
import warnings

# FutureWarning 제거
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
# 2016년 ~ 2024년 9월까지의 데이터를 미리 저장해두고 선택한 기간만큼 추출 : 보조지표 계산을 위함
# 백테스트 시트와 주가 데이터가 다름 : https://kr.investing.com/etfs/proshares-trust-ultrapro-qqq-historical-data 에서 확인 결과 파이썬 프로그램이 맞음
# 구현한 프로그램은 주가를 0.01단위로 반올림 후 등락률을 계산하지만, 사이트의 등락률은 yfinance의 데이터처럼 주가가 0.01 단위가 아닌 경우의 값임
def load_data(ticker, start_date, end_date):
    df = yf.download(ticker, start='2016-01-01', end='2024-09-30')
    df.drop(labels=['Adj Close', 'Volume'], axis=1, inplace=True)
    df.fillna(0, inplace=True)
    df = df.round(2)

    # 종가 등락률 추가
    df['Return'] = df['Close'].pct_change()

    # 이동평균선 : 추세 파악에 용이하도록 단기 이평선 이용
    df['MA_5'] = df['Close'].rolling(window=5).mean()
    df['MA_10'] = df['Close'].rolling(window=10).mean()
    df['MA_20'] = df['Close'].rolling(window=20).mean()    # 볼린저 밴드 계산에도 이용

    # RSI - 14일 : Wilder가 제안한 기간
    delta = df['Close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['RSI'] = rs / (1 + rs)

    # 볼린저 밴드 - 20일 : 1달 평균 영업
    df['BB_upper'] = df['MA_20'] + 2 * df['Close'].rolling(window=20).std()
    df['BB_lower'] = df['MA_20'] - 2 * df['Close'].rolling(window=20).std()

    # VIX
    vix = yf.download('^VIX', start='2016-01-01', end='2024-09-30')
    vix = vix[['Close']].rename(columns={'Close': 'VIX'})
    df = df.join(vix, how='left')

    # ATR : (고가 - 저가), (고가 - 전일 종가), (전일 종가 - 저가)
    df['High-Low'] = df['High'] - df['Low']
    df['High-PreviousClose'] = abs(df['High'] - df['Close'].shift(1))
    df['PreviousClose-Low'] = abs(df['Low'] - df['Close'].shift(1))
    df['TR'] = df[['High-Low', 'High-PreviousClose', 'PreviousClose-Low']].max(axis=1)
    df['ATR'] = df['TR'].rolling(window=14, min_periods=1).mean()
    df['ATR_mean'] = df['ATR'].rolling(window=14, min_periods=1).mean()     # 14일간 ATR 평균 : 변동성 판단의 기준으로 이용
    df.drop(labels=['High-Low', 'High-PreviousClose', 'PreviousClose-Low', 'TR'], axis=1, inplace=True)

    # 선택 기간만 추출
    df = df[start_date : end_date]

    return df

In [None]:
# 백테스트 시트와 맞도록 개량
# 1. 매도 부분에서 1/4와 나머지 전부가 아닌 1/4와 3/4를 int로 형변환 -> 4개 미만의 주식이 남을 수 있음
# 2. 위 문제를 해결하기 위해 보유 주식이 4개 미만인 경우 별% 가격으로 전량 매도 -> 하루 날림
# 3. 수수료를 매도 시 한번에 계산함

# 백테스트 시트의 조건문을 확인한 결과 매도금, 매수금 계산이 정확하지 않음 : 소수점 계산 문제인 것으로 추측됨
class InfiniteBuyStrategy:
    def __init__(self, df, start_date, end_date, initial_funds, trend='No', volatility='', rate=1):
        self.start_date = start_date
        self.end_date = end_date
        self.initial_funds = initial_funds
        self.one_buy_amount_default = initial_funds / 40    # 기본값 : 40분할
        self.one_buy_amount_quarter = 0                     # 쿼터 손절 모드의 1회 총액
        self.data = df
        self.holdings = 0
        self.cash = initial_funds
        self.average_price = 0
        self.trend = trend
        self.volatility = volatility
        self.rate = rate
        self.quarter_loss_mode = False
        self.quarter_loss_days = 0
        self.quarter_loss_triggered = False    # 쿼터 손절 모드 발생 여부
        self.quarter_loss_exit = False         # 쿼터 손절 모드 탈출 여부
        self.gain_count = 0
        self.loss_count = 0
        self.quarter_loss_count = 0            # 쿼터 손절 횟수 : loss_count에 포함되지만 특수한 경우라서 추가로 계산함
        self.buy_log = []
        self.sell_log = []
        self.trade_log = pd.DataFrame(columns=[
            '일자', '시가', '고가', '저가', '종가', '등락률', '1회 매수액', '평균 단가', 'T 값', '별% 가격',
            '평단 매수 수량', '별% 매수 수량', '10% 매도 수량', '별% 매도 수량', 'MOC 매도 수량', '보유 수량', '비고', '예수금', '평가금', '수익률'])

    def round(self, number, decimal_places):
        num = number * (10 ** decimal_places) + 0.5

        return math.floor(num) / (10 ** decimal_places)

    def buy(self, date, price, qty):
        self.holdings += qty
        self.cash -= qty * price
        self.average_price = self.round((self.average_price * (self.holdings - qty) + price * qty) / self.holdings, 2)
        self.buy_log.append((date, price, qty))

    def sell(self, date, price, qty):
        buy_amount = qty * self.trade_log['평균 단가'].iloc[-1]   # 매수금 : 전날 평단 * 주식 수
        sell_amount = qty * price - (qty * price + buy_amount) * 0.00044    # 매도 시점에서 매수 수수료까지 정산

        # 매도금이 매수금보다 크면 gain, 반대면 loss 증가
        if sell_amount > buy_amount:
            self.gain_count += 1
        elif sell_amount < buy_amount:
            self.loss_count += 1

        self.holdings -= qty
        self.cash += sell_amount
        self.sell_log.append((date, price, qty))

        if(self.holdings == 0):
            self.average_price = 0

    def calculate_star_price(self, T):
        return self.round(self.average_price * (1.1 - (T * 0.005)), 2)

    def cycle(self, date, row, T):
        open_price = row['Open']
        close_price = row['Close']
        high_price = row['High']
        price_change = row['Return']

        one_buy_amount = 0
        buy_quantity_avg_price = 0
        buy_quantity_star_price = 0
        sell_quantity_star_price = 0
        sell_quantity_10_price = 0
        holdings_raw = 0
        holdings_one_quarter = 0

        # 쿼터 손절 모드와 일반 모드 구분
        if self.quarter_loss_mode:
            star_price = self.round(self.average_price * 0.9, 2)    # 별% 가격 : -10% 고정
            one_buy_amount = self.one_buy_amount_quarter            # 1회 총액 : 쿼터 손절 진입 시 갱신한 값
            T = 40                                                  # 후반전 로직으로 매수 진행
        else:
            star_price = self.calculate_star_price(T)
            one_buy_amount = self.round(self.one_buy_amount_default * self.apply_indicator(date, close_price), 0)  # 보조지표 계산

        # 매도 ------------------------------------------------------------------------------
        # 보유 주식의 1/4을 별% 가격에서 LOC 매도, 나머지는 110%에서 지정가 매도
        holdings_raw = self.holdings

        if close_price >= star_price:   # LOC
            if holdings_raw < 4:    # 4개 미만이면 전량 매도
                sell_quantity_star_price = holdings_raw
            else :
                sell_quantity_star_price = int(holdings_raw *0.25)

            self.sell(date, close_price, sell_quantity_star_price)

            if self.quarter_loss_mode:
                self.quarter_loss_exit = True  # 매도 성공 시 쿼터 손절 모드 탈출

        if high_price >= self.average_price * 1.1:    # 지정가
            sell_quantity_10_price = int(holdings_raw * 0.75)
            self.sell(date, self.average_price * 1.1, sell_quantity_10_price)

            if self.quarter_loss_mode:
                self.quarter_loss_exit = True  # 매도 성공 시 쿼터 손절 모드 탈출

        # 매수 ------------------------------------------------------------------------------
        if T <= 20:
            # 전반전 매수 전략: 평균 단가의 LOC 50% 매수, 별% 가격으로 50% 매수
            if close_price <= self.average_price:
                buy_quantity_avg_price = (one_buy_amount / 2) // close_price
                self.buy(date, close_price, buy_quantity_avg_price)
            if close_price < star_price:
                buy_quantity_star_price = (one_buy_amount / 2) // close_price
                self.buy(date, close_price, buy_quantity_star_price)
        else:
            # 후반전 매수 전략: 전부 별% 가격으로 매수
            if close_price < star_price:
                buy_quantity_star_price = one_buy_amount // close_price
                self.buy(date, close_price, buy_quantity_star_price)

        self.append_df(date, T, one_buy_amount, star_price, buy_quantity_avg_price, buy_quantity_star_price, sell_quantity_10_price, sell_quantity_star_price, 0)

    def append_df(self, date, T, one_buy_amount, star_price, buy_quantity_avg_price, buy_quantity_star_price, sell_quantity_10_price, sell_quantity_star_price, sell_MOC):
        average_price = self.average_price
        close_change = self.round((self.data['Return'].loc[date] * 100), 2)
        return_percent = self.round(((self.holdings * self.data['Close'].loc[date] + self.cash) / self.initial_funds - 1) * 100, 2)
        note = ''

        if self.quarter_loss_mode :
            note = '쿼터 손절 모드 O'
            T = '쿼터 손절 모드'

        if T == 0:
            star_price = ''
            average_price = self.data['Close'].loc[date]
            note = '첫 매수'

        # 일별 로그 기록
        new_row = pd.DataFrame({
            '일자': [date],
            '시가': [self.data['Open'].loc[date]],
            '고가': [self.data['High'].loc[date]],
            '저가': [self.data['Low'].loc[date]],
            '종가': [self.data['Close'].loc[date]],
            '등락률': [close_change],
            '1회 매수액': [one_buy_amount],
            '평균 단가': [average_price],
            'T 값': [T],
            '별% 가격': [star_price],
            '평단 매수 수량': [buy_quantity_avg_price],
            '별% 매수 수량': [buy_quantity_star_price],
            '10% 매도 수량': [sell_quantity_10_price],
            '별% 매도 수량': [sell_quantity_star_price],
            'MOC 매도 수량': [sell_MOC],
            '보유 수량': [self.holdings],
            '비고': [note],
            '예수금': [self.round(self.cash, 2)],
            '평가금': [self.round(self.holdings * self.data['Close'].loc[date], 2)],
            '수익률': [return_percent]
        })

        # 기존 데이터프레임에 새로운 행을 concat 사용하여 추가
        self.trade_log = pd.concat([self.trade_log, new_row], ignore_index=True)

    # 추세 지표 신호 : 매수 1, 매도 -1, 기본 0 반환
    def apply_trend(self, date, close_price):
        indicator = 0

        # 이동평균선
        if self.trend[0:2] == 'MA':
            if close_price > self.data[self.trend].loc[date]:
                indicator = 1
            elif close_price < self.data[self.trend].loc[date]:
                indicator = -1
        # RSI
        elif self.trend == 'RSI':
            if self.data['RSI'].loc[date] <= 30:
                indicator = 1
            elif self.data['RSI'].loc[date] >= 70:
                indicator = -1
        # 볼린저 밴드
        elif self.trend == 'BB':
            if close_price <= self.data['BB_lower'].loc[date]:
                indicator = 1
            elif close_price >= self.data['BB_upper'].loc[date]:
                indicator = -1

        return self.rate ** indicator

    # 지표 결합
    def apply_indicator(self, date, close_price):
        index = self.apply_trend(date, close_price)   # 추세 지표에서 반환받은 배율
        magnification = 0

        # ATR
        if self.volatility == 'ATR':
            # 불확실성 큼
            if self.data['ATR'].loc[date] > self.data['ATR_mean'].loc[date]:
                magnification = -1
            # 불확실성 작음
            elif self.data['ATR'].loc[date] < self.data['ATR_mean'].loc[date]:
                magnification = 1

        elif self.volatility == 'VIX':
            if self.data['VIX'].loc[date] > 30:
                magnification = 1
            elif self.data['VIX'].loc[date] < 20:
                magnification = -1

        # 변동성 지표 사용 안하면 추세 지표 반환
        return index * (2 ** magnification)

    def run_backtest(self):
        for date, row in self.data.iterrows():    # 인덱스, 시리즈 쌍 반환
            accumulated_purchase_amount = self.holdings * self.average_price
            T = self.round(accumulated_purchase_amount / self.one_buy_amount_default, 2)

            # 첫 회차 : 종가로 전부 매수
            if self.holdings == 0:
                buy_quantity_avg_price = self.one_buy_amount_default // row['Close']
                self.buy(date, row['Close'], buy_quantity_avg_price)

                self.append_df(date, T, self.one_buy_amount_default, self.calculate_star_price(T), buy_quantity_avg_price, 0, 0, 0, 0)

            # 쿼터 손절 모드 진입
            elif 39 < T <= 40 and not self.quarter_loss_triggered:
                self.quarter_loss_mode = True
                self.quarter_loss_triggered = True
                self.quarter_loss_exit = False
                self.quarter_loss_days = 0      # 손절 모드 진행 기간 초기화
                self.quarter_loss_count += 1    # 쿼터손절 횟수 1 증가

                # 보유 주식의 1/4 매도
                sell_quantity = self.holdings // 4
                self.sell(date, row['Close'], sell_quantity)

                # 1회 매수 총액 재조정 (한 번만)
                self.one_buy_amount_quarter = min(self.one_buy_amount_default, self.cash / 10)    # 10분할 금액의 최댓값이 기존 1회 총액

                self.append_df(date, T, 0, self.calculate_star_price(T), 0, 0, 0, 0, sell_quantity)

            # 일일 매매 수행
            else :
                self.cycle(date, row, T)

            # 10일 동안 매도 체결 실패 시 쿼터 손절 모드 반복
            if self.quarter_loss_days == 10 and self.quarter_loss_exit == False:
                self.quarter_loss_triggered = False    # 쿼터손절 trigger가 다시 발생하도록

            # 쿼터 손절 모드 탈출
            elif self.quarter_loss_exit:
                self.quarter_loss_mode = False
                self.quarter_loss_exit = False
                self.quarter_loss_triggered = False

        # 데이터프레임 정리
        modify_column = ['평단 매수 수량', '별% 매수 수량', '10% 매도 수량', '별% 매도 수량', 'MOC 매도 수량']
        self.trade_log[modify_column] = self.trade_log[modify_column].replace(0, "")
        self.trade_log['등락률'] = self.trade_log['등락률'].astype(str) + '%'
        self.trade_log['수익률'] = self.trade_log['수익률'].astype(str) + '%'
        self.trade_log.fillna('', inplace=True)

    def print_result(self):
        winning_rate = self.round(self.gain_count / (self.gain_count + self.loss_count) * 100, 2)
        print('수익률 : {0:>6} / 쿼터손절 : {1:>3} / 승률 : {2:>6}%'.format(
            self.trade_log['수익률'].iloc[-1], self.quarter_loss_count, winning_rate))

In [None]:
# TQQQ에 대한 무한매수법 백테스트 실행
ticker = 'TQQQ'
start_date = '2020-01-01'
end_date = '2023-12-31'
initial_funds = 40000  # 초기 자본

df = load_data(ticker, start_date, end_date)
print('기간 :', start_date, '~', end_date)

# 일반 무한매수법
normal = InfiniteBuyStrategy(df=df, start_date=start_date, end_date=end_date, initial_funds=initial_funds)
normal.run_backtest()
print('무한매수법', end=' ')
normal.print_result()

# 보조지표 : MA, RSI, BB(볼린저 밴드) / 다른 단어는 기본 무매법으로 작동
print('\n단일 보조지표, 배율 조정')
indicators = ['MA_5', 'MA_10', 'MA_20', 'RSI', 'BB']
rates = [1.2, 2, 5]

for indicator in indicators:
    for rate in rates:
        strategy = InfiniteBuyStrategy(df=df, start_date=start_date, end_date=end_date, initial_funds=initial_funds, trend=indicator, rate=rate)
        strategy.run_backtest()
        print('{0:>5} {1:>3}배'.format(indicator, rate), end='  ')
        strategy.print_result()

# 추세 + 변동성
print('\n변동성 지표 결합')
trends = ['MA_20', 'RSI', 'BB']
volatilities = ['ATR', 'VIX']

for trend in trends:
    for volatility in volatilities:
        strategy = InfiniteBuyStrategy(df=df, start_date=start_date, end_date=end_date, initial_funds=initial_funds, trend=trend, volatility=volatility, rate=2)
        strategy.run_backtest()
        print('{0:>5} {1:>3}'.format(trend, volatility), end='  ')
        strategy.print_result()

# trade_log를 csv로 저장
#normal.trade_log.to_csv('infinite_buy_trade_log.csv', index=False)

# scv를 구글 드라이브로 이동
#!cp /content/infinite_buy_trade_log.csv /content/drive/MyDrive/

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


기간 : 2020-01-01 ~ 2023-12-31
무한매수법 수익률 : 67.75% / 쿼터손절 :   4 / 승률 :  92.27%

단일 보조지표, 배율 조정
 MA_5 1.2배  수익률 : 67.57% / 쿼터손절 :   3 / 승률 :  92.72%
 MA_5   2배  수익률 : 68.87% / 쿼터손절 :   1 / 승률 :  89.81%
 MA_5   5배  수익률 : 66.96% / 쿼터손절 :   5 / 승률 :   86.5%
MA_10 1.2배  수익률 : 69.64% / 쿼터손절 :   3 / 승률 :  91.79%
MA_10   2배  수익률 : 71.61% / 쿼터손절 :   3 / 승률 :  88.41%
MA_10   5배  수익률 : 85.01% / 쿼터손절 :   2 / 승률 :  84.67%
MA_20 1.2배  수익률 : 73.28% / 쿼터손절 :   3 / 승률 :  91.83%
MA_20   2배  수익률 : 74.89% / 쿼터손절 :   3 / 승률 :  91.59%
MA_20   5배  수익률 :  85.6% / 쿼터손절 :   3 / 승률 :  83.57%
  RSI 1.2배  수익률 : 66.24% / 쿼터손절 :   4 / 승률 :  90.71%
  RSI   2배  수익률 :  75.3% / 쿼터손절 :   6 / 승률 :  83.68%
  RSI   5배  수익률 : 90.43% / 쿼터손절 :   5 / 승률 :  77.38%
   BB 1.2배  수익률 : 69.26% / 쿼터손절 :   5 / 승률 :  92.27%
   BB   2배  수익률 : 70.25% / 쿼터손절 :   5 / 승률 :  91.52%
   BB   5배  수익률 : 67.67% / 쿼터손절 :   4 / 승률 :  89.34%

변동성 지표 결합
MA_20 ATR  수익률 : 76.73% / 쿼터손절 :   4 / 승률 :  88.75%
MA_20 VIX  수익률 : 91.59% / 쿼터손절 :   2 / 승률 :   93.1