In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import pandas as pd
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Conv1D, GlobalMaxPooling1D, Dense, Dropout, Flatten
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import itertools

In [90]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Conv1D, GlobalMaxPooling1D, Dense, Dropout, Attention
import unicodedata

# 문자열 정규화 함수
def normalize_text(text):
    return unicodedata.normalize('NFC', text.strip().replace(' ', '').replace('\xa0', '').replace('\t', ''))

# 데이터 경로 설정
scores_path = '/content/drive/MyDrive/KHUDA 금융/심화트랙/데이터/섹터별 일일 평균 점수 - 4차'
prices_path = '/content/drive/MyDrive/KHUDA 금융/심화트랙/데이터/지수 종가'
optimal_combinations_path = '/content/drive/MyDrive/KHUDA 금융/심화트랙/데이터/significant_results-8-최적의조합.csv'

# 최적 조합 파일 로드
optimal_combinations = pd.read_csv(optimal_combinations_path, encoding='utf-8-sig')

# 데이터 클렌징
optimal_combinations['score_sector'] = optimal_combinations['score_sector'].apply(normalize_text)
optimal_combinations['price_sector'] = optimal_combinations['price_sector'].apply(normalize_text)

# 최적 조합 리스트 생성
optimal_pairs = optimal_combinations[['score_sector', 'price_sector']].drop_duplicates()

# 파일 리스트 및 정규화된 리스트 준비
score_files = os.listdir(scores_path)
normalized_score_files = [normalize_text(file_name) for file_name in score_files]
price_files = os.listdir(prices_path)
normalized_price_files = [normalize_text(file_name) for file_name in price_files]

# 각 최적 조합에 대해 데이터 처리 및 학습
expected_returns = {}

