In [45]:
import ccxt.pro as ccxtpro
import asyncio
import nest_asyncio
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import statsmodels.api as sm
import os
import warnings

warnings.filterwarnings('ignore')

nest_asyncio.apply()

if not os.path.exists(f'binance_futures.csv'):
    # API 키 로드
    with open('binance_api.txt', 'r') as file:
        api_key = file.readline().strip()
        secret = file.readline().strip()
    
    # 거래소 설정
    exchange = ccxtpro.binance({
        'apiKey': api_key,
        'secret': secret,
        'enableRateLimit': True,
        'options': {'defaultType': 'future'}
    })
    
    async def fetch_symbol_data(exchange, symbol, since):
        """
        특정 심볼의 OHLCV 데이터를 가져오고 날짜 인덱스로 설정합니다.
    
        입력값:
        - exchange: ccxtpro 거래소 객체
        - symbol: 심볼 문자열
        - since: 데이터 시작 시점 (timestamp)
    
        출력값:
        - (symbol, df): 심볼과 해당 심볼의 데이터프레임 튜플
        """
        try:
            all_ohlcv = []
            while since < exchange.milliseconds():
                ohlcv = await exchange.fetch_ohlcv(symbol, '1d', since=since, limit=500)
                if not ohlcv:
                    break
                all_ohlcv += ohlcv
                since = ohlcv[-1][0] + 1  # 마지막 timestamp를 업데이트하여 다음 batch의 데이터를 가져옴
    
            # 데이터프레임 생성
            df = pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['quote_volume'] = df['volume'] * df['close']  # 일 단위로 거래량을 USDT로 환산
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
    
            return symbol, df
        except Exception as e:
            print(f"Error fetching data for {symbol}: {e}")
            return symbol, None
    
    async def get_top_symbols_by_volume(exchange, n=40):
        """
        상위 n개의 심볼을 2021-10-27부터 현재까지의 거래량 기준으로 선택합니다.
        충족하는 심볼이 n개 이하일 경우에도 가능한 데이터를 반환합니다.
    
        입력값:
        - exchange: ccxtpro 거래소 객체
        - n: 선택할 심볼의 개수
    
        출력값:
        - selected_symbols: 상위 n개의 (symbol, df) 튜플 리스트
        """
        await exchange.load_markets()
        symbols = [symbol for symbol in exchange.markets if exchange.markets[symbol]['active'] and '/USDT' in symbol]
    
        # 기준 기간: 2021-10-27부터 현재까지
        volume_start_date = pd.Timestamp('2021-10-27').normalize()
        volume_start_timestamp = exchange.parse8601(volume_start_date.strftime('%Y-%m-%dT%H:%M:%SZ'))
    
        # 데이터 수집 기간: 5년 전부터 현재까지
        data_collection_start_date = datetime.now() - timedelta(days=5*365)
        data_collection_start_timestamp = exchange.parse8601(data_collection_start_date.strftime('%Y-%m-%dT%H:%M:%SZ'))
    
        # 기준 기간 동안의 거래량 합산을 위한 데이터 수집
        tasks = [fetch_symbol_data(exchange, symbol, volume_start_timestamp) for symbol in symbols]
        symbol_data = await asyncio.gather(*tasks)
    
        # 중복 제거 및 데이터 유효성 확인
        unique_symbol_data = {}
        for symbol, df in symbol_data:
            if df is not None:
                clean_symbol = symbol.split(':')[0]
                perp_symbol = clean_symbol + ':USDT'
                if perp_symbol not in unique_symbol_data:
                    # 기준 기간 동안의 거래량 합산
                    volume_sum = df.loc[df.index >= volume_start_date, 'quote_volume'].sum()
                    unique_symbol_data[perp_symbol] = (df, volume_sum)
    
        # 거래량이 상위 n개인 심볼을 선택
        sorted_symbols = sorted(unique_symbol_data.items(), key=lambda x: x[1][1], reverse=True)
    
        # 가능한 심볼 선택 (최대 n개)
        selected_symbols = []
        idx = 0
        while idx < len(sorted_symbols):
            symbol, (df, _) = sorted_symbols[idx]
            if df.index.min() <= volume_start_date:
                selected_symbols.append((symbol, df))
            idx += 1
            if len(selected_symbols) >= n:  # 최대 n개까지만 선택
                break
    
        # 선택된 심볼이 없을 경우 경고 메시지 출력
        if len(selected_symbols) == 0:
            print("No symbols found with sufficient data starting from 2021-10-27.")
    
        # 데이터를 포함한 심볼을 최초 데이터 입력일자(최소 날짜) 기준으로 추가 정렬
        selected_symbols = sorted(selected_symbols, key=lambda x: x[1].index.min())
    
        return selected_symbols
    
    async def merge_dataframes(exchange, n=40):
        """
        상위 n개의 데이터를 병합하여 하나의 데이터프레임으로 반환합니다.
    
        입력값:
        - exchange: ccxtpro 거래소 객체
        - n: 선택할 심볼의 개수
    
        출력값:
        - 병합된 데이터프레임
        """
        top_symbols_data = await get_top_symbols_by_volume(exchange, n)
        
        if len(top_symbols_data) == 0:
            print("No valid symbols available to merge.")
            return None  # 데이터가 없는 경우 None 반환
    
        # 날짜 인덱스를 기준으로 수평 병합
        merged_df = None
        for symbol, df in top_symbols_data:
            # 컬럼을 멀티 레벨로 설정
            df.columns = pd.MultiIndex.from_product([[symbol], df.columns])
            
            if merged_df is None:
                merged_df = df
            else:
                merged_df = merged_df.join(df, how='outer')
    
        return merged_df
    
    # 병합된 데이터 준비
    merged_df = asyncio.run(merge_dataframes(exchange, 40))
    
    # 병합된 데이터프레임이 None이 아닌지 확인 후 저장
    if merged_df is not None:
        merged_df.to_csv('binance_futures.csv')
        print("Data successfully saved to 'binance_futures.csv'")
    else:
        print("No data to save, merged_df is None")
else:
    # CSV 파일 로드 시 인덱스를 DatetimeIndex로 설정
    merged_df = pd.read_csv('binance_futures.csv', header=[0, 1], index_col=0, parse_dates=True)
    print("응 이미있어.")

응 이미있어.


In [46]:
# merged_df의 복사본 생성
daily_df = merged_df.copy()

# 각 심볼에 대한 일별 수익률 및 누적 수익률 계산
for symbol in daily_df.columns.levels[0]:
    daily_df[(symbol, 'returns')] = daily_df[(symbol, 'close')].pct_change()
    daily_df[(symbol, 'cumulative_returns')] = (1 + daily_df[(symbol, 'returns')]).cumprod() - 1

# 월별 데이터 계산
monthly_df = daily_df.resample('M').last()

# 각 심볼에 대한 월별 수익률 계산
for symbol in daily_df.columns.levels[0]:
    monthly_df[(symbol, 'monthly_returns')] = monthly_df[(symbol, 'close')].pct_change()

# 일별 데이터에 월별 수익률 추가 (월 마지막 날에만 값이 존재)
for symbol in daily_df.columns.levels[0]:
    daily_df[(symbol, 'monthly_returns')] = monthly_df[(symbol, 'monthly_returns')].reindex(daily_df.index, method='ffill')


In [47]:
# (재필) 크립토명-종가/수익률 등등 순서대로 나오도록 정렬

monthly_df.sort_index(axis=1, inplace=True)
daily_df.sort_index(axis=1, inplace=True)

In [48]:

# Prospect Theory 관련 함수 정의
def prospect_value(r):
    return np.where(r >= 0, r ** 0.88, -2.25 * (-r) ** 0.88)

def weight_function(n, i):
    return (n - i + 1) / (n * (n + 1) / 2)

# TK 값 계산을 위한 창 설정
daily_window = 150
monthly_window = 12

