# Setting

In [45]:

# matplotlib에 한글 폰트 설정
from matplotlib import rcParams
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm

# 폰트 설정
font_path = 'C:/Windows/Fonts/NanumGothic.ttf'
fm.fontManager.addfont(font_path)
font_prop = fm.FontProperties(fname=font_path)

# matplotlib 설정
plt.rc('font', family=font_prop.get_name())

#음수기호해결
rcParams['axes.unicode_minus'] = False

# 테스트 출력
print(f"Font set to: {font_prop.get_name()}")


Font set to: NanumGothic


In [46]:
# !pip install yfinance
# !pip install statsmodels


# import

In [47]:
import os
import yfinance as yf
import pandas as pd

import numpy as np
import statsmodels.api as sm

from statsmodels.tsa.stattools import adfuller

import itertools

import time

import backtrader as bt
from datetime import datetime

import matplotlib.pyplot as plt
import matplotlib
from multiprocessing import Pool


# Read S&P 500

## s&p500크롤링

In [48]:

csv_file_path = "./csv/snp500.csv"

#각 종목의 시가총액 가져오는 함수 !이젠 필요없음
def get_market_cap(ticker):
    stock = yf.Ticker(ticker)
    try:
        market_cap = stock.info['marketCap']  # 시가총액
    except KeyError:
        market_cap = None
    return market_cap

# S&P 500 종목 리스트 가져오기
# pd.read_html() 함수는 웹페이지에 있는 HTML 테이블을 DataFrame으로 변환
def load_sp500_data():
    if os.path.exists(csv_file_path):
        print("CSV파일에서 s&p500 데이터를 로드합니다.")
        sp500 = pd.read_csv(csv_file_path)
    else:
         print("웹에서 S&P 500 데이터를 가져옵니다.")
         sp500 = pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]
         sp500['Market Cap'] = sp500['Symbol'].apply(get_market_cap) #시가총액 주가
         sp500.to_csv(csv_file_path, index=False) #데이터를 csv로 저장

    return sp500


sp500 = load_sp500_data()


# 시가총액 정보를 추가 위해 티커(symbol) 리스트를 추출
tickers = sp500['Symbol'].tolist()


# 섹터별 시가총액 상위 4개 종목 선택 !이젠 필요 없음
top_by_sector = sp500.groupby('GICS Sector').apply(lambda x: x.nlargest(4, 'Market Cap'))

# 산업군 전체 종목 선택
all_per_industry = sp500.groupby('GICS Sub-Industry').apply(lambda x: x)


# 섹터별로 시가총액 상위 종목들 출력
#print(top_by_sector[['Symbol', 'Security', 'Market Cap']])

# 산업군 별 시가총액 상위 종목 출력
#print(all_per_industry[['Symbol', 'Security', 'Market Cap']])

# 파일로 저장
sp500.to_csv("./csv/snp500.csv", index=False)  # 이미 데이터가 있으면 덮어쓰지 않음


CSV파일에서 s&p500 데이터를 로드합니다.


  top_by_sector = sp500.groupby('GICS Sector').apply(lambda x: x.nlargest(4, 'Market Cap'))
  all_per_industry = sp500.groupby('GICS Sub-Industry').apply(lambda x: x)


## 섹터, 산업군 별 데이터

In [49]:

# 각 섹터의 종목 데이터를 가져오기 위한 함수
def get_data(tickers, start_date="2023-01-01", end_date="2023-12-31"):
    # 데이터 수집 (Adjusted Close 가격을 가져옴)
    data = yf.download(tickers, start=start_date, end=end_date)['Adj Close']
    return data


#기본 분석 기간
default_start_date = "2023-01-01"
default_end_date = "2023-12-31"

#데이터 예시
it_sector_tickers = top_by_sector.loc['Information Technology']['Symbol'].tolist()

it_sector_data = get_data(it_sector_tickers)


it_industry_tickers = all_per_industry.loc['Advertising']['Symbol'].tolist()

it_industry_data = get_data(it_industry_tickers)


# 종목 간 상관관계
sector_correlation_matrix = it_sector_data.corr()
print("섹터 상관관계 매트릭스:")
print(sector_correlation_matrix)


industry_correlation_matrix = it_industry_data.corr()
print("산업군 상관관계 매트릭스:")
print(industry_correlation_matrix)


[*********************100%***********************]  4 of 4 completed
[*********************100%***********************]  2 of 2 completed

섹터 상관관계 매트릭스:
Ticker      AAPL      AVGO      MSFT      NVDA
Ticker                                        
AAPL    1.000000  0.875553  0.948934  0.921846
AVGO    0.875553  1.000000  0.917176  0.943091
MSFT    0.948934  0.917176  1.000000  0.936636
NVDA    0.921846  0.943091  0.936636  1.000000
산업군 상관관계 매트릭스:
Ticker       IPG       OMC
Ticker                    
IPG     1.000000  0.937923
OMC     0.937923  1.000000





## 볼린저 밴드 그래프 함수, 안정적 페어 고르기 함수

In [50]:
# -------- 코드 셀 2: 볼린저 밴드 그래프 및 안정적인 페어 선택 --------