for _, pair in optimal_pairs.iterrows():
    score_sector = pair['score_sector']
    price_sector = pair['price_sector']
    print(f"Processing combination: {score_sector} -> {price_sector}")

    # 감성 점수 파일 경로 찾기
    expected_score_file = normalize_text(f"{score_sector}_sector_daily_weighted_scores.csv")
    score_file = next((os.path.join(scores_path, original_file)
                       for original_file, normalized_file in zip(score_files, normalized_score_files)
                       if expected_score_file == normalized_file), None)
    if not score_file:
        print(f"No matching file for score sector '{score_sector}' in '{scores_path}'")
        continue

    # 종가 파일 경로 찾기
    expected_price_file = normalize_text(f"KRX {price_sector}.csv")
    price_file = next((os.path.join(prices_path, original_file)
                       for original_file, normalized_file in zip(price_files, normalized_price_files)
                       if expected_price_file == normalized_file), None)
    if not price_file:
        print(f"No matching file for price sector '{price_sector}' in '{prices_path}'")
        continue

    print(f"Files found: Score - {score_file}, Price - {price_file}")

    # 파일 읽기
    scores_df = pd.read_csv(score_file, encoding='utf-8')
    prices_df = pd.read_csv(price_file, encoding='euc-kr')

    # 날짜 변환
    scores_df['Date'] = pd.to_datetime(scores_df['Date'], errors='coerce')
    prices_df['Date'] = pd.to_datetime(prices_df['Date'], errors='coerce')

    # 데이터 병합
    merged_data = pd.merge(scores_df, prices_df, on='Date', how='inner')[['sector_weighted_score', '종가']]
    if merged_data.empty:
        print(f"No data available for combination: {score_sector} -> {price_sector}")
        continue

    # **종가를 수익률로 변환**
    merged_data['returns'] = merged_data['종가'].pct_change().fillna(0)

    # 감성 점수와 수익률 스케일링
    score_scaler = MinMaxScaler()
    returns_scaler = MinMaxScaler()

    merged_data['sector_weighted_score'] = score_scaler.fit_transform(merged_data[['sector_weighted_score']])
    merged_data['returns'] = returns_scaler.fit_transform(merged_data[['returns']])

    # 시계열 데이터 생성 함수
    def create_sequences(data, time_steps):
        sequences = []
        targets = []
        for i in range(len(data) - time_steps):
            sequences.append(data[i:i + time_steps, :-1])
            targets.append(data[i + time_steps, -1])
        return np.array(sequences), np.array(targets)

    # 시계열 데이터 생성
    time_steps = 20
    data_scaled = merged_data[['sector_weighted_score', 'returns']].values
    X, y = create_sequences(data_scaled, time_steps)

    # 훈련 및 테스트 데이터 분리
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # LSTM + Attention 모델 정의
    input_layer = Input(shape=(X_train.shape[1], X_train.shape[2]))
    lstm_layer = LSTM(100, return_sequences=True)(input_layer)
    attention = Attention()([lstm_layer, lstm_layer])  # Attention Layer 추가
    cnn_layer = Conv1D(64, kernel_size=3, activation='relu')(attention)
    pooling_layer = GlobalMaxPooling1D()(cnn_layer)
    dense_layer = Dense(128, activation='relu')(pooling_layer)
    dropout_layer = Dropout(0.3)(dense_layer)
    output_layer = Dense(1)(dropout_layer)

    model = Model(inputs=input_layer, outputs=output_layer)
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])

    # 모델 학습
    print(f"Training model for combination: {score_sector} -> {price_sector}")
    model.fit(X_train, y_train, epochs=20, batch_size=64, validation_split=0.1, verbose=1)

    # 예측
    predicted_returns = model.predict(X_test)

    # **예측값 복원**
    predicted_returns = returns_scaler.inverse_transform(predicted_returns).flatten()
    actual_returns = returns_scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()

    # 종가로 복원
    predicted_prices = merged_data['종가'].values[-len(predicted_returns)-1:-1] * (1 + predicted_returns)
    actual_prices = merged_data['종가'].values[-len(actual_returns)-1:-1] * (1 + actual_returns)

    # 수익률 계산
    avg_predicted_return = np.mean(predicted_returns)
    avg_actual_return = np.mean(actual_returns)

    expected_returns[f"{score_sector} -> {price_sector}"] = {
        "Predicted": avg_predicted_return,
        "Actual": avg_actual_return
    }

    # 결과 시각화
    plt.figure(figsize=(10, 6))
    plt.plot(actual_prices, label='Actual Prices', alpha=0.7)
    plt.plot(predicted_prices, label='Predicted Prices', alpha=0.7)
    plt.legend()
    plt.title(f"{score_sector} -> {price_sector}: Predicted vs Actual Prices")
    plt.show()

# 최적 조합의 기대 수익률 출력
print("Expected Returns for Optimal Combinations:")
for pair, returns in expected_returns.items():
    print(f"{pair}: Predicted = {returns['Predicted']:.4f}, Actual = {returns['Actual']:.4f}")


Output hidden; open in https://colab.research.google.com to view.

In [91]:
# 최적 조합의 기대 수익률 출력
print("Expected Returns for Optimal Combinations:")
for pair, returns in expected_returns.items():
    print(f"{pair}: Predicted = {returns['Predicted']:.4f}, Actual = {returns['Actual']:.4f}")

Expected Returns for Optimal Combinations:
건설 -> 건설: Predicted = -0.0014, Actual = -0.0002
건설 -> 경기소비재: Predicted = -0.0010, Actual = 0.0001
건설 -> 기계장비: Predicted = -0.0027, Actual = -0.0026
건설 -> 반도체: Predicted = -0.0015, Actual = 0.0004
건설 -> 보험: Predicted = 0.0003, Actual = 0.0015
건설 -> 에너지화학: Predicted = -0.0056, Actual = -0.0030
건설 -> 자동차: Predicted = 0.0005, Actual = 0.0014
건설 -> 정보기술: Predicted = -0.0008, Actual = 0.0009
건설 -> 철강: Predicted = -0.0020, Actual = -0.0026
경기소비재 -> 경기소비재: Predicted = -0.0004, Actual = 0.0001
경기소비재 -> 기계장비: Predicted = -0.0026, Actual = -0.0026
경기소비재 -> 반도체: Predicted = 0.0012, Actual = 0.0004
경기소비재 -> 방송통신: Predicted = -0.0018, Actual = 0.0007
경기소비재 -> 보험: Predicted = 0.0020, Actual = 0.0015
경기소비재 -> 에너지화학: Predicted = -0.0016, Actual = -0.0030
경기소비재 -> 은행: Predicted = 0.0001, Actual = 0.0039
경기소비재 -> 자동차: Predicted = 0.0007, Actual = 0.0014
경기소비재 -> 철강: Predicted = -0.0016, Actual = -0.0026
경기소비재 -> 헬스케어: Predicted = 0.0006, Actual = 0.0004
기계장비 -> 