# 각 심볼에 대한 TK 값 및 기타 지표 계산
for symbol in daily_df.columns.levels[0]:
    # 일별 수익률 및 초과 수익률 계산
    # (재필) excess return을 daily return에서 cross sectional equal weight mean return을 빼주려고 한거같은데
    # 이렇게 하면 한 종목의 return의 전 기간 평균을 빼주게 되는거라 맞지 않는 듯 함. 
    daily_df[(symbol, 'excess_returns')] = daily_df[(symbol, 'returns')] - daily_df[(symbol, 'returns')].mean() 

    # TK 값 계산 (rolling.apply 사용)
    def compute_tk(past_returns):
        if np.isnan(past_returns).any():
            return np.nan
        sorted_returns = np.sort(past_returns)
        pv = prospect_value(sorted_returns)
        weights = np.array([weight_function(daily_window, i) for i in range(1, daily_window + 1)])
        return np.dot(pv, weights)

    daily_df[(symbol, 'tk_value')] = daily_df[(symbol, 'excess_returns')].rolling(window=daily_window).apply(compute_tk, raw=True)

    # Overhang 값 계산
    if (symbol, 'quote_volume') not in daily_df.columns:
        daily_df[(symbol, 'quote_volume')] = daily_df[(symbol, 'volume')] * daily_df[(symbol, 'close')]

    # Turnover 계산 (0으로 나누는 경우 방지)
    rolling_quote_volume_sum = daily_df[(symbol, 'quote_volume')].rolling(window=daily_window).sum()
    daily_df[(symbol, 'turnover')] = daily_df[(symbol, 'quote_volume')] / rolling_quote_volume_sum.replace(0, np.nan)

    # 가중 평균 Turnover 계산
    weights = np.array([(1 - 0.3) ** i for i in range(daily_window)])
    weights /= weights.sum()

    def weighted_turnover_func(x):
        if np.isnan(x).any():
            return np.nan
        return np.dot(x, weights)

    daily_df[(symbol, 'weighted_turnover')] = daily_df[(symbol, 'turnover')].rolling(window=daily_window).apply(weighted_turnover_func, raw=True)

    # 가중 평균 Turnover를 이용한 가중 가격 및 잠정 수익률 계산
    daily_df[(symbol, 'weighted_price')] = daily_df[(symbol, 'weighted_turnover')] * daily_df[(symbol, 'close')]
    daily_df[(symbol, 'potential_return')] = daily_df[(symbol, 'close')] / daily_df[(symbol, 'weighted_price')] - 1

# 멀티 인덱스 정렬
daily_df = daily_df.sort_index(axis=1, level=[0, 1])

In [49]:
# 일별 수익률, 선행 수익률, 로그 총수익률 계산 (중복 제거)
for symbol in daily_df.columns.levels[0]:
    daily_df[(symbol, 'shifted_returns')] = daily_df[(symbol, 'returns')].shift(1)
    daily_df[(symbol, 'log_gross_returns')] = np.log(1 + daily_df[(symbol, 'shifted_returns')])

# 월별 로그 수익률 계산
# 'log_gross_returns'는 각 심볼별로 존재하므로, 이를 크로스 섹션으로 선택
monthly_log_returns = daily_df.xs('log_gross_returns', level=1, axis=1).resample('M').sum()

# 무위험 수익률(Risk-Free Rate) 계산: 해당 월의 모든 종목의 평균 로그 수익률
monthly_log_returns['risk_free_rate'] = monthly_log_returns.mean(axis=1)

# 각 종목의 초과수익률(Excess Log Returns) 계산
for symbol in daily_df.columns.levels[0]:
    if symbol in monthly_log_returns.columns:
        monthly_log_returns[f'{symbol}_excess_log_returns'] = monthly_log_returns[symbol] - monthly_log_returns['risk_free_rate']
    else:
        print(f"Warning: {symbol} not found in monthly_log_returns.columns")

# 팩터 값을 저장하기 위한 새로운 DataFrame 생성
factor_df = pd.DataFrame(index=daily_df.index)

### 1. 모멘텀 팩터 계산 ###
for symbol in monthly_df.columns.levels[0]:
    # 12개월 모멘텀 계산
    momentum = (monthly_df[(symbol, 'close')].shift(1) / monthly_df[(symbol, 'close')].shift(monthly_window)) - 1
    # 일 단위로 확장
    factor_df[(symbol, 'momentum')] = momentum.reindex(daily_df.index, method='ffill')

### 2. 시계열 모멘텀 팩터 계산 ###
for symbol in daily_df.columns.levels[0]:
    excess_log_col = f'{symbol}_excess_log_returns'
    if excess_log_col in monthly_log_returns.columns:
        # 12개월 누적 초과 로그 수익률 계산
        ts_momentum = monthly_log_returns[excess_log_col].rolling(window=monthly_window).sum().shift(1)
        # 일 단위로 확장
        factor_df[(symbol, 'ts_momentum')] = ts_momentum.reindex(daily_df.index, method='ffill')
    else:
        print(f"Warning: {excess_log_col} not found in monthly_log_returns.columns")
        factor_df[(symbol, 'ts_momentum')] = np.nan

### 3. 52주 고가 대비 비율 계산 ###
for symbol in monthly_df.columns.levels[0]:
    # 52주(12개월) 고가 계산
    high_52_week = monthly_df[(symbol, 'high')].rolling(window=monthly_window).max()
    # 비율 계산
    ratio_to_high = monthly_df[(symbol, 'close')] / high_52_week
    # 일 단위로 확장
    factor_df[(symbol, 'ratio_to_high')] = ratio_to_high.reindex(daily_df.index, method='ffill')

### 4. TK 팩터 계산 ###
for symbol in monthly_df.columns.levels[0]:
    # 12개월 TK 값 계산
    excess_log_col = f'{symbol}_excess_log_returns'
    if excess_log_col in monthly_log_returns.columns:
        def compute_monthly_tk(past_returns):
            if np.isnan(past_returns).any():
                return np.nan
            sorted_returns = np.sort(past_returns)
            pv = prospect_value(sorted_returns)
            weights = np.array([weight_function(monthly_window, i) for i in range(1, monthly_window + 1)])
            return np.dot(pv, weights)
        
        monthly_df[(symbol, 'tk_value')] = monthly_log_returns[excess_log_col].rolling(window=monthly_window).apply(compute_monthly_tk, raw=True)
        # 일 단위로 확장
        factor_df[(symbol, 'tk_value')] = monthly_df[(symbol, 'tk_value')].reindex(daily_df.index, method='ffill')
    else:
        print(f"Warning: {excess_log_col} not found in monthly_log_returns.columns")
        factor_df[(symbol, 'tk_value')] = np.nan

### 5. Overhang 팩터 계산 ###
for symbol in monthly_df.columns.levels[0]:
    # 잠재적 수익률은 이미 계산되어 있으므로 이를 팩터로 추가
    factor_df[(symbol, 'potential_return')] = daily_df[(symbol, 'potential_return')]

### 6. Maxing Out 팩터 계산 ###
for symbol in daily_df.columns.levels[0]:
    # 지난 한 달(30일) 동안의 최대 일일 수익률 계산
    max_return = daily_df[(symbol, 'returns')].rolling(window=30).max()
    factor_df[(symbol, 'max_return')] = max_return

### 7. Salience 팩터 계산 ###
theta = 0.88
delta = 0.65
# 시장 수익률 계산: 모든 종목의 평균 수익률
market_returns = daily_df.xs('returns', level=1, axis=1).mean(axis=1)

def calculate_salience(r, m_r, theta, delta):
    return (np.abs(r - m_r) ** theta) / (np.abs(r) ** theta + delta * np.abs(m_r) ** theta)

for symbol in daily_df.columns.levels[0]:
    # Salience 계산
    salience_values = calculate_salience(daily_df[(symbol, 'returns')], market_returns, theta, delta)
    factor_df[(symbol, 'salience')] = salience_values

# 팩터 데이터프레임을 멀티 인덱스로 정렬
factor_df = factor_df.sort_index(axis=1, level=[0, 1])
factor_df.to_csv('factor.csv')

In [50]:
import numpy as np
import pandas as pd
import plotly.express as px
import os

# ---------------------------
# 1. 데이터 준비
# ---------------------------

# daily_df와 factor_df는 이미 계산되어 있다고 가정합니다.
# daily_df: MultiIndex 컬럼 구조 (symbol, attribute), 예: ('BTC/USDT', 'close')
# factor_df: MultiIndex 컬럼 구조 (symbol, factor), 예: ('BTC/USDT', 'momentum')

# 예시 데이터 확인 (실제 데이터로 대체하세요)
# print(daily_df.head())
# print(factor_df.head())

# ---------------------------
# 2. 멀티 인덱스 컬럼 설정
# ---------------------------

# factor_df 컬럼이 튜플 형태로 되어있는지 확인하고, 멀티 인덱스로 변환
if not isinstance(factor_df.columns, pd.MultiIndex):
    try:
        # 컬럼 이름이 문자열이라면 튜플 형태로 변환
        factor_df.columns = pd.MultiIndex.from_tuples(
            [(col,) if isinstance(col, str) else col for col in factor_df.columns]
        )
    except Exception as e:
        print("factor_df.columns을 MultiIndex로 변환하는 중 오류 발생:", e)