def calculate_p_value(data, ticker1, ticker2):
    # 독립 변수와 종속 변수에 대해 NaN 및 inf 값 제거
    x = data[ticker1].replace([np.inf, -np.inf], np.nan).dropna()
    y = data[ticker2].replace([np.inf, -np.inf], np.nan).dropna()

    # 데이터의 길이를 맞추기 위해 공통 인덱스 사용
    common_index = x.index.intersection(y.index)
    x = x.loc[common_index]
    y = y.loc[common_index]

    # 데이터 길이가 충분한지 확인
    if len(x) == 0 or len(y) == 0:
        print(f"Insufficient data for {ticker1} and {ticker2}. Skipping this pair.")
        return None, None

    # 상수항 추가 후 OLS 회귀 분석
    x = sm.add_constant(x)
    ols_model = sm.OLS(y, x).fit()
    residuals = ols_model.resid

    # 잔차에 대해 ADF 테스트 수행
    adf_result = adfuller(residuals)
    return adf_result[1], residuals  # p-value와 잔차 반환

def plot_bollinger_bands(residuals, ticker1, ticker2, save_path=None):
    # 30일 이동 평균과 표준편차 계산
    moving_avg = residuals.rolling(window=30).mean()
    moving_std = residuals.rolling(window=30).std()

    # 볼린저 밴드 계산
    upper_band = moving_avg + (2 * moving_std)
    lower_band = moving_avg - (2 * moving_std)
    
    # 매매 규칙
    # 잔차가 상단을 넘으면 매도, 하단을 넘으면 매수
    #entry_signals = (residuals > upper_band) | (residuals < lower_band)
    #exit_signals = residuals.rolling(window=60).mean().shift(1)  # 이동평균선이 교차할 때 청산

    # 볼린저 밴드와 잔차 시각화
    plt.figure(figsize=(10,6))
    plt.plot(residuals, label='잔차', color='blue')
    plt.plot(moving_avg, label='30일 이동 평균', color='orange')
    plt.fill_between(residuals.index, lower_band, upper_band, color='gray', alpha=0.3, label='볼린저 밴드')
    plt.legend()
    plt.title(f'잔차와 볼린저 밴드 ({ticker1} vs {ticker2})')
    #plt.show()->파일로 저장하면 굳이 필요 없어서
    # 파일로 저장
    if save_path:
        plt.savefig(save_path, format='png', dpi=300, bbox_inches='tight')  # 해상도와 여백 조정
        print(f"Bollinger Band graph saved to {save_path}")
    
    # 메모리 정리
    plt.close()


# 안정적인 페어 선택 함수
def select_stable_pair(sp500, start_date, end_date, adf_threshold=0.05):
    industry_groups = sp500.groupby("GICS Sub-Industry")

    stable_pairs = []

    for industry, group in industry_groups:
        tickers = group['Symbol'].tolist()
        if len(tickers) < 2:
            continue  # 페어 분석을 위한 최소 두 개 이상의 종목이 필요

        data = get_data(tickers, start_date=start_date, end_date=end_date)
        pairs = list(itertools.combinations(tickers, 2))

        best_pair = None
        lowest_std = float('inf')

        for pair in pairs:
            # TODO: 티커 2개를 받아서 p-value를 반환하는 함수로 분리하기
            x_ticker, y_ticker = pair
            p_value, residuals = calculate_p_value(data, x_ticker, y_ticker)

            # 잔차가 평균 회귀적 성향을 보이는지 ADF 테스트로 확인
            if p_value is not None:
                # 잔차의 표준편차 계산
                residual_std = residuals.std()
                #print(f"Pair: {x_ticker} and {y_ticker} - 잔차 표준편차: {residual_std}, ADF p-value: {p_value}")

                if residual_std < lowest_std and p_value < adf_threshold:
                    best_pair = pair
                    lowest_std = residual_std
                    best_p_value = p_value
                    #stable_pair = pair
                    #lowest_std = residual_std
        if best_pair:
                stable_pairs.append({
                    "Sector": group["GICS Sector"].iloc[0],
                    "Industry Group": industry,
                    "Ticker 1": best_pair[0],
                    "Ticker 2": best_pair[1],
                    "P-Value": best_p_value,
                    "Residual Std": lowest_std
                })
    return pd.DataFrame(stable_pairs)



## 페어 테이블

In [51]:
#stable_pairs_df = select_stable_pair(sp500, start_date="2023-01-01", end_date="2023-12-31")
#print(stable_pairs_df)

# 안정적인 페어 테이블을 CSV 파일로 저장
#stable_pairs_df.to_csv("./csv/stable_pairs.csv", index=False)

file_path = "./csv/stable_pairs.csv"

# 파일 존재 여부 확인
if os.path.exists(file_path):
    print("파일이 이미 존재합니다. 기존 파일을 불러옵니다.")
    stable_pairs_df = pd.read_csv(file_path)
else:
    print("파일이 존재하지 않습니다. 데이터를 새로 생성합니다.")
    stable_pairs_df = select_stable_pair(sp500, start_date="2023-01-01", end_date="2023-12-31")
    # 안정적인 페어 테이블을 CSV 파일로 저장
    stable_pairs_df.to_csv(file_path, index=False)

print(stable_pairs_df)


파일이 이미 존재합니다. 기존 파일을 불러옵니다.
                    Sector                                Industry Group  \