In [95]:
# 백테스팅 초기 설정
initial_capital = 1000000  # 초기 자본금
transaction_fee = 0.001    # 거래 수수료 (0.1%)

# 백테스팅 결과 저장
backtesting_results = []

# 각 조합별로 백테스팅 진행
for pair, returns in expected_returns.items():
    predicted_returns = np.array([returns["Predicted"]])  # 예측 수익률
    actual_returns = np.array([returns["Actual"]])        # 실제 수익률

    capital = initial_capital
    holdings = 0  # 주식 보유량
    cash = capital  # 초기에는 모두 현금 보유
    capital_history = []  # 자본 기록

    for predicted, actual in zip(predicted_returns, actual_returns):
        if predicted > 0:  # 매수 신호
            investment = cash  # 모든 자본 투자
            cash = 0
            holdings += investment / (1 + transaction_fee)  # 거래 수수료 반영
        elif predicted < 0 and holdings > 0:  # 매도 신호
            cash += holdings * (1 - transaction_fee)  # 주식 매도 후 수익 반영
            holdings = 0
        # 현재 총 자본 = 현금 + 주식 가치
        current_value = cash + holdings * (1 + predicted)  # 실제 수익률 반영
        capital_history.append(current_value)

    # 최종 자본 계산
    final_capital = capital_history[-1] if capital_history else initial_capital

    # 결과 저장
    backtesting_results.append({
        "Combination": pair,
        "Final Capital": final_capital,
        "Profit/Loss (%)": ((final_capital - initial_capital) / initial_capital) * 100
    })

# 백테스팅 결과를 데이터프레임으로 변환
backtesting_df = pd.DataFrame(backtesting_results)

# 백테스팅 결과 출력
print("Backtesting Results:")
print(backtesting_df)

# CSV 파일로 저장
file_path = "/content/drive/MyDrive/KHUDA 금융/심화트랙/데이터/backtesting_results.csv"
backtesting_df.to_csv(file_path, index=False, encoding="utf-8-sig")
print(f"Backtesting results saved to {file_path}")


Backtesting Results:
      Combination   Final Capital  Profit/Loss (%)
0        건설 -> 건설  1000000.000000         0.000000
1     건설 -> 경기소비재  1000000.000000         0.000000
2      건설 -> 기계장비  1000000.000000         0.000000
3       건설 -> 반도체  1000000.000000         0.000000
4        건설 -> 보험   999321.361755        -0.067864
..            ...             ...              ...
127    헬스케어 -> 은행  1000000.000000         0.000000
128   헬스케어 -> 자동차   999796.570902        -0.020343
129    헬스케어 -> 증권  1000000.000000         0.000000
130    헬스케어 -> 철강  1000000.000000         0.000000
131  헬스케어 -> 헬스케어  1000000.000000         0.000000

[132 rows x 3 columns]
Backtesting results saved to /content/drive/MyDrive/KHUDA 금융/심화트랙/데이터/backtesting_results.csv


In [93]:
strategy_returns = np.diff(capital_history) / capital_history[:-1]
buy_and_hold_returns = np.diff(benchmark_capital) / benchmark_capital[:-1]

print(f"Strategy Average Return: {np.mean(strategy_returns):.4f}, Volatility: {np.std(strategy_returns):.4f}")
print(f"Buy-and-Hold Average Return: {np.mean(buy_and_hold_returns):.4f}, Volatility: {np.std(buy_and_hold_returns):.4f}")


Strategy Average Return: -0.0003, Volatility: 0.0169
Buy-and-Hold Average Return: 0.0005, Volatility: 0.0182