# daily_df 컬럼이 튜플 형태로 되어있는지 확인하고, 멀티 인덱스로 변환
if not isinstance(daily_df.columns, pd.MultiIndex):
    try:
        # 컬럼 이름이 문자열이라면 튜플 형태로 변환
        daily_df.columns = pd.MultiIndex.from_tuples(
            [(col,) if isinstance(col, str) else col for col in daily_df.columns]
        )
    except Exception as e:
        print("daily_df.columns을 MultiIndex로 변환하는 중 오류 발생:", e)

# ---------------------------
# 3. 벤치마크 수익률 계산
# ---------------------------

# 일별 수익률이 이미 'returns' 열에 계산되어 있다고 가정
# 월별 누적 수익률 계산: (1 + 일별 수익률).resample('M').apply(np.prod) - 1
monthly_returns = (1 + daily_df.xs('returns', level=1, axis=1)).resample('M').apply(np.prod) - 1

# 벤치마크: 모든 종목의 월별 누적 수익률의 평균 (동일가중)
benchmark_returns = monthly_returns.mean(axis=1)

# 벤치마크 수익률 확인
print("Benchmark Returns:")
print(benchmark_returns.head())

# ---------------------------
# 4. 백테스팅 전략 설정 및 수행
# ---------------------------

# 전략 설정
factors = ['momentum', 'ts_momentum', 'ratio_to_high', 'tk_value', 'potential_return', 'max_return', 'salience']
percentiles = [10, 20, 30]  # 추가된 30% 퍼센타일
weighting_methods = ['equal_weighted', 'value_weighted']

# 백테스트 결과를 저장할 딕셔너리 초기화
performance_results = {}

# 백테스트 로그를 저장할 딕셔너리 초기화
# 각 전략별로 DataFrame을 저장하여 날짜, longs, shorts, returns 기록
strategy_logs = {f"{factor}_{percentile}pct_{weight}": pd.DataFrame(columns=['Date', 'Longs', 'Shorts', 'Return'])
                for factor in factors for percentile in percentiles for weight in weighting_methods}

# 기본 모멘텀 전략 관련 추가 전략도 포함
basic_momentum_strategies = [
    ('entry10pct_retain20pct', 10, 20),
    ('entry10pct_no_retain', 10, 0)
]
for suffix, entry_pct, retain_pct in basic_momentum_strategies:
    for weight in weighting_methods:
        strategy_id = f"momentum_{suffix}_{weight}"  # e.g., 'momentum_entry10pct_retain20pct_equal_weighted'
        strategy_logs[strategy_id] = pd.DataFrame(columns=['Date', 'Longs', 'Shorts', 'Return'])

# 시계열 모멘텀 및 추가적인 전략도 포함
for percentile in [10, 20]:
    for weight in weighting_methods:
        strategy_id = f"ts_momentum_directional_{percentile}pct_{weight}"
        strategy_logs[strategy_id] = pd.DataFrame(columns=['Date', 'Longs', 'Shorts', 'Return'])

# 추가적인 횡단면적 전략
for factor in factors:
    for percentile in [10, 20, 30]:
        for weight in weighting_methods:
            strategy_id = f"{factor}_{percentile}pct_{weight}"
            strategy_logs[strategy_id] = pd.DataFrame(columns=['Date', 'Longs', 'Shorts', 'Return'])

# 월말 기준으로 팩터 데이터 리샘플링
monthly_factors = factor_df.resample('M').last()

# 투자 비율 설정 (100%)
investment_rate = 0.2  # 100%로 설정

# 전체 종목 리스트
symbols = factor_df.columns.levels[0]

# ---------------------------
# 4.1. 일반적인 퍼센타일 기반 포트폴리오 구성
# ---------------------------

for factor in factors:
    for percentile in percentiles:
        for weight in weighting_methods:
            # 전략 식별자 생성
            strategy_id = f"{factor}_{percentile}pct_{weight}"
            
            # 현재 팩터의 값 추출
            try:
                factor_values = factor_df.xs(factor, level=1, axis=1)
            except KeyError:
                print(f"팩터 '{factor}'가 factor_df에 존재하지 않습니다.")
                continue
            
            # 상위 및 하위 퍼센타일 기준 임계값 계산
            top_threshold = factor_values.quantile(1 - percentile / 100, axis=1)
            bottom_threshold = factor_values.quantile(percentile / 100, axis=1)
            
            # 포트폴리오 가중치 초기화
            portfolios = pd.DataFrame(0, index=monthly_factors.index, columns=factor_values.columns)
            
            # 각 월별로 상위/하위 종목 선택 및 가중치 할당
            for date in monthly_factors.index:
                if date not in factor_values.index:
                    continue
                current_factors = factor_values.loc[date]
                
                # 상위 퍼센타일 종목
                longs = current_factors[current_factors >= top_threshold.loc[date]].index.tolist()
                # 하위 퍼센타일 종목
                shorts = current_factors[current_factors <= bottom_threshold.loc[date]].index.tolist()
                
                # 1개월 후의 수익률 계산
                try:
                    current_pos = monthly_returns.index.get_loc(date)
                    next_month = monthly_returns.index[current_pos + 1]
                    # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
                    long_return = monthly_returns.loc[next_month, longs].mean() if len(longs) > 0 else 0
                    # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
                    short_return = monthly_returns.loc[next_month, shorts].mean() if len(shorts) > 0 else 0
                    # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
                    portfolio_return = long_return - short_return
                except (IndexError, KeyError):
                    # 다음 달 데이터가 없는 경우 NaN으로 설정
                    portfolio_return = np.nan
                
                # 포트폴리오 수익률 로그 저장
                new_log = pd.DataFrame([{
                    'Date': date,
                    'Longs': ','.join(longs),    # 롱 종목을 쉼표로 구분한 문자열로 저장
                    'Shorts': ','.join(shorts),  # 숏 종목을 쉼표로 구분한 문자열로 저장
                    'Return': portfolio_return
                }])
                strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)
                
                # 가중치 할당
                if weight == 'equal_weighted':
                    if len(longs) > 0:
                        portfolios.loc[date, longs] += 1 / len(longs)
                    if len(shorts) > 0:
                        portfolios.loc[date, shorts] -= 1 / len(shorts)
                elif weight == 'value_weighted':
                    if len(longs) > 0:
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            long_factors = current_factors.loc[list(longs)]
                            long_weights = long_factors / long_factors.sum()
                            portfolios.loc[date, longs] += long_weights
                        except KeyError:
                            print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {longs}")
                    if len(shorts) > 0:
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            short_factors = current_factors.loc[list(shorts)]
                            short_weights = short_factors / short_factors.sum()
                            portfolios.loc[date, shorts] -= short_weights
                        except KeyError:
                            print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {shorts}")
            
            # 투자 비율 제한 (100%)
            portfolios = portfolios * investment_rate
            
            # 포트폴리오 수익률 계산
            # 포트폴리오 가중치를 이전 월의 포트폴리오로 시프트하여 리밸런싱 시점 반영
            aligned_weights = portfolios.shift(1).reindex(monthly_returns.index).fillna(0)
            
            # 포트폴리오의 월별 수익률 계산 (팩터 값을 기준으로 가중치 할당)
            portfolio_monthly_returns = (aligned_weights * monthly_returns).sum(axis=1)
            
            # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
            excess_returns = portfolio_monthly_returns - benchmark_returns
            
            # 백테스트 결과 저장
            performance_results[strategy_id] = pd.DataFrame({
                'Portfolio_Return': portfolio_monthly_returns,
                'Excess_Return': excess_returns
            })

# ---------------------------
# 4.2. 기본 모멘텀 전략: 진입조건 10%, 보유조건 20%
# ---------------------------

# 팩터 및 퍼센타일 설정
basic_momentum_factor = 'momentum'
entry_percentile = 10
retain_percentile = 20