0              Industrials                           Aerospace & Defense   
1              Industrials                       Air Freight & Logistics   
2   Information Technology                          Application Software   
3               Financials              Asset Management & Custody Banks   
4              Health Care                                 Biotechnology   
5   Communication Services                                  Broadcasting   
6   Consumer Discretionary                              Broadline Retail   
7   Consumer Discretionary                              Casinos & Gaming   
8   Information Technology                      Communications Equipment   
9               Financials                              Consumer Finance   
10        Consumer Staples           Consumer Staples Merchandise Retail   
11  Consumer Discretionary                                  

## 테이블 페어간 볼린저 밴드

In [52]:


# 산업군별 안정적인 페어에 대한 볼린저 밴드 그래프 출력 함수
def all_bollinger_bands(data_func, stable_pairs_df, start_date, end_date, output_folder="./bollinger_band"):
    # 출력 폴더 생성
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for _, row in stable_pairs_df.iterrows():
        try:
            # 그래프 파일 경로 설정
            file_name = f"{row['Ticker 1']}_{row['Ticker 2']}_bollinger.png"
            file_path = os.path.join(output_folder, file_name)

            # 그래프 파일 존재 여부 확인
            if os.path.exists(file_path):
                print(f"{file_name} 이미 존재합니다. 기존 파일을 사용합니다.")
                continue  # 이미 존재하면 건너뜀


            data = data_func([row['Ticker 1'], row['Ticker 2']], start_date=start_date, end_date=end_date)
            _, residuals = calculate_p_value(data, row['Ticker 1'], row['Ticker 2'])
            if residuals is not None:
                # 수정된 그래프 함수 호출 및 저장
                plot_bollinger_bands(residuals, row['Ticker 1'], row['Ticker 2'], save_path=file_path)
                print(f"{file_name} 그래프 생성 및 저장 완료.")
                
            else:
                print(f"No residuals data available for {row['Ticker 1']} and {row['Ticker 2']}. Skipping.")
        except Exception as e:
            print(f"Error processing {row['Ticker 1']} and {row['Ticker 2']}: {e}")


# 안정적인 페어에 대해 볼린저 밴드 그래프 출력
all_bollinger_bands(get_data, stable_pairs_df, start_date="2023-01-01", end_date="2023-12-31")

GE_HWM_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
CHRW_FDX_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
FICO_PTC_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
BEN_IVZ_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
ABBV_GILD_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
FOXA_WBD_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
AMZN_EBAY_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
LVS_WYNN_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
ANET_JNPR_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
COF_SYF_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
TGT_WMT_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
GPC_LKQ_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
PNC_USB_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
ETR_PPL_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
ETN_EMR_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
KEYS_ZBRA_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
CTVA_FMC_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
MKTX_NDAQ_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
CAH_COR_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
ABT_BAX_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
LH_DGX_bollinger.png 이미 존재합니다. 기존 파일을 사용합니다.
DHI_LEN_bollinger.png

## 동적 선택을 통한 페어 분석 함수

In [53]:

'''
def analyze_by_selection(selection_type, selection_name, start_date, end_date):
    if selection_type == "sector":
        # 선택한 섹터로 분석
        tickers = top_by_sector.loc[selection_name]['Symbol'].tolist()
        data_func = get_data
    elif selection_type == "industry":
        # 선택한 산업군으로 분석
        tickers = all_per_industry.loc[selection_name]['Symbol'].tolist()  # 산업군 선택
        data_func = get_data
    else:
        print("올바르지 않은 선택입니다. 섹터 또는 산업군을 입력하세요.")
        return

    # 페어 분석 수행
    select_stable_pair(data_func, tickers, start_date, end_date)


#사용자 선택 기반 페어 분석 실행
selection_type = input("분석하려는 선택을 입력하세요 (sector or industry): ")
selection_name = input("분석할 섹터 또는 산업군의 이름을 입력하세요: ")
analyze_by_selection(selection_type, selection_name, start_date="2020-01-01", end_date="2023-01-01")
'''

'\ndef analyze_by_selection(selection_type, selection_name, start_date, end_date):\n    if selection_type == "sector":\n        # 선택한 섹터로 분석\n        tickers = top_by_sector.loc[selection_name][\'Symbol\'].tolist()\n        data_func = get_data\n    elif selection_type == "industry":\n        # 선택한 산업군으로 분석\n        tickers = all_per_industry.loc[selection_name][\'Symbol\'].tolist()  # 산업군 선택\n        data_func = get_data\n    else:\n        print("올바르지 않은 선택입니다. 섹터 또는 산업군을 입력하세요.")\n        return\n\n    # 페어 분석 수행\n    select_stable_pair(data_func, tickers, start_date, end_date)\n\n\n#사용자 선택 기반 페어 분석 실행\nselection_type = input("분석하려는 선택을 입력하세요 (sector or industry): ")\nselection_name = input("분석할 섹터 또는 산업군의 이름을 입력하세요: ")\nanalyze_by_selection(selection_type, selection_name, start_date="2020-01-01", end_date="2023-01-01")\n'

## 데이터 모니터링

