<a href="https://colab.research.google.com/github/GermanM3/GermanM3/blob/master/%EC%86%8C%ED%98%95%EC%A3%BC_%EB%A7%88%EB%B2%95%EA%B3%B5%EC%8B%9D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pykrx
pip install finance-datareader

In [None]:
from pykrx import stock
import FinanceDataReader as fdr

import pandas as pd
import numpy as np
import datetime
import time
import random
from concurrent.futures import ThreadPoolExecutor

In [None]:
from google.colab import files

# 파일 업로드
uploaded = files.upload()
# 파일명 확인 (업로드한 파일명을 딕셔너리로 반환)
file_name = list(uploaded.keys())[0]

# 엑셀 파일 읽기
df = pd.read_excel(file_name)
df['Code']=df['Code'].astype(str).str.zfill(6)

In [None]:
codes=df['Code'].tolist()
start_date = datetime.date(2014, 1, 1)
end_date = datetime.date(2024, 9, 30)

In [None]:
import time
import random
from concurrent.futures import ThreadPoolExecutor

def fetch_monthly_close(code):
    time.sleep(random.uniform(0.1, 0.3))  # 짧은 랜덤 지연으로 API 과부하 방지
    df1 = fdr.DataReader(code, start_date, end_date)
    monthly_close = df1['Close'].resample('ME').last()  # 월말 종가만 가져오기
    return code, monthly_close

stock_data = {}
with ThreadPoolExecutor(max_workers=10) as executor:  # max_workers를 최적화된 값으로 조정
    results = executor.map(fetch_monthly_close, codes)
    for code, monthly_close in results:  # 결과 수집
        stock_data[code] = monthly_close

# 모든 종목의 월말 종가 데이터를 하나의 DataFrame으로 결합
mret = pd.DataFrame(stock_data)

In [None]:
start_date = "20140101"
end_date = "20240930"

def fetch_fundamental_data(code):
    time.sleep(random.uniform(0.05, 0.15))  # 랜덤 딜레이 범위를 좁혀 호출 속도 향상
    try:
        data = stock.get_market_fundamental(start_date, end_date, code, freq="m")
        data['code'] = code  # 종목 코드 추가
        return data
    except Exception as e:
        print(f"Error fetching data for {code}: {e}")
        return None  # 오류 발생 시 None 반환

results = []
with ThreadPoolExecutor(max_workers=20) as executor:  # max_workers 조정으로 속도 향상
    futures = {executor.submit(fetch_fundamental_data, code): code for code in codes}
    for future in futures:
        result = future.result()
        if result is not None:  # 유효한 데이터만 추가
            results.append(result)


In [None]:
result_df = pd.concat(results, ignore_index=False)
result_df.index.names = ['Date']  # Set the primary index name directly
result_df.set_index('code', append=True, inplace=True)  # Add 'code' as a second level to the index
result_df['ROE']=result_df['EPS']/result_df['BPS']


In [25]:
price_data= mret.set_index('Date')

In [45]:
# 자본수익률(ROC)과 이익수익률 계산
df['ROC'] = df['ROE']  # ROE를 그대로 사용
df['Earnings_Yield'] = 1 / df['PER']  # PER의 역수

# 종목별로 그룹화하여 각 월별 ROC와 이익수익률의 순위 계산
df['ROC_Rank'] = df.groupby('Date')['ROC'].rank(ascending=False)
df['EY_Rank'] = df.groupby('Date')['Earnings_Yield'].rank(ascending=False)

# 마법공식 순위: ROC와 이익수익률 순위의 합
df['Magic_Formula_Rank'] = df['ROC_Rank'] + df['EY_Rank']

# 각 월별로 상위 n개 종목 선택 (예: n = 5)
n = 5
top_n_stocks = df.groupby('Date').apply(lambda x: x.nsmallest(n, 'Magic_Formula_Rank')).reset_index(drop=True)

print(top_n_stocks[['Date', 'code', 'Magic_Formula_Rank']])

          Date    code  Magic_Formula_Rank
0   2014-01-31  005320               108.0
1   2014-01-31  037350               119.0
2   2014-01-31  079650               119.0
3   2014-01-31  039290               120.0
4   2014-01-31  153460               140.0
..         ...     ...                 ...
640 2024-09-30  154040               180.0
641 2024-09-30  042940               181.0
642 2024-09-30  092460               184.0
643 2024-09-30  085310               184.0
644 2024-09-30  019540               195.0

[645 rows x 3 columns]


  top_n_stocks = df.groupby('Date').apply(lambda x: x.nsmallest(n, 'Magic_Formula_Rank')).reset_index(drop=True)


In [50]:
# 각 월별로 상위 n개 종목 선택 (예: n = 5)
n = 3
top_n_stocks = df.groupby('Date').apply(lambda x: x.nsmallest(n, 'Magic_Formula_Rank')).reset_index(drop=True)

def backtest(price_data, top_n_stocks, n=3):
    # 월별 수익률 저장 변수 초기화
    monthly_returns = []

    # 월별로 반복하여 상위 n개의 종목에 대한 수익률 계산
    for date in top_n_stocks['Date'].unique():
        # 해당 월의 상위 n 종목 가져오기
        top_stocks = top_n_stocks[top_n_stocks['Date'] == date]['code'].values

        # 상위 종목의 가격 데이터 필터링
        if len(top_stocks) == 0:
            continue

        # 현재 월 가격
        try:
            prices = price_data.loc[date, top_stocks]
        except KeyError:
            print(f"가격 데이터가 존재하지 않음: {date}에 대한 {top_stocks}")
            continue

        # 수익률 계산을 위한 직전 월 가격 데이터
        prev_date = price_data.index[price_data.index < date].max()
        if pd.isna(prev_date):  # NaT가 아닌 경우만 진행
            print(f"유효한 이전 날짜가 없습니다: {date}")
            continue

        try:
            prev_prices = price_data.loc[prev_date, top_stocks]
            # 수익률 계산 (현재 월 / 이전 월 - 1)
            returns = (prices / prev_prices - 1).mean()
            monthly_returns.append((date, returns))
        except KeyError:
            print(f"이전 가격 데이터가 존재하지 않음: {prev_date}에 대한 {top_stocks}")
            continue

    # 결과 데이터프레임 생성
    monthly_returns_df = pd.DataFrame(monthly_returns, columns=['Date', 'Monthly_Return'])
    monthly_returns_df.set_index('Date', inplace=True)

    # 누적 수익률 계산
    cumulative_return = (1 + monthly_returns_df['Monthly_Return']).cumprod() - 1

    return monthly_returns_df, cumulative_return

# 백테스팅 실행
monthly_returns_df, cumulative_return = backtest(price_data, top_n_stocks, n=3)

  top_n_stocks = df.groupby('Date').apply(lambda x: x.nsmallest(n, 'Magic_Formula_Rank')).reset_index(drop=True)


유효한 이전 날짜가 없습니다: 2014-01-31 00:00:00