for weight in weighting_methods:
    # 전략 식별자 생성
    strategy_id = f"{basic_momentum_factor}_entry{entry_percentile}pct_retain{retain_percentile}pct_{weight}"
    
    # 현재 팩터의 값 추출
    try:
        factor_values = factor_df.xs(basic_momentum_factor, level=1, axis=1)
    except KeyError:
        print(f"팩터 '{basic_momentum_factor}'가 factor_df에 존재하지 않습니다.")
        continue
    
    # 상위 및 하위 퍼센타일 기준 임계값 계산 (진입조건 10%)
    top_threshold = factor_values.quantile(1 - entry_percentile / 100, axis=1)
    bottom_threshold = factor_values.quantile(entry_percentile / 100, axis=1)
    
    # 포트폴리오 가중치 초기화
    portfolios_basic = pd.DataFrame(0, index=monthly_factors.index, columns=factor_values.columns)
    
    # 이전 포지션을 저장할 변수 초기화
    previous_longs = set()
    previous_shorts = set()
    
    # 각 월별로 상위/하위 종목 선택 및 가중치 할당
    for date in monthly_factors.index:
        if date not in factor_values.index:
            continue
        current_factors = factor_values.loc[date]
        
        # 상위 퍼센타일 종목 (롱 포지션 진입)
        longs = current_factors[current_factors >= top_threshold.loc[date]].index.tolist()
        # 하위 퍼센타일 종목 (숏 포지션 진입)
        shorts = current_factors[current_factors <= bottom_threshold.loc[date]].index.tolist()
        
        # 새로운 포지션 설정
        new_longs = set(longs)
        new_shorts = set(shorts)
        
        # 유지 조건: 이전 포지션이 여전히 상위 20%/하위 20%에 해당하면 유지
        if date != monthly_factors.index[0]:
            retained_longs = {symbol for symbol in previous_longs if factor_values.loc[date, symbol] >= factor_values.loc[date].quantile(1 - retain_percentile / 100)}
            retained_shorts = {symbol for symbol in previous_shorts if factor_values.loc[date, symbol] <= factor_values.loc[date].quantile(retain_percentile / 100)}
            
            # 포트폴리오에 유지할 포지션 추가
            new_longs = new_longs.union(retained_longs)
            new_shorts = new_shorts.union(retained_shorts)
        
        # 1개월 후의 수익률 계산
        try:
            current_pos = monthly_returns.index.get_loc(date)
            next_month = monthly_returns.index[current_pos + 1]
            # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
            long_return = monthly_returns.loc[next_month, list(new_longs)].mean() if len(new_longs) > 0 else 0
            # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
            short_return = monthly_returns.loc[next_month, list(new_shorts)].mean() if len(new_shorts) > 0 else 0
            # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
            portfolio_return = long_return - short_return
        except (IndexError, KeyError):
            # 다음 달 데이터가 없는 경우 NaN으로 설정
            portfolio_return = np.nan
        
        # 포트폴리오 수익률 로그 저장
        new_log = pd.DataFrame([{
            'Date': date,
            'Longs': ','.join(new_longs),
            'Shorts': ','.join(new_shorts),
            'Return': portfolio_return
        }])
        strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)
        
        # 가중치 할당
        if weight == 'equal_weighted':
            if len(new_longs) > 0:
                portfolios_basic.loc[date, list(new_longs)] += 1 / len(new_longs)
            if len(new_shorts) > 0:
                portfolios_basic.loc[date, list(new_shorts)] -= 1 / len(new_shorts)
        elif weight == 'value_weighted':
            if len(new_longs) > 0:
                try:
                    # 팩터 값을 기준으로 가중치 할당
                    long_factors = current_factors.loc[list(new_longs)]
                    long_weights = long_factors / long_factors.sum()
                    portfolios_basic.loc[date, list(new_longs)] += long_weights
                except KeyError:
                    print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {list(new_longs)}")
            if len(new_shorts) > 0:
                try:
                    # 팩터 값을 기준으로 가중치 할당
                    short_factors = current_factors.loc[list(new_shorts)]
                    short_weights = short_factors / short_factors.sum()
                    portfolios_basic.loc[date, list(new_shorts)] -= short_weights
                except KeyError:
                    print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {list(new_shorts)}")
        
        # 이전 포지션 업데이트
        previous_longs = new_longs
        previous_shorts = new_shorts
    
    # 투자 비율 제한 (100%)
    portfolios_basic = portfolios_basic * investment_rate
    
    # 포트폴리오 수익률 계산
    aligned_weights_basic = portfolios_basic.shift(1).reindex(monthly_returns.index).fillna(0)
    portfolio_monthly_returns_basic = (aligned_weights_basic * monthly_returns).sum(axis=1)
    
    # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
    excess_returns_basic = portfolio_monthly_returns_basic - benchmark_returns
    
    # 백테스트 결과 저장
    performance_results[strategy_id] = pd.DataFrame({
        'Portfolio_Return': portfolio_monthly_returns_basic,
        'Excess_Return': excess_returns_basic
    })

# ---------------------------
# 4.3. 기본 모멘텀 전략: 진입조건 10%, 보유조건 없음
# ---------------------------

# 팩터 및 퍼센타일 설정
basic_momentum_no_retain_factor = 'momentum'
entry_percentile_no_retain = 10

for weight in weighting_methods:
    # 전략 식별자 생성
    strategy_id = f"{basic_momentum_no_retain_factor}_entry{entry_percentile_no_retain}pct_no_retain_{weight}"
    
    # 현재 팩터의 값 추출
    try:
        factor_values = factor_df.xs(basic_momentum_no_retain_factor, level=1, axis=1)
    except KeyError:
        print(f"팩터 '{basic_momentum_no_retain_factor}'가 factor_df에 존재하지 않습니다.")
        continue
    
    # 상위 및 하위 퍼센타일 기준 임계값 계산 (진입조건 10%)
    top_threshold = factor_values.quantile(1 - entry_percentile_no_retain / 100, axis=1)
    bottom_threshold = factor_values.quantile(entry_percentile_no_retain / 100, axis=1)
    
    # 포트폴리오 가중치 초기화
    portfolios_no_retain = pd.DataFrame(0, index=monthly_factors.index, columns=factor_values.columns)
    
    # 각 월별로 상위/하위 종목 선택 및 가중치 할당
    for date in monthly_factors.index:
        if date not in factor_values.index:
            continue
        current_factors = factor_values.loc[date]
        
        # 상위 퍼센타일 종목 (롱 포지션 진입)
        longs = current_factors[current_factors >= top_threshold.loc[date]].index.tolist()
        # 하위 퍼센타일 종목 (숏 포지션 진입)
        shorts = current_factors[current_factors <= bottom_threshold.loc[date]].index.tolist()
        
        # 새로운 포지션 설정
        new_longs = set(longs)
        new_shorts = set(shorts)
        
        # 유지 조건 없음 (retain_percentile = 0)
        
        # 1개월 후의 수익률 계산
        try:
            current_pos = monthly_returns.index.get_loc(date)
            next_month = monthly_returns.index[current_pos + 1]
            # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
            long_return = monthly_returns.loc[next_month, list(new_longs)].mean() if len(new_longs) > 0 else 0
            # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
            short_return = monthly_returns.loc[next_month, list(new_shorts)].mean() if len(new_shorts) > 0 else 0
            # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
            portfolio_return = long_return - short_return
        except (IndexError, KeyError):
            # 다음 달 데이터가 없는 경우 NaN으로 설정
            portfolio_return = np.nan
        
        # 포트폴리오 수익률 로그 저장
        new_log = pd.DataFrame([{
            'Date': date,
            'Longs': ','.join(new_longs),
            'Shorts': ','.join(new_shorts),
            'Return': portfolio_return
        }])
        strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)
        
        # 가중치 할당
        if weight == 'equal_weighted':
            if len(new_longs) > 0:
                portfolios_no_retain.loc[date, list(new_longs)] += 1 / len(new_longs)
            if len(new_shorts) > 0:
                portfolios_no_retain.loc[date, list(new_shorts)] -= 1 / len(new_shorts)
        elif weight == 'value_weighted':
            if len(new_longs) > 0:
                try:
                    # 팩터 값을 기준으로 가중치 할당
                    long_factors = current_factors.loc[list(new_longs)]
                    long_weights = long_factors / long_factors.sum()
                    portfolios_no_retain.loc[date, list(new_longs)] += long_weights
                except KeyError:
                    print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {list(new_longs)}")
            if len(new_shorts) > 0:
                try:
                    # 팩터 값을 기준으로 가중치 할당
                    short_factors = current_factors.loc[list(new_shorts)]
                    short_weights = short_factors / short_factors.sum()
                    portfolios_no_retain.loc[date, list(new_shorts)] -= short_weights
                except KeyError:
                    print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {list(new_shorts)}")
    
    # 투자 비율 제한 (100%)
    portfolios_no_retain = portfolios_no_retain * investment_rate
    
    # 포트폴리오 수익률 계산
    aligned_weights_no_retain = portfolios_no_retain.shift(1).reindex(monthly_returns.index).fillna(0)
    portfolio_monthly_returns_no_retain = (aligned_weights_no_retain * monthly_returns).sum(axis=1)
    
    # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
    excess_returns_no_retain = portfolio_monthly_returns_no_retain - benchmark_returns
    
    # 백테스트 결과 저장
    performance_results[strategy_id] = pd.DataFrame({
        'Portfolio_Return': portfolio_monthly_returns_no_retain,
        'Excess_Return': excess_returns_no_retain
    })