In [54]:
'''
# 산업군별 안정적 페어를 저장하는 딕셔너리
stable_pairs = {}

# p-value를 계산하는 함수
def check_p_value(data_func, ticker1, ticker2, start_date, end_date):
    data = data_func([ticker1, ticker2], start_date=start_date, end_date=end_date)
    p_value, _ = calculate_p_value(data, ticker1, ticker2)
    return p_value

# 산업군별로 초기 안정적 페어 찾기
def initialize_stable_pairs(industry_tickers, data_func, start_date, end_date):
    stable_pair = select_stable_pair(data_func, industry_tickers, start_date, end_date)
    if stable_pair:
        stable_pairs[stable_pair] = check_p_value(data_func, stable_pair[0], stable_pair[1], start_date, end_date)

# 주기적으로 안정적 페어를 모니터링하는 함수
def monitor_stable_pairs(data_func, start_date, end_date, adf_threshold=0.05, interval=3600):
    while True:
        print("Monitoring stable pairs...")
        for pair in list(stable_pairs.keys()):
            ticker1, ticker2 = pair
            p_value = check_p_value(data_func, ticker1, ticker2, start_date, end_date)

            if p_value is None:
                print(f"Data for pair {ticker1} and {ticker2} is missing. Skipping...")
                continue

            print(f"Current p-value for {ticker1} and {ticker2}: {p_value}")
            if p_value > adf_threshold:
                print(f"p-value for {pair} exceeded threshold. Re-evaluating pairs for this industry...")
                
                # 산업군 내 새로운 페어 찾기
                industry_tickers = all_per_industry.loc[industry].Symbol.tolist()  # 예시: 해당 산업군에 대한 티커 리스트
                new_stable_pair = select_stable_pair(data_func, industry_tickers, start_date, end_date)
                
                if new_stable_pair:
                    stable_pairs[new_stable_pair] = check_p_value(data_func, new_stable_pair[0], new_stable_pair[1], start_date, end_date)
                    print(f"Updated stable pair for {industry}: {new_stable_pair} with p-value {stable_pairs[new_stable_pair]}")
                
                # 기존 페어 제거
                del stable_pairs[pair]

        # 일정 시간 대기 후 다시 체크
        time.sleep(interval)

# 예시 실행 코드
industry_name = "your_industry_name"
industry_tickers = all_per_industry.loc[industry_name]['Symbol'].tolist()
data_func = get_data

# 초기 페어를 찾고 안정적인 페어를 모니터링
initialize_stable_pairs(industry_tickers, data_func, start_date="2020-01-01", end_date="2023-01-01")
monitor_stable_pairs(data_func, start_date="2020-01-01", end_date="2023-01-01")
'''

'\n# 산업군별 안정적 페어를 저장하는 딕셔너리\nstable_pairs = {}\n\n# p-value를 계산하는 함수\ndef check_p_value(data_func, ticker1, ticker2, start_date, end_date):\n    data = data_func([ticker1, ticker2], start_date=start_date, end_date=end_date)\n    p_value, _ = calculate_p_value(data, ticker1, ticker2)\n    return p_value\n\n# 산업군별로 초기 안정적 페어 찾기\ndef initialize_stable_pairs(industry_tickers, data_func, start_date, end_date):\n    stable_pair = select_stable_pair(data_func, industry_tickers, start_date, end_date)\n    if stable_pair:\n        stable_pairs[stable_pair] = check_p_value(data_func, stable_pair[0], stable_pair[1], start_date, end_date)\n\n# 주기적으로 안정적 페어를 모니터링하는 함수\ndef monitor_stable_pairs(data_func, start_date, end_date, adf_threshold=0.05, interval=3600):\n    while True:\n        print("Monitoring stable pairs...")\n        for pair in list(stable_pairs.keys()):\n            ticker1, ticker2 = pair\n            p_value = check_p_value(data_func, ticker1, ticker2, start_date, end_date)\n\n   

# 백테스트

## 데이터 준비

In [62]:
if os.path.exists("./backtest_csv/backtest_data.csv"):
    print("기존 백테스트 데이터를 로드합니다.")
    backtest_data_df = pd.read_csv("./backtest_csv/backtest_data.csv")
    existing_tickers = set(backtest_data_df["Ticker"].unique())
else:
    print("백테스트 데이터가 없습니다. 새로 다운로드를 시작합니다.")
    backtest_data_df = pd.DataFrame()
    existing_tickers = set()

# 테이블에서 종목 리스트 가져오기
tickers_list = list(set(stable_pairs_df["Ticker 1"].tolist() + stable_pairs_df["Ticker 2"].tolist()))

# 데이터 다운로드 및 저장
backtest_data = []
for ticker in tickers_list:
    if ticker in tickers_list:
        print(f"{ticker} 데이터가 이미 존재합니다. 건너뜁니다.")
        continue

    data = yf.download(ticker, start="2024-01-01", end="2024-06-30")
    # 데이터 비어있으면 건너뛰기
    if data.empty:
        print(f"Data for {ticker} is empty. Skipping.")
        continue
    data.reset_index(inplace=True)#인덱스 초기화
    data["Ticker"] = ticker#티커추가

    # 안정적 페어와의 연결성 추가: Pair 열 생성
    data["Pair"] = stable_pairs_df.loc[
        (stable_pairs_df["Ticker 1"] == ticker) | (stable_pairs_df["Ticker 2"] == ticker), 
        ["Ticker 1", "Ticker 2"]
    ].apply(lambda x: f"{x['Ticker 1']}-{x['Ticker 2']}", axis=1).values[0]

    backtest_data.append(data)

    data.to_csv(f"./backtest_csv/{ticker}_backtest.csv", index=False)