# ---------------------------
# 4.4. 시계열 모멘텀 전략의 방향성 포지션 적용
# ---------------------------

# 시계열 모멘텀 팩터 설정
ts_factor = 'ts_momentum'
ts_percentiles = [10, 20]  # 상위 10%, 20%에 따른 백테스트

for percentile in ts_percentiles:
    for weight in weighting_methods:
        # 전략 식별자 생성
        strategy_id = f"{ts_factor}_directional_{percentile}pct_{weight}"
        
        # 시계열 모멘텀 팩터 추출
        try:
            ts_factor_values = factor_df.xs(ts_factor, level=1, axis=1)
        except KeyError:
            print(f"팩터 '{ts_factor}'가 factor_df에 존재하지 않습니다.")
            continue
        
        # 포트폴리오 가중치 초기화
        portfolios_ts = pd.DataFrame(0, index=monthly_factors.index, columns=ts_factor_values.columns)
        
        # 각 월별로 양수/음수 포지션 설정 및 가중치 할당
        for date in monthly_factors.index:
            if date not in ts_factor_values.index:
                continue
            current_ts = ts_factor_values.loc[date]
            
            # 시계열 모멘텀이 양수인 종목은 롱, 음수인 종목은 숏
            longs = current_ts[current_ts > 0].index.tolist()
            shorts = current_ts[current_ts < 0].index.tolist()
            
            # 1개월 후의 수익률 계산
            try:
                current_pos = monthly_returns.index.get_loc(date)
                next_month = monthly_returns.index[current_pos + 1]
                # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
                long_return = monthly_returns.loc[next_month, list(longs)].mean() if len(longs) > 0 else 0
                # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
                short_return = monthly_returns.loc[next_month, list(shorts)].mean() if len(shorts) > 0 else 0
                # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
                portfolio_return = long_return - short_return
            except (IndexError, KeyError):
                # 다음 달 데이터가 없는 경우 NaN으로 설정
                portfolio_return = np.nan
            
            # 포트폴리오 수익률 로그 저장
            new_log = pd.DataFrame([{
                'Date': date,
                'Longs': ','.join(longs),
                'Shorts': ','.join(shorts),
                'Return': portfolio_return
            }])
            strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)
            
            # 가중치 할당
            if weight == 'equal_weighted':
                if len(longs) > 0:
                    portfolios_ts.loc[date, longs] += 1 / len(longs)
                if len(shorts) > 0:
                    portfolios_ts.loc[date, shorts] -= 1 / len(shorts)
            elif weight == 'value_weighted':
                if len(longs) > 0:
                    try:
                        # 팩터 값을 기준으로 가중치 할당
                        long_factors = current_ts.loc[list(longs)]
                        long_weights = long_factors / long_factors.sum()
                        portfolios_ts.loc[date, longs] += long_weights
                    except KeyError:
                        print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {longs}")
                if len(shorts) > 0:
                    try:
                        # 팩터 값을 기준으로 가중치 할당
                        short_factors = current_ts.loc[list(shorts)]
                        short_weights = short_factors / short_factors.sum()
                        portfolios_ts.loc[date, shorts] -= short_weights
                    except KeyError:
                        print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {shorts}")
        
        # 투자 비율 제한 (100%)
        portfolios_ts = portfolios_ts * investment_rate
        
        # 포트폴리오 수익률 계산
        aligned_weights_ts = portfolios_ts.shift(1).reindex(monthly_returns.index).fillna(0)
        portfolio_monthly_returns_ts = (aligned_weights_ts * monthly_returns).sum(axis=1)
        
        # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
        excess_returns_ts = portfolio_monthly_returns_ts - benchmark_returns
        
        # 백테스트 결과 저장
        performance_results[strategy_id] = pd.DataFrame({
            'Portfolio_Return': portfolio_monthly_returns_ts,
            'Excess_Return': excess_returns_ts
        })

# ---------------------------
# 4.5. 추가적인 횡단면적 전략의 퍼센타일 조건 적용
# ---------------------------

# 추가적인 퍼센타일 설정 (10%, 20%, 30%)
additional_percentiles = [10, 20, 30]

for factor in factors:
    for percentile in additional_percentiles:
        for weight in weighting_methods:
            # 전략 식별자 생성
            strategy_id = f"{factor}_{percentile}pct_{weight}"
            
            # 현재 팩터의 값 추출
            try:
                factor_values = factor_df.xs(factor, level=1, axis=1)
            except KeyError:
                print(f"팩터 '{factor}'가 factor_df에 존재하지 않습니다.")
                continue
            
            # 상위 및 하위 퍼센타일 기준 임계값 계산
            top_threshold = factor_values.quantile(1 - percentile / 100, axis=1)
            bottom_threshold = factor_values.quantile(percentile / 100, axis=1)
            
            # 포트폴리오 가중치 초기화
            portfolios_add = pd.DataFrame(0, index=monthly_factors.index, columns=factor_values.columns)
            
            # 각 월별로 상위/하위 종목 선택 및 가중치 할당
            for date in monthly_factors.index:
                if date not in factor_values.index:
                    continue
                current_factors = factor_values.loc[date]
                
                # 상위 퍼센타일 종목
                longs = current_factors[current_factors >= top_threshold.loc[date]].index.tolist()
                # 하위 퍼센타일 종목
                shorts = current_factors[current_factors <= bottom_threshold.loc[date]].index.tolist()
                
                # 1개월 후의 수익률 계산
                try:
                    current_pos = monthly_returns.index.get_loc(date)
                    next_month = monthly_returns.index[current_pos + 1]
                    # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
                    long_return = monthly_returns.loc[next_month, list(longs)].mean() if len(longs) > 0 else 0
                    # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
                    short_return = monthly_returns.loc[next_month, list(shorts)].mean() if len(shorts) > 0 else 0
                    # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
                    portfolio_return = long_return - short_return
                except (IndexError, KeyError):
                    # 다음 달 데이터가 없는 경우 NaN으로 설정
                    portfolio_return = np.nan
                
                # 포트폴리오 수익률 로그 저장
                new_log = pd.DataFrame([{
                    'Date': date,
                    'Longs': ','.join(longs),
                    'Shorts': ','.join(shorts),
                    'Return': portfolio_return
                }])
                strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)
                
                # 가중치 할당
                if weight == 'equal_weighted':
                    if len(longs) > 0:
                        portfolios_add.loc[date, longs] += 1 / len(longs)
                    if len(shorts) > 0:
                        portfolios_add.loc[date, shorts] -= 1 / len(shorts)
                elif weight == 'value_weighted':
                    if len(longs) > 0:
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            long_factors = current_factors.loc[list(longs)]
                            long_weights = long_factors / long_factors.sum()
                            portfolios_add.loc[date, longs] += long_weights
                        except KeyError:
                            print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {longs}")
                    if len(shorts) > 0:
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            short_factors = current_factors.loc[list(shorts)]
                            short_weights = short_factors / short_factors.sum()
                            portfolios_add.loc[date, shorts] -= short_weights
                        except KeyError:
                            print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {shorts}")
            
            # 투자 비율 제한 (100%)
            portfolios_add = portfolios_add * investment_rate
            
            # 포트폴리오 수익률 계산
            aligned_weights_add = portfolios_add.shift(1).reindex(monthly_returns.index).fillna(0)
            portfolio_monthly_returns_add = (aligned_weights_add * monthly_returns).sum(axis=1)
            
            # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
            excess_returns_add = portfolio_monthly_returns_add - benchmark_returns
            
            # 백테스트 결과 저장
            performance_results[strategy_id] = pd.DataFrame({
                'Portfolio_Return': portfolio_monthly_returns_add,
                'Excess_Return': excess_returns_add
            })


Benchmark Returns:
timestamp
2021-10-31    0.215302
2021-11-30    0.283248
2021-12-31   -0.183973
2022-01-31   -0.318677
2022-02-28   -0.001011
Freq: ME, dtype: float64


In [51]:

# ---------------------------
# 5. 백테스트 결과 확인
# ---------------------------

# 백테스트 결과 확인 (일부 전략만 출력)
for strategy, perf in list(performance_results.items())[:5]:  # 상위 5개 전략만 예시로 출력
    print(f"Strategy: {strategy}")
    display(perf.head())
    print("\n")


Strategy: momentum_10pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-10-31,0.0,-0.215302
2021-11-30,0.0,-0.283248
2021-12-31,0.0,0.183973
2022-01-31,0.0,0.318677
2022-02-28,0.0,0.001011




Strategy: momentum_10pct_value_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-10-31,0.0,-0.215302
2021-11-30,0.0,-0.283248
2021-12-31,0.0,0.183973
2022-01-31,0.0,0.318677
2022-02-28,0.0,0.001011