if backtest_data:
    new_backtest_data_df = pd.concat(backtest_data, axis=0)
    # 모든 데이터를 하나의 DataFrame으로 병합
    backtest_data_df = pd.concat([backtest_data_df, new_backtest_data_df], axis=0)

    # 필요한 열만 선택 (보기 좋게 정리)
    backtest_data_df = backtest_data_df[["Pair", "Ticker", "Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"]]

    # 백테스트 데이터를 테이블로 저장
    backtest_data_df.to_csv("./backtest_csv/backtest_data.csv", index=False)
    print("백테스트 데이터 저장 완료.")
else:
    print("추가 다운로드한 데이터가 없음")




기존 백테스트 데이터를 로드합니다.
HAL 데이터가 이미 존재합니다. 건너뜁니다.
GE 데이터가 이미 존재합니다. 건너뜁니다.
PSA 데이터가 이미 존재합니다. 건너뜁니다.
LVS 데이터가 이미 존재합니다. 건너뜁니다.
MO 데이터가 이미 존재합니다. 건너뜁니다.
ACGL 데이터가 이미 존재합니다. 건너뜁니다.
BMY 데이터가 이미 존재합니다. 건너뜁니다.
MRO 데이터가 이미 존재합니다. 건너뜁니다.
WBD 데이터가 이미 존재합니다. 건너뜁니다.
AIZ 데이터가 이미 존재합니다. 건너뜁니다.
ITW 데이터가 이미 존재합니다. 건너뜁니다.
HBAN 데이터가 이미 존재합니다. 건너뜁니다.
BEN 데이터가 이미 존재합니다. 건너뜁니다.
PFE 데이터가 이미 존재합니다. 건너뜁니다.
WYNN 데이터가 이미 존재합니다. 건너뜁니다.
DGX 데이터가 이미 존재합니다. 건너뜁니다.
CAH 데이터가 이미 존재합니다. 건너뜁니다.
WMB 데이터가 이미 존재합니다. 건너뜁니다.
ADP 데이터가 이미 존재합니다. 건너뜁니다.
FOXA 데이터가 이미 존재합니다. 건너뜁니다.
ETN 데이터가 이미 존재합니다. 건너뜁니다.
PTC 데이터가 이미 존재합니다. 건너뜁니다.
ABT 데이터가 이미 존재합니다. 건너뜁니다.
BRO 데이터가 이미 존재합니다. 건너뜁니다.
CTSH 데이터가 이미 존재합니다. 건너뜁니다.
EBAY 데이터가 이미 존재합니다. 건너뜁니다.
BAX 데이터가 이미 존재합니다. 건너뜁니다.
HWM 데이터가 이미 존재합니다. 건너뜁니다.
ZBRA 데이터가 이미 존재합니다. 건너뜁니다.
LKQ 데이터가 이미 존재합니다. 건너뜁니다.
LYB 데이터가 이미 존재합니다. 건너뜁니다.
JNPR 데이터가 이미 존재합니다. 건너뜁니다.
PARA 데이터가 이미 존재합니다. 건너뜁니다.
LUV 데이터가 이미 존재합니다. 건너뜁니다.
ELV 데이터가 이미 존재합니다. 건너뜁니다.
SYF 데이터가 이미 존재합니다. 건너뜁니다.
CTVA 데이터가 이미 존재합니다. 건너뜁니다.
DPZ 데이터가 이

## 안정적 페어 테이블과 백테스트 데이터 일치 여부 확인

In [56]:
# 안정적 페어와 백테스트 데이터의 페어 비교
stable_pairs_set = set(stable_pairs_df["Ticker 1"] + "-" + stable_pairs_df["Ticker 2"])
backtest_pairs_set = set(backtest_data_df["Pair"].unique())

print("일치 여부:", stable_pairs_set == backtest_pairs_set)
if stable_pairs_set != backtest_pairs_set:
    print("안정적 페어에만 있는 페어:", stable_pairs_set - backtest_pairs_set)
    print("백테스트에만 있는 페어:", backtest_pairs_set - stable_pairs_set)


일치 여부: True


## 데이터 누락 시 함수

In [57]:
import datetime
import time

def validate_data(file_path,start_date, end_date):
    if not os.path.exists(file_path):
        print(f"Data file {file_path} is missing.")
        return False
    try:
        #run_backtest에서 사용하는 데이터가 
        #실제 백테스트 기간(2024-01-01부터 2024-06-30)에 맞는지 확인
        df = pd.read_csv(file_path, parse_dates=['Date'])
        if df.empty:
            print(f"Data file {file_path} is empty.")
            return False
        # print(pd.to_datetime(df['Date']).min().timestamp(), time.mktime(datetime.datetime.strptime(start_date, "%Y-%m-%d").timetuple()))
        # if  pd.to_datetime(df['Date']).min().timestamp() > time.mktime(datetime.datetime.strptime(start_date, "%Y-%m-%d").timetuple()) or \
        #     pd.to_datetime(df['Date']).max().timestamp() < time.mktime(datetime.datetime.strptime(end_date, "%Y-%m-%d").timetuple()):
        #     print(f"Data file {file_path} does not cover the required date range {start_date} to {end_date}.")
        #     return False
    except Exception as e:
        print(f"Error reading {file_path}: {e}")
        return False
    return True


## 백테스트 전략 구현

In [63]:
from backtrader.plot import PlotScheme

# 사용자 정의 스타일 설정
class NoPopupPlotScheme(PlotScheme):
    def dofig(self, *args, **kwargs):
        kwargs['show'] = False  # 팝업 비활성화
        return super().dofig(*args, **kwargs)