Strategy: momentum_20pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-10-31,0.0,-0.215302
2021-11-30,0.0,-0.283248
2021-12-31,0.0,0.183973
2022-01-31,0.0,0.318677
2022-02-28,0.0,0.001011




Strategy: momentum_20pct_value_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-10-31,0.0,-0.215302
2021-11-30,0.0,-0.283248
2021-12-31,0.0,0.183973
2022-01-31,0.0,0.318677
2022-02-28,0.0,0.001011




Strategy: momentum_30pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2021-10-31,0.0,-0.215302
2021-11-30,0.0,-0.283248
2021-12-31,0.0,0.183973
2022-01-31,0.0,0.318677
2022-02-28,0.0,0.001011






In [52]:
# (재필) 일부 ts 전략의 경우 12 month lookback 때문에 1년 이후부터 결과가 나오므로, 공정한 비교를 위해 1년 이후 데이터만 사용
START = '2022-11-30'

performance_results = {k: v.loc[START:] for k, v in performance_results.items()}

# 수익률 첫 날은 모두 0에서 시작하도록 0번째 index는 다 0으로 설정
for k, v in performance_results.items():
    v['Portfolio_Return'].iloc[0] = 0

In [57]:

# ---------------------------
# 6. 백테스트 결과 시각화 (Plotly Express 사용)
# ---------------------------

# 누적 포트폴리오 수익률 그래프


strategy_families = [
    'ts_momentum_directional',
    'momentum_entry',
    'ts_momentum',

    'momentum',

    'ratio_to_high',
    'tk_value',
    'potential_return',
    'max_return',
    'salience'
]

strategy_to_family = {}
for family in strategy_families:
    for strategy in performance_results.keys():
        if family in strategy and strategy not in strategy_to_family:
            strategy_to_family[strategy] = family

family_to_strategies = {}
for strategy, family in strategy_to_family.items():
    if family not in family_to_strategies:
        family_to_strategies[family] = []
    family_to_strategies[family].append(strategy)

for family, strategies in family_to_strategies.items():
    fig = px.line(title=f'{family}-family strategies cum return')
    for strategy in strategies:
        perf = performance_results[strategy]
        cumulative_return = (1 + perf['Portfolio_Return']).cumprod() - 1
        fig.add_scatter(x=cumulative_return.index, y=cumulative_return.values, mode='lines', name=strategy)

    
    fig.update_layout(xaxis_title='Date', yaxis_title='Cumulative Return')
    fig.show()

# # 개별 전략의 월별 수익률 비교
# for strategy, perf in performance_results.items():
#     fig = px.line(title=f'Monthly Returns: {strategy}')
#     fig.add_scatter(x=perf.index, y=perf['Portfolio_Return'], mode='lines', name='Portfolio Return')
#     fig.add_scatter(x=perf.index, y=benchmark_returns.loc[perf.index], mode='lines', name='Benchmark Return', line=dict(dash='dash'))
#     fig.update_layout(xaxis_title='Date', yaxis_title='Return')
#     fig.show()


In [58]:
performance_results.keys()