# 폰트 및 그래프 설정
matplotlib.use('Agg')  # 백엔드 설정 (그래프 저장만 수행)
plt.rcParams["figure.figsize"] = [12, 8]  # 그래프 크기 조정
plt.rcParams["font.size"] = 10  # 폰트 크기
plt.rcParams["lines.linewidth"] = 1.5  # 선 두께
plt.rcParams["axes.unicode_minus"] = False  # 마이너스 기호 지원



class PairTradingStrategy(bt.Strategy):
    params = (
        ('lookback_period', 30),  # 이동 평균 및 표준편차 계산 기간
        ('zscore_threshold', 2),  # 매매 임계값 (z-score 기준)
    )

    def __init__(self):
        # 데이터 피드 설정 (두 종목만 사용)
        self.data0 = self.datas[0]
        self.data1 = self.datas[1]
        
        # 스프레드 계산
        self.spread = self.data0.close - self.data1.close

        # 이동 평균 및 표준편차 계산
        self.moving_avg = bt.indicators.SimpleMovingAverage(self.spread, period=self.params.lookback_period)
        self.moving_std = bt.indicators.StandardDeviation(self.spread, period=self.params.lookback_period)

        # 볼린저 밴드 계산
        self.upper_band = self.moving_avg + self.params.zscore_threshold * self.moving_std
        self.lower_band = self.moving_avg - self.params.zscore_threshold * self.moving_std

        self.trades = []

        self.open_positions = 0  # 열린 포지션 수
        self.total_closed_positions = 0  # 청산된 포지션 수

    def next(self):
        # 현재 스프레드 값
        current_spread = self.spread[0]

        # 포지션이 없는 경우
        if not self.position:
            if current_spread > self.upper_band[0]:
                # 상단 밴드를 초과하면 매도-매수
                self.sell(data=self.data0, size=10)  # 첫 번째 종목 매도
                self.buy(data=self.data1, size=10)  # 두 번째 종목 매수
                self.trades.append({'Action': 'Sell-Buy', 'Date': self.data0.datetime.date(0)})
                self.open_positions += 1
            elif current_spread < self.lower_band[0]:
                # 하단 밴드를 초과하면 매수-매도
                self.buy(data=self.data0, size=10)  # 첫 번째 종목 매수
                self.sell(data=self.data1, size=10)  # 두 번째 종목 매도
                self.trades.append({'Action': 'Buy-Sell', 'Date': self.data0.datetime.date(0)})
                self.open_positions += 1

        # 포지션이 있는 경우
        else:
            if self.upper_band[0] > current_spread > self.lower_band[0]:
                # 스프레드가 이동 평균으로 돌아오면 포지션 청산
                self.close(data=self.data0)
                self.close(data=self.data1)
                self.open_positions -= 1
                self.total_closed_positions += 1
                self.trades.append({'Action': 'Close', 'Date': self.data0.datetime.date(0)})


# Backtrader 설정 in data.to_csv(f"./backtest_csv/{ticker}_backtest.csv")

def run_backtest(pair) -> bool:
    ticker1, ticker2 = pair["Ticker 1"], pair["Ticker 2"]

    # 데이터 유효성 검사
    if not (validate_data(f"./backtest_csv/{ticker1}_backtest.csv", "2024-01-01", "2024-06-30") and validate_data(f"./backtest_csv/{ticker2}_backtest.csv", "2024-01-01", "2024-06-30")):
        print(f"Skipping {ticker1}-{ticker2} due to invalid data.")
        return False

    #각 페어에 대해 서로 다른 파일에서 데이터 로드하도록 수정
    data0 = bt.feeds.GenericCSVData(dataname=f"./backtest_csv/{ticker1}_backtest.csv", dtformat="%Y-%m-%d", openinterest=-1, name=ticker1)
    data1 = bt.feeds.GenericCSVData(dataname=f"./backtest_csv/{ticker2}_backtest.csv", dtformat="%Y-%m-%d", openinterest=-1, name=ticker2)

    cerebro = bt.Cerebro()
    cerebro.adddata(data0)
    cerebro.adddata(data1)
    cerebro.addstrategy(PairTradingStrategy)
    cerebro.addobserver(bt.observers.BuySell) # 매수/매도 표시
    cerebro.addobserver(bt.observers.Value)  # 포트폴리오 가치
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trade_analyzer") #거래내역 정리

    strategy = cerebro.run()
    strategy_instance = strategy[0]

    # 거래 내역 가져오기
    trade_analyzer = strategy_instance.analyzers.trade_analyzer

    trade_data = {
        "Total Trades": trade_analyzer.rets.total.closed,
        "Win Ratio": trade_analyzer.rets.won.total / trade_analyzer.rets.total.closed if trade_analyzer.rets.total.closed > 0 else 0,
        'Net Profit': trade_analyzer.rets.pnl.net.total,
        'Net Profit Margin': trade_analyzer.rets.pnl.net.average,
        'MDD': trade_analyzer.rets.lost.pnl.max
    }

    # 거래내역저장
    trades_df = pd.DataFrame([trade_data])
    trades_df.to_csv(f"./trades_csv/{ticker1}-{ticker2}_trades.csv", index=False)

    fig = cerebro.plot(iplot=False, style='candle', scheme=NoPopupPlotScheme())[0][0]

    fig.savefig(f"./result/{ticker1}-{ticker2}_result.png", dpi=150, bbox_inches='tight')
    plt.close(fig)
    print(f"{ticker1}-{ticker2} 백테스트 완료")
    return True

# 병렬 처리로 모든 페어에 대해 백테스트 실행
if __name__ == "__main__":
    pairs = stable_pairs_df.to_dict('records')

    for p in pairs:
        
        if run_backtest(p):
            print(f"success start, {p['Ticker 1']} - {p['Ticker 2']}")
            
# TODO 페어별 매수,매도, 수익률 테이블출력 및 csv파일로 저장  


  self.mpyplot.show()


GE-HWM 백테스트 완료
success start, GE - HWM


  self.mpyplot.show()


CHRW-FDX 백테스트 완료
success start, CHRW - FDX


  self.mpyplot.show()


FICO-PTC 백테스트 완료
success start, FICO - PTC


  self.mpyplot.show()


BEN-IVZ 백테스트 완료
success start, BEN - IVZ


  self.mpyplot.show()


ABBV-GILD 백테스트 완료
success start, ABBV - GILD


  self.mpyplot.show()


FOXA-WBD 백테스트 완료
success start, FOXA - WBD


  self.mpyplot.show()


AMZN-EBAY 백테스트 완료
success start, AMZN - EBAY


  self.mpyplot.show()


LVS-WYNN 백테스트 완료
success start, LVS - WYNN


  self.mpyplot.show()


ANET-JNPR 백테스트 완료
success start, ANET - JNPR


  self.mpyplot.show()


COF-SYF 백테스트 완료
success start, COF - SYF


  self.mpyplot.show()


TGT-WMT 백테스트 완료
success start, TGT - WMT


  self.mpyplot.show()


GPC-LKQ 백테스트 완료
success start, GPC - LKQ


  self.mpyplot.show()


PNC-USB 백테스트 완료
success start, PNC - USB


  self.mpyplot.show()


ETR-PPL 백테스트 완료
success start, ETR - PPL


  self.mpyplot.show()


ETN-EMR 백테스트 완료
success start, ETN - EMR


  self.mpyplot.show()


KEYS-ZBRA 백테스트 완료
success start, KEYS - ZBRA


  self.mpyplot.show()


CTVA-FMC 백테스트 완료
success start, CTVA - FMC


  self.mpyplot.show()


MKTX-NDAQ 백테스트 완료
success start, MKTX - NDAQ


  self.mpyplot.show()


CAH-COR 백테스트 완료
success start, CAH - COR


  self.mpyplot.show()


ABT-BAX 백테스트 완료
success start, ABT - BAX


  self.mpyplot.show()


LH-DGX 백테스트 완료
success start, LH - DGX


  self.mpyplot.show()


DHI-LEN 백테스트 완료
success start, DHI - LEN


  self.mpyplot.show()


ADP-DAY 백테스트 완료
success start, ADP - DAY


  self.mpyplot.show()


ACN-CTSH 백테스트 완료
success start, ACN - CTSH


  self.mpyplot.show()


ITW-OTIS 백테스트 완료
success start, ITW - OTIS


  self.mpyplot.show()


BRO-MMC 백테스트 완료
success start, BRO - MMC


  self.mpyplot.show()


CVX-XOM 백테스트 완료
success start, CVX - XOM


  self.mpyplot.show()


GOOGL-META 백테스트 완료
success start, GOOGL - META


  self.mpyplot.show()


SCHW-MS 백테스트 완료
success start, SCHW - MS


  self.mpyplot.show()


CRL-DHR 백테스트 완료
success start, CRL - DHR


  self.mpyplot.show()


CNC-ELV 백테스트 완료
success start, CNC - ELV


  self.mpyplot.show()


PARA-DIS 백테스트 완료
success start, PARA - DIS


  self.mpyplot.show()


CNP-NI 백테스트 완료
success start, CNP - NI


  self.mpyplot.show()


AIZ-L 백테스트 완료
success start, AIZ - L


  self.mpyplot.show()


HAL-SLB 백테스트 완료
success start, HAL - SLB


  self.mpyplot.show()


EOG-MRO 백테스트 완료
success start, EOG - MRO


  self.mpyplot.show()


TRGP-WMB 백테스트 완료
success start, TRGP - WMB


  self.mpyplot.show()


TSCO-ULTA 백테스트 완료
success start, TSCO - ULTA


  self.mpyplot.show()


K-SJM 백테스트 완료
success start, K - SJM


  self.mpyplot.show()


LUV-UAL 백테스트 완료
success start, LUV - UAL


  self.mpyplot.show()


EL-KVUE 백테스트 완료
success start, EL - KVUE


  self.mpyplot.show()


BMY-PFE 백테스트 완료
success start, BMY - PFE


  self.mpyplot.show()


ACGL-CINF 백테스트 완료
success start, ACGL - CINF


  self.mpyplot.show()


CFG-HBAN 백테스트 완료
success start, CFG - HBAN


  self.mpyplot.show()


DPZ-SBUX 백테스트 완료
success start, DPZ - SBUX


  self.mpyplot.show()


EXR-PSA 백테스트 완료
success start, EXR - PSA


  self.mpyplot.show()


AMAT-LRCX 백테스트 완료
success start, AMAT - LRCX


  self.mpyplot.show()


INTC-MU 백테스트 완료
success start, INTC - MU


  self.mpyplot.show()


DD-LYB 백테스트 완료
success start, DD - LYB


  self.mpyplot.show()