dict_keys(['momentum_10pct_equal_weighted', 'momentum_10pct_value_weighted', 'momentum_20pct_equal_weighted', 'momentum_20pct_value_weighted', 'momentum_30pct_equal_weighted', 'momentum_30pct_value_weighted', 'ts_momentum_10pct_equal_weighted', 'ts_momentum_10pct_value_weighted', 'ts_momentum_20pct_equal_weighted', 'ts_momentum_20pct_value_weighted', 'ts_momentum_30pct_equal_weighted', 'ts_momentum_30pct_value_weighted', 'ratio_to_high_10pct_equal_weighted', 'ratio_to_high_10pct_value_weighted', 'ratio_to_high_20pct_equal_weighted', 'ratio_to_high_20pct_value_weighted', 'ratio_to_high_30pct_equal_weighted', 'ratio_to_high_30pct_value_weighted', 'tk_value_10pct_equal_weighted', 'tk_value_10pct_value_weighted', 'tk_value_20pct_equal_weighted', 'tk_value_20pct_value_weighted', 'tk_value_30pct_equal_weighted', 'tk_value_30pct_value_weighted', 'potential_return_10pct_equal_weighted', 'potential_return_10pct_value_weighted', 'potential_return_20pct_equal_weighted', 'potential_return_20pct_va

In [59]:
performance_results['momentum_10pct_equal_weighted']

Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.141824
2022-12-31,0.022166,0.24381
2023-01-31,-0.19743,-0.852927
2023-02-28,0.036993,0.05046
2023-03-31,0.02872,0.028737
2023-04-30,0.016217,0.062319
2023-05-31,0.017902,0.104898
2023-06-30,0.01574,0.060369
2023-07-31,-0.005141,-0.012828
2023-08-31,-0.006924,0.112106


In [60]:

# ---------------------------
# 7. 성과 지표 추가 (누적 수익률, MDD, Sharpe Ratio)
# ---------------------------

# summary = pd.DataFrame(columns=['Strategy', 'Total Return', 'MDD', 'Sharpe Ratio'])
summary = pd.DataFrame(columns=['Strategy', 'Total Return', 'MDD',])

for strategy, perf in performance_results.items():
    # 누적 수익률 계산
    perf['Cumulative_Return'] = (1 + perf['Portfolio_Return']).cumprod() - 1
    
    # 최대 낙폭(MDD) 계산
    running_max = (1 + perf['Cumulative_Return']).cummax()
    drawdown = (1 + perf['Cumulative_Return']) / running_max - 1
    mdd = drawdown.min()
    
    # 샤프 비율 계산 (월 단위) 
    # (재필) --> 전기간 샤프를 계산하고 있음. 앞 부분 return 0인 경우도 있던데 그런 부분 고려해줘야 할듯.
    # 그리고 sharpe를 쓰기엔... risk free란게 크립토에 없어서 좀 애매한거 같음. 그냥 return-to-vol ratio로 쓰는게 나을듯.
    # 소르티노 비율로 생각하면 target return이 0인 것임. 

    # sharpe_ratio = (perf['Excess_Return'].mean() / perf['Excess_Return'].std()) * np.sqrt(12) if perf['Excess_Return'].std() != 0 else np.nan
    perf['rolling_rtv'] = perf['Portfolio_Return'].rolling(12).mean() / perf['Portfolio_Return'].rolling(12).std()
    
    # 성과 요약 추가
    summary = pd.concat([summary, pd.DataFrame([{
        'Strategy': strategy,
        'Total Return': perf['Cumulative_Return'].iloc[-1],
        'MDD': mdd,
        # 'Sharpe Ratio': perf['rolling_rtv'].iloc[-1]
        '1y_return-to-vol': perf['rolling_rtv'].iloc[-1] # 지난 1년간의 샤프만 계산
        # 'mean_return-to-vol': perf['rolling_rtv'].mean() # 또는 기간 동안의 평균 샤프 계산
    }])], ignore_index=True)


In [61]:

# ---------------------------
# 8. 최종 성과 요약 출력
# ---------------------------

print("Performance Summary:")

# (재필) 1y-rtv로 정렬
summary = summary.sort_values(by='1y_return-to-vol', ascending=False)

display(summary)

# ---------------------------
# 9. 추가적인 디버깅: 각 전략과 날짜별 롱/숏 종목 수와 수익률 출력 및 CSV 파일 저장
# ---------------------------

# 결과를 저장할 디렉토리 설정
output_dir = 'backtest_results'
os.makedirs(output_dir, exist_ok=True)

# 모든 전략의 로그를 순회하여 CSV 파일로 저장
# for strategy, log_df in strategy_logs.items():
#     # 파일명 생성: 전략명과 세부 설정을 포함
#     filename = f"{strategy}.csv"
#     filepath = os.path.join(output_dir, filename)
    
#     # CSV 파일로 저장
#     log_df.to_csv(filepath, index=False)
    
#     print(f"Saved {filepath}")

Performance Summary:


Unnamed: 0,Strategy,Total Return,MDD,1y_return-to-vol
24,potential_return_10pct_equal_weighted,0.408754,-0.056799,0.599789
13,ratio_to_high_10pct_value_weighted,0.042722,-0.20032,0.52592
25,potential_return_10pct_value_weighted,0.347736,-0.056263,0.461102
12,ratio_to_high_10pct_equal_weighted,0.00076,-0.211988,0.368386
8,ts_momentum_20pct_equal_weighted,-0.131517,-0.221176,0.253039
9,ts_momentum_20pct_value_weighted,-0.132914,-0.225244,0.251338
6,ts_momentum_10pct_equal_weighted,-0.063943,-0.20149,0.188735
7,ts_momentum_10pct_value_weighted,-0.070112,-0.224,0.1852
42,momentum_entry10pct_retain20pct_equal_weighted,-0.060993,-0.218179,0.167963
26,potential_return_20pct_equal_weighted,0.214079,-0.040658,0.157935


## 위의 백테스팅 결과 해석

1. 왜인진 모르겠다만 potential_return(=Capital gain overhang)과 max_return(=maxing out) 팩터가 개좋다.
2. 너 얘전에 했을때는 모멘텀이 사기라며? 에 대한 해명 : 그때랑 데이터 수집방식 다름+모멘텀 선경편향있었음+그때보다 위 두개 팩터 더 정교하게 짬(그때는 대충 턴오버+감쇄 논리로 capital gain overhang짰는데 이번에는 좀 날카롭게 짜여짐)
3. 그럼 합성을 어떻게 할 껀가요? GSCORE 팩터의 논리를 따와서 zscore(capital gain overhang)+zscore(maxing out)시켜서 하겠습니다.
4. 그리고 내가 지금 눈깔 맛탱이가기 1분전이라 수익률 로그파일 만들어놨으니 이걸로 샤프 MDD 전부 뽑아줘 엑셀로 자동저장될꺼임
5. 종목 적다고 10%가 아니라 더 늘리려고 수작부리니깐 귀신같이 수익률 "정상화"당함

# 팩터보다 좋은건 더 많은 팩터

자~ 어린이 여러분, 저희는 위 백테스트 결과에서 두 개의 좋은 팩터세팅이 있다는걸 찾았어요우

maxing out 10pct EW가 하나고, 다른 하나는 capital gain overhang 10pct EW에요우.

그렇다면 이 두개를 합성한다면? rEwArD로 aWeSoMe한 전?략이 탄?생 할거 같아요우

In [62]:
import numpy as np
import pandas as pd
import plotly.express as px
import os
from scipy.stats import zscore

# ---------------------------
# 1. 데이터 준비
# ---------------------------

# daily_df와 factor_df는 이미 계산되어 있다고 가정합니다.
# daily_df: MultiIndex 컬럼 구조 (symbol, attribute), 예: ('BTC/USDT', 'close')
# factor_df: MultiIndex 컬럼 구조 (symbol, factor), 예: ('BTC/USDT', 'momentum')

# ---------------------------
# 2. 멀티 인덱스 컬럼 설정
# ---------------------------

# factor_df 컬럼이 튜플 형태로 되어있는지 확인하고, 멀티 인덱스로 변환
if not isinstance(factor_df.columns, pd.MultiIndex):
    try:
        factor_df.columns = pd.MultiIndex.from_tuples(factor_df.columns)
    except Exception as e:
        print("factor_df.columns을 MultiIndex로 변환하는 중 오류 발생:", e)

# daily_df 컬럼이 튜플 형태로 되어있는지 확인하고, 멀티 인덱스로 변환
if not isinstance(daily_df.columns, pd.MultiIndex):
    try:
        daily_df.columns = pd.MultiIndex.from_tuples(daily_df.columns)
    except Exception as e:
        print("daily_df.columns을 MultiIndex로 변환하는 중 오류 발생:", e)

# ---------------------------
# 2.1. 'potential_return'과 'max_return'의 z-score 합산하여 새로운 팩터 생성
# ---------------------------

# 기존 'combined_factor'가 존재하면 제거
if 'combined_factor' in factor_df.columns.get_level_values(1):
    # **수정된 부분 시작**
    factor_df = factor_df.drop(columns='combined_factor', level=1)
    # **수정된 부분 끝**

# 'potential_return'과 'max_return' 데이터 추출
try:
    potential_return_df = factor_df.xs('potential_return', level=1, axis=1)
    max_return_df = factor_df.xs('max_return', level=1, axis=1)
except KeyError as e:
    print(f"KeyError: {e}. Ensure 'potential_return' and 'max_return' are in factor_df.")

# 횡단면적 z-score 계산
potential_return_zscore = potential_return_df.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)
max_return_zscore = max_return_df.apply(lambda x: zscore(x, nan_policy='omit'), axis=1)

# z-score 합산하여 새로운 팩터 생성
combined_zscore = potential_return_zscore + max_return_zscore

# 새로운 팩터를 factor_df에 추가
new_factor_columns = pd.MultiIndex.from_tuples([(symbol, 'combined_factor') for symbol in combined_zscore.columns])
combined_factor_df = pd.DataFrame(combined_zscore.values, index=combined_zscore.index, columns=new_factor_columns)
factor_df = pd.concat([factor_df, combined_factor_df], axis=1)

# 메모리 정리 (선택사항)
del potential_return_df, max_return_df, potential_return_zscore, max_return_zscore, combined_zscore

# ---------------------------
# 3. 벤치마크 수익률 계산
# ---------------------------

# 일별 수익률이 이미 'returns' 열에 계산되어 있다고 가정
# 월별 누적 수익률 계산: (1 + 일별 수익률).resample('M').apply(np.prod) - 1
monthly_returns = (1 + daily_df.xs('returns', level=1, axis=1)).resample('M').apply(np.prod) - 1

# 벤치마크: 모든 종목의 월별 누적 수익률의 평균 (동일가중)
benchmark_returns = monthly_returns.mean(axis=1)

# 벤치마크 수익률 확인
print("Benchmark Returns:")
print(benchmark_returns.head())

# ---------------------------
# 4. 백테스팅 전략 설정 및 수행
# ---------------------------

# 전략 설정
factors = ['combined_factor']
percentiles = [10, 20, 30, 40]  # 추가된 40% 퍼센타일
weighting_methods = ['equal_weighted', 'value_weighted']

# 백테스트 결과를 저장할 딕셔너리 초기화
performance_results = {}

# 백테스트 로그를 저장할 딕셔너리 초기화
strategy_logs = {f"{factor}_{percentile}pct_{weight}": pd.DataFrame(columns=['Date', 'Longs', 'Shorts', 'Return'])
                for factor in factors for percentile in percentiles for weight in weighting_methods}

# 월말 기준으로 팩터 데이터 리샘플링
monthly_factors = factor_df.resample('M').last()

# 투자 비율 설정 (100%)
investment_rate = 0.2  # 20%로 설정

# 전체 종목 리스트
symbols = factor_df.columns.levels[0]

# ---------------------------
# 4.1. 일반적인 퍼센타일 기반 포트폴리오 구성
# ---------------------------

for factor in factors:
    for percentile in percentiles:
        for weight in weighting_methods:
            # 전략 식별자 생성
            strategy_id = f"{factor}_{percentile}pct_{weight}"
            
            # 현재 팩터의 값 추출
            try:
                factor_values = factor_df.xs(factor, level=1, axis=1)
            except KeyError:
                print(f"팩터 '{factor}'가 factor_df에 존재하지 않습니다.")
                continue

            # 중복된 컬럼 제거 (중요)
            factor_values = factor_values.loc[:, ~factor_values.columns.duplicated()]

            # 상위 및 하위 퍼센타일 기준 임계값 계산
            top_threshold = factor_values.quantile(1 - percentile / 100, axis=1)
            bottom_threshold = factor_values.quantile(percentile / 100, axis=1)

            # 포트폴리오 가중치 초기화
            portfolios = pd.DataFrame(0, index=monthly_factors.index, columns=factor_values.columns)

            # 각 월별로 상위/하위 종목 선택 및 가중치 할당
            for date in monthly_factors.index:
                if date not in factor_values.index:
                    continue
                current_factors = factor_values.loc[date]

                # 상위 퍼센타일 종목
                longs = current_factors[current_factors >= top_threshold.loc[date]].index.tolist()
                # 하위 퍼센타일 종목
                shorts = current_factors[current_factors <= bottom_threshold.loc[date]].index.tolist()

                # 중복된 심볼 제거
                longs = list(set(longs))
                shorts = list(set(shorts))

                # 1개월 후의 수익률 계산
                try:
                    current_pos = monthly_returns.index.get_loc(date)
                    next_month = monthly_returns.index[current_pos + 1]
                    # 롱 포지션 수익률: 각 롱 종목의 다음 달 수익률 평균
                    long_return = monthly_returns.loc[next_month, longs].mean() if len(longs) > 0 else 0
                    # 숏 포지션 수익률: 각 숏 종목의 다음 달 수익률 평균
                    short_return = monthly_returns.loc[next_month, shorts].mean() if len(shorts) > 0 else 0
                    # 전체 포트폴리오 수익률: 롱 수익률 - 숏 수익률
                    portfolio_return = long_return - short_return
                except (IndexError, KeyError):
                    # 다음 달 데이터가 없는 경우 NaN으로 설정
                    portfolio_return = np.nan

                # 포트폴리오 수익률 로그 저장
                new_log = pd.DataFrame([{
                    'Date': date,
                    'Longs': ','.join(longs),    # 롱 종목을 쉼표로 구분한 문자열로 저장
                    'Shorts': ','.join(shorts),  # 숏 종목을 쉼표로 구분한 문자열로 저장
                    'Return': portfolio_return
                }])
                strategy_logs[strategy_id] = pd.concat([strategy_logs[strategy_id], new_log], ignore_index=True)

                # 가중치 할당
                if weight == 'equal_weighted':
                    if len(longs) > 0:
                        portfolios.loc[date, longs] += 1 / len(longs)
                    if len(shorts) > 0:
                        portfolios.loc[date, shorts] -= 1 / len(shorts)
                elif weight == 'value_weighted':
                    if len(longs) > 0:
                        # 중복된 심볼 제거
                        longs = list(set(longs))
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            long_factors = current_factors.loc[longs]
                            long_weights = long_factors / long_factors.abs().sum()
                            portfolios.loc[date, longs] += long_weights
                        except KeyError:
                            print(f"롱 포지션 종목의 팩터 데이터가 없습니다: {longs}")
                    if len(shorts) > 0:
                        # 중복된 심볼 제거
                        shorts = list(set(shorts))
                        try:
                            # 팩터 값을 기준으로 가중치 할당
                            short_factors = current_factors.loc[shorts]
                            short_weights = short_factors / short_factors.abs().sum()
                            portfolios.loc[date, shorts] -= short_weights
                        except KeyError:
                            print(f"숏 포지션 종목의 팩터 데이터가 없습니다: {shorts}")

            # 투자 비율 제한 (20%)
            portfolios = portfolios * investment_rate

            # 포트폴리오 수익률 계산
            aligned_weights = portfolios.shift(1).reindex(monthly_returns.index).fillna(0)
            portfolio_monthly_returns = (aligned_weights * monthly_returns).sum(axis=1)

            # 초과 수익률 계산 (포트폴리오 수익률 - 벤치마크 수익률)
            excess_returns = portfolio_monthly_returns - benchmark_returns

            # 백테스트 결과 저장
            performance_results[strategy_id] = pd.DataFrame({
                'Portfolio_Return': portfolio_monthly_returns,
                'Excess_Return': excess_returns
            })


Benchmark Returns:
timestamp
2021-10-31    0.215302
2021-11-30    0.283248
2021-12-31   -0.183973
2022-01-31   -0.318677
2022-02-28   -0.001011
Freq: ME, dtype: float64


In [63]:
# (재필) 일부 ts 전략의 경우 12 month lookback 때문에 1년 이후부터 결과가 나오므로, 공정한 비교를 위해 1년 이후 데이터만 사용
START = '2022-11-30'

performance_results = {k: v.loc[START:] for k, v in performance_results.items()}

# 수익률 첫 날은 모두 0에서 시작하도록 0번째 index는 다 0으로 설정
for k, v in performance_results.items():
    v['Portfolio_Return'].iloc[0] = 0

In [64]:

# ---------------------------
# 5. 백테스트 결과 확인
# ---------------------------

# 백테스트 결과 확인 (일부 전략만 출력)
for strategy, perf in list(performance_results.items())[:5]:  # 상위 5개 전략만 예시로 출력
    print(f"Strategy: {strategy}")
    display(perf.head())
    print("\n")


Strategy: combined_factor_10pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.187033
2022-12-31,-0.03456,0.187083
2023-01-31,0.050804,-0.604693
2023-02-28,-0.019342,-0.005876
2023-03-31,-0.000847,-0.000829




Strategy: combined_factor_10pct_value_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.142513
2022-12-31,-0.107698,0.113945
2023-01-31,0.216941,-0.438556
2023-02-28,-0.008858,0.004608
2023-03-31,0.035993,0.036011




Strategy: combined_factor_20pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.169804
2022-12-31,-0.02488,0.196763
2023-01-31,0.076777,-0.57872
2023-02-28,-0.013957,-0.00049
2023-03-31,0.001875,0.001893




Strategy: combined_factor_20pct_value_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.123945
2022-12-31,-0.106366,0.115277
2023-01-31,0.223007,-0.432491
2023-02-28,-0.016018,-0.002551
2023-03-31,0.025545,0.025562




Strategy: combined_factor_30pct_equal_weighted


Unnamed: 0_level_0,Portfolio_Return,Excess_Return
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-11-30,0.0,0.173341
2022-12-31,-0.020828,0.200815
2023-01-31,0.098947,-0.556551
2023-02-28,-0.003538,0.009929
2023-03-31,-0.000347,-0.00033






In [66]:

# ---------------------------
# 6. 백테스트 결과 시각화 (Plotly Express 사용)
# ---------------------------

# 누적 포트폴리오 수익률 그래프
fig = px.line(title='Cumulative Portfolio Returns of Strategies')

for strategy, perf in performance_results.items():
    cumulative_return = (1 + perf['Portfolio_Return']).cumprod() - 1
    fig.add_scatter(x=cumulative_return.index, y=cumulative_return.values, mode='lines', name=strategy)

fig.update_layout(xaxis_title='Date', yaxis_title='Cumulative Portfolio Return')
fig.show()

# ---------------------------
# 7. 성과 지표 추가 (누적 수익률, MDD, Sharpe Ratio)
# ---------------------------

summary = pd.DataFrame(columns=['Strategy', 'Total Return', 'MDD', 'Sharpe Ratio'])

for strategy, perf in performance_results.items():
    # 누적 수익률 계산
    perf['Cumulative_Return'] = (1 + perf['Portfolio_Return']).cumprod() - 1

    # 최대 낙폭(MDD) 계산
    running_max = (1 + perf['Cumulative_Return']).cummax()
    drawdown = (1 + perf['Cumulative_Return']) / running_max - 1
    mdd = drawdown.min()


    # (재필) 앞부분처럼 샤프 계산 부분 수정

    # sharpe_ratio = (perf['Excess_Return'].mean() / perf['Excess_Return'].std()) * np.sqrt(12) if perf['Excess_Return'].std() != 0 else np.nan
    perf['rolling_rtv'] = perf['Portfolio_Return'].rolling(12).mean() / perf['Portfolio_Return'].rolling(12).std()
    
    # 성과 요약 추가
    summary = pd.concat([summary, pd.DataFrame([{
        'Strategy': strategy,
        'Total Return': perf['Cumulative_Return'].iloc[-1],
        'MDD': mdd,
        # 'Sharpe Ratio': perf['rolling_rtv'].iloc[-1]
        '1y_return-to-vol': perf['rolling_rtv'].iloc[-1] # 지난 1년간의 샤프만 계산
        # 'mean_return-to-vol': perf['rolling_rtv'].mean() # 또는 기간 동안의 평균 샤프 계산
    }])], ignore_index=True)

# ---------------------------
# 8. 최종 성과 요약 출력
# ---------------------------

print("Performance Summary:")
display(summary)

# ---------------------------
# 9. 추가적인 디버깅: 각 전략과 날짜별 롱/숏 종목 수와 수익률 출력 및 CSV 파일 저장
# ---------------------------

# 결과를 저장할 디렉토리 설정
output_dir = 'backtest_results'
os.makedirs(output_dir, exist_ok=True)

# # 모든 전략의 로그를 순회하여 CSV 파일로 저장
# for strategy, log_df in strategy_logs.items():
#     # 파일명 생성: 전략명과 세부 설정을 포함
#     filename = f"{strategy}.csv"
#     filepath = os.path.join(output_dir, filename)

#     # CSV 파일로 저장
#     log_df.to_csv(filepath, index=False)

#     print(f"Saved {filepath}")

Performance Summary:


Unnamed: 0,Strategy,Total Return,MDD,Sharpe Ratio,1y_return-to-vol
0,combined_factor_10pct_equal_weighted,0.223367,-0.062956,,0.266012
1,combined_factor_10pct_value_weighted,0.410306,-0.196061,,0.053615
2,combined_factor_20pct_equal_weighted,0.087971,-0.086645,,-0.059092
3,combined_factor_20pct_value_weighted,0.41603,-0.18311,,0.137554
4,combined_factor_30pct_equal_weighted,0.185868,-0.073974,,0.025294
5,combined_factor_30pct_value_weighted,0.418503,-0.199603,,0.151353
6,combined_factor_40pct_equal_weighted,0.157378,-0.037379,,0.141788
7,combined_factor_40pct_value_weighted,0.441946,-0.197383,,0.153333