MO-PM 백테스트 완료
success start, MO - PM


## SnP500종합주가지수

In [69]:

# S&P 500 주가 데이터 다운로드
def download_sp500_data():
    file_path = "./csv/sp500_data.csv"
     # 파일이 존재하면 다운로드를 생략하고, 존재하지 않으면 다운로드
    if os.path.exists(file_path):
        print("S&P 500 데이터가 이미 존재합니다. 다운로드를 생략합니다.")
    else:
        # 디렉토리 생성 (필요한 경우)

        sp500 = yf.download("^GSPC", start="2024-01-01", end="2024-06-30")
        sp500.reset_index(inplace=True)
        sp500 = sp500[["Date", "Close"]]
        sp500.to_csv("./csv/sp500_data.csv", index=False)
        print("S&P 500 데이터 저장 완료")

# 실행
download_sp500_data()

index_csv_path = "./csv/sp500_data.csv"

# 종합주가지수 데이터 불러오기
def get_index_return(index_csv_path, start_date, end_date):
    index_data = pd.read_csv(index_csv_path, parse_dates=["Date"])
    index_data = index_data[(index_data["Date"] >= start_date) & (index_data["Date"] <= end_date)]
    index_data["Return"] = (index_data["Close"] / index_data["Close"].iloc[0] - 1) * 100
    return index_data["Return"].iloc[-1]  # 기간 수익률 반환


# S&P 500의 수익률 가져오기
index_return = get_index_return(index_csv_path, "2024-01-01", "2024-06-30")
print(f"종합주가지수(S&P 500) 수익률: {index_return:.2f}%")

[*********************100%***********************]  1 of 1 completed

S&P 500 데이터 저장 완료
종합주가지수(S&P 500) 수익률: 15.13%





## 백테스트 결과 요약 및 분석

In [71]:
# 거래 비용 설정
TRADING_COST = 0.001  # 거래 수수료 0.1%
TRADING_TAX = 0.003  # 매도 시 증권거래세 0.3%

def save_trade_summary_with_analysis(index_csv_path):
    trade_results = []
    index_return = get_index_return(index_csv_path, "2024-01-01", "2024-06-30")
    
    trade_files = [f for f in os.listdir("./trades_csv/") if f.endswith("_trades.csv")]

    for file in trade_files:
        pair_name = file.replace("_trades.csv", "")
        trade_data = pd.read_csv(f"./trades_csv/{file}")
        trade_data["Pair"] = pair_name

        # 진입 금액 계산
        initial_investment = 100000  # 예시 초기 투자 금액

        # 수익 계산 (거래 비용 포함)
        trade_data["Net Profit After Costs"] = trade_data["Net Profit"] - (
            (trade_data["Net Profit"] * TRADING_COST) + (trade_data["Net Profit"] * TRADING_TAX)
        )
        trade_data["Entry Amount"] = initial_investment
        
        # 누적 수익률 및 기타 계산
        trade_data["Cumulative Return"] = (trade_data["Net Profit"] / initial_investment) * 100
        max_return = trade_data["Cumulative Return"].max()
        min_return = trade_data["Cumulative Return"].min()

        # 거래 비용 포함된 초과 수익률 계산
        cumulative_return = trade_data["Cumulative Return"].sum()
        excess_return = cumulative_return - index_return

        trade_results.append({
            "Pair": pair_name,
            "Total Trades": trade_data["Total Trades"].sum(),
            "Win Ratio": trade_data["Win Ratio"].mean(),
            "Cumulative Return": trade_data["Cumulative Return"].sum(),
            "Excess Return": excess_return,
            "Max Return": max_return,
            "Min Return": min_return,
            "Market Return": index_return,
        })

    all_trades_df = pd.DataFrame(trade_results)
    all_trades_df.to_csv("./trades_csv/summary_with_analysis.csv", index=False)
    print(all_trades_df)


## 예시문제-유튜브

In [59]:
!pip install backtesting
!pip install ta #기술지표 계산 처리하는 라이브러리
!pip install yfinance #금융상품 가격 데이터 가져오는 라이브러리

import ta
from backtesting import Backtest, Strategy
from backtesting.lib import crossover

#전략 정의하는 클래스
class SMAcross(Strategy):

    n1=50#단기기간
    n2=100#장기기간

    def init(self):
        close =  self.data.Close
        self.sma1 = self.I(ta.trend.sma_indicator, pd.Series(close), self.n1)
        self.sma2 = self.I(ta.trend.sma_indicator, pd.Series(close), self.n2)

    def next(self):
        if crossover(self.sma1, self.sma2):
            self.but()
        elif crossover(self.sma2,self.sma1):
            self.sell()
 
df = yf.download('BTC-USD',start='2018-01-01')

bt = Backtest(df, SMAcross, cash=100000, commission=0.002, exclusive_orders=True)

#최적화
optim = bt.optimize(n1 = range(50,160,10),
                    n2 = range(50,160,10),
                    constraint = lambda x: x.n2-x.n1 > 20,
                    maximize = 'Return [%]')

bt.plot()









Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


ERROR: Invalid requirement: '#기술지표': Expected package name at the start of dependency specifier
    #기술지표
    ^


Defaulting to user installation because normal site-packages is not writeable


ERROR: Invalid requirement: '#금융상품': Expected package name at the start of dependency specifier
    #금융상품
    ^


ModuleNotFoundError: No module named 'ta'