In [13]:
#!pip install TA-Lib-Precompiled
#!pip install finance-datareader
#!pip install yfinance OpenDartReader pandas tensorflow gymnasium "stable-baselines3[extra]" backtrader matplotlib requests
# (참고) 만약 PyTorch 기반으로 RL Agent 등을 구현하려면 tensorflow 대신 아래 설치
#!pip install torch torchvision torchaudio

#아래는 위에 실행한 다음 설치
#!pip install numpy==1.26.4
# 하고 나서 세션 다시 시작 (런타임 재시작)

In [14]:
# 셀 2: Google Drive 마운트 (선택 사항)
import os
from google.colab import drive
drive.mount('/content/drive')

# 마운트 확인 및 기본 경로 설정 (Config 클래스에서 이 경로 사용)
if os.path.exists('/content/drive/MyDrive'):
    print("Google Drive 마운트 성공!")
    # 데이터를 저장할 기본 경로 설정 (원하는 경로로 수정 가능)
    colab_base_dir = "/content/drive/MyDrive/Colab_AI_Trading_System/"
    print(f"데이터 및 모델 저장 기본 경로: {colab_base_dir}")
else:
    print("Google Drive 마운트 실패 또는 경로를 찾을 수 없습니다.")
    print("임시 저장소 '/content/'를 사용합니다. 런타임 종료 시 데이터가 삭제됩니다.")
    colab_base_dir = "/content/" # 임시 경로

# Config 클래스에서 사용할 수 있도록 기본 경로를 전역 변수처럼 저장 (Config 셀에서 참조)
# 또는 Config 클래스 내부에서 직접 경로 확인 로직 사용 (제공된 코드 방식)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Google Drive 마운트 성공!
데이터 및 모델 저장 기본 경로: /content/drive/MyDrive/Colab_AI_Trading_System/


In [15]:
# 셀 3: Config 클래스 정의 및 설정 객체 생성
import os
from datetime import datetime

class Config:
    def __init__(self):
        # --- 기본 설정 ---
        self.PROJECT_NAME = "AI_Trading_System_Free"
        # Google Drive 마운트 여부에 따라 기본 경로 설정
        if os.path.exists('/content/drive/MyDrive'):
            # Google Drive 내 원하는 경로로 수정하세요.
            self.BASE_DIR = "/content/drive/MyDrive/Colab_AI_Trading_System/"
            print(f"Config: Google Drive 경로 사용: {self.BASE_DIR}")
        else:
            self.BASE_DIR = "/content/" # 임시 폴더 사용
            print("Config: Google Drive 미마운트. 임시 경로 사용: /content/")

        self.DATA_DIR = os.path.join(self.BASE_DIR, "trading_data")
        self.MODEL_DIR = os.path.join(self.BASE_DIR, "saved_models")
        self.LOG_DIR = os.path.join(self.BASE_DIR, "logs")
        self.DB_PATH = os.path.join(self.DATA_DIR, f"{self.PROJECT_NAME}.db")
        self.LOG_FILE = os.path.join(self.LOG_DIR, f"{self.PROJECT_NAME}.log")
        self.LOG_LEVEL = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL

        # --- API Keys ---
        # OpenDART API 키 (자신의 키로 교체 필요)
        self.DART_API_KEY = "6ff7b17753c904539da88d0c5f56f1a48221aaac" # <--- !!! 자신의 API 키로 반드시 교체하세요 !!!
        if self.DART_API_KEY == "YOUR_OPENDART_API_KEY":
             print("경고: OpenDART API 키가 설정되지 않았습니다. config.py 파일에서 실제 키로 교체해주세요. 재무 데이터 업데이트가 제한됩니다.")


        # --- 데이터 설정 ---
        # 예시 티커 (필요에 따라 수정)
        self.US_TICKERS = ['AAPL', 'MSFT', 'NVDA', 'GOOGL', 'AMZN']
        self.KR_TICKERS = ['005930', '000660', '035720', '051910', '005380'] # 삼성전자, SK하이닉스, 카카오, LG화학, 현대차
        self.TARGET_TICKERS = self.US_TICKERS + self.KR_TICKERS

        self.DATA_START_DATE = "2018-01-01"
        self.DATA_END_DATE = datetime.now().strftime('%Y-%m-%d')

        # 데이터 분할 기간 설정 (예시)
        self.TRAIN_START_DATE = "2018-01-01"
        self.TRAIN_END_DATE = "2021-12-31"
        self.VALIDATION_START_DATE = "2022-01-01"
        self.VALIDATION_END_DATE = "2022-12-31"
        self.TEST_START_DATE = "2023-01-01"
        self.TEST_END_DATE = self.DATA_END_DATE

        # 특징 공학 설정
        self.FEATURE_LOOKBACK_WINDOW = 60 # 일부 지표 계산 시 참고용 (실제 window는 지표별로 다름)
        self.USE_TECHNICAL_INDICATORS = True
        self.USE_FUNDAMENTAL_DATA = bool(self.DART_API_KEY != "YOUR_OPENDART_API_KEY") # API 키가 있을 때만 사용 시도
        self.USE_SENTIMENT_DATA = False # 감성 데이터 사용 여부 (추후 구현)
        # 사용할 기술적 지표 목록 (FeatureEngineer 에서 사용)
        self.TECHNICAL_INDICATORS = ['MA5', 'MA10', 'MA20', 'MA60', 'RSI14', 'MACD', 'MACD_signal', 'MACD_hist', 'BB_upper', 'BB_middle', 'BB_lower', 'ATR', 'STOCH_K', 'STOCH_D', 'CCI', 'ADX', 'OBV']
        # final_feature_list = ['MA5', 'RSI14', 'adj_close', ...] # 모델 학습에 실제 사용할 최종 특징 리스트 (Environment 등에서 정의 필요)

        # --- 강화학습 (RL) 환경 설정 ---
        self.ENV_WINDOW_SIZE = 30 # 에이전트가 관찰할 과거 데이터 기간 (bar 개수)
        self.ENV_INITIAL_BALANCE = 10000000 # 초기 자본금 (백테스팅에서도 사용 가능)
        self.ENV_POSITION_MAX_RATIO = 0.1 # 개별 종목 최대 투자 비율 (RiskManager 에서 사용)
        self.ENV_TRANSACTION_COST_PCT = 0.0015 # 거래 수수료 (편도)
        self.ENV_SLIPPAGE_PCT = 0.001 # 슬리피지

        # --- 강화학습 (RL) 에이전트 설정 ---
        self.AGENT_TYPE = "PPO" # 사용할 RL 알고리즘 (Stable Baselines3 기준)
        self.AGENT_POLICY = "MlpPolicy" # 사용할 정책 네트워크 구조

        # PPO 하이퍼파라미터 (Stable Baselines3 기준)
        self.PPO_N_STEPS = 2048
        self.PPO_BATCH_SIZE = 64
        self.PPO_N_EPOCHS = 10
        self.PPO_GAMMA = 0.99
        self.PPO_GAE_LAMBDA = 0.95
        self.PPO_CLIP_RANGE = 0.2
        self.PPO_ENT_COEF = 0.01
        self.PPO_VF_COEF = 0.5
        self.PPO_LEARNING_RATE = 3e-4
        self.PPO_MAX_GRAD_NORM = 0.5

        # DQN 하이퍼파라미터 (참고용, Stable-Baselines3 DQN 사용 시)
        self.DQN_BUFFER_SIZE = 10000
        self.DQN_LEARNING_RATE = 1e-4
        self.DQN_BATCH_SIZE = 32
        self.DQN_GAMMA = 0.99
        self.DQN_TAU = 1.0
        self.DQN_TRAIN_FREQ = 4
        self.DQN_GRADIENT_STEPS = 1
        self.DQN_LEARNING_STARTS = 1000
        self.DQN_EXPLORATION_FRACTION = 0.1
        self.DQN_EXPLORATION_FINAL_EPS = 0.05
        self.DQN_TARGET_UPDATE_INTERVAL = 1000

        # 총 학습 타임스텝 및 모델 저장 주기
        self.TOTAL_TRAINING_TIMESTEPS = 100000 # 10만 스텝 (테스트용, 실제로는 더 길게 설정)
        self.SAVE_MODEL_FREQ = 50000 # 모델 저장 빈도 (timesteps)

        # --- 백테스팅 설정 ---
        self.BACKTEST_INITIAL_CASH = self.ENV_INITIAL_BALANCE
        self.BACKTEST_COMMISSION_PCT = self.ENV_TRANSACTION_COST_PCT
        self.BACKTEST_SLIPPAGE_PCT = self.ENV_SLIPPAGE_PCT

        # --- 위험 관리 설정 (RiskManager 에서 사용) ---
        self.RISK_MAX_POSITION_RATIO = self.ENV_POSITION_MAX_RATIO # 환경 설정과 동일하게 사용
        self.RISK_STOP_LOSS_PCT = 0.05 # 고정 손절매 비율
        self.RISK_USE_TRAILING_STOP = True # 추적 손절매 사용 여부
        self.RISK_TRAILING_STOP_PCT = 0.07 # 추적 손절매 비율
        self.RISK_MAX_SYSTEM_DRAWDOWN = 0.20 # 시스템 전체 최대 손실률 (백테스터에서 활용 가능)
        self.RISK_USE_KELLY_CRITERION = False # 켈리 기준 포지션 사이징 사용 여부
        self.RISK_KELLY_FRACTION = 0.2 # 사용할 켈리 비율 (켈리 기준값 * 이 비율)
        self.RISK_KELLY_WIN_RATE = 0.55 # 예상 승률 (켈리 계산용)
        self.RISK_KELLY_PAYOFF_RATIO = 1.5 # 예상 손익비 (평균 수익 / 평균 손실)

        # --- 기타 ---
        self.RANDOM_SEED = 42 # 재현성을 위한 랜덤 시드

        # --- 디렉토리 생성 ---
        # BASE_DIR 이 설정된 이후에 디렉토리 경로 재정의 및 생성
        self.DATA_DIR = os.path.join(self.BASE_DIR, "trading_data")
        self.MODEL_DIR = os.path.join(self.BASE_DIR, "saved_models")
        self.LOG_DIR = os.path.join(self.BASE_DIR, "logs")
        self.DB_PATH = os.path.join(self.DATA_DIR, f"{self.PROJECT_NAME}.db")
        self.LOG_FILE = os.path.join(self.LOG_DIR, f"{self.PROJECT_NAME}.log")

        os.makedirs(self.DATA_DIR, exist_ok=True)
        os.makedirs(self.MODEL_DIR, exist_ok=True)
        os.makedirs(self.LOG_DIR, exist_ok=True)
        print(f"Directories created/checked: {self.DATA_DIR}, {self.MODEL_DIR}, {self.LOG_DIR}")

# 설정 객체 생성
cfg = Config()
print("Config object 'cfg' created successfully.")
print(f"Log level from cfg: {cfg.LOG_LEVEL}")

Config: Google Drive 경로 사용: /content/drive/MyDrive/Colab_AI_Trading_System/
Directories created/checked: /content/drive/MyDrive/Colab_AI_Trading_System/trading_data, /content/drive/MyDrive/Colab_AI_Trading_System/saved_models, /content/drive/MyDrive/Colab_AI_Trading_System/logs
Config object 'cfg' created successfully.
Log level from cfg: INFO


In [16]:
# 셀 4: Utils 함수 정의 및 초기 설정 실행
import logging
import time
import os
import random
import numpy as np
import pandas as pd # reduce_mem_usage 에서 사용
import tensorflow as tf # TensorFlow 시드 설정 위해 임포트 (PyTorch 사용 시 torch 임포트)
# import torch # PyTorch 사용 시

# 이 셀에서는 이전에 생성된 cfg 객체를 사용합니다.

def setup_logging(config):
    """로거 설정 함수 (config 객체 받도록 수정)"""
    project_name = config.PROJECT_NAME
    log_file = config.LOG_FILE
    log_level_str = config.LOG_LEVEL.upper() # 대문자로 변환
    log_dir = config.LOG_DIR

    # 로그 레벨 문자열을 logging 상수로 변환
    log_level = getattr(logging, log_level_str, logging.INFO) # 기본값 INFO

    logger = logging.getLogger(project_name)
    if logger.hasHandlers():
        logger.handlers.clear()

    logger.setLevel(log_level)
    log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # 파일 핸들러 (로그 디렉토리 생성 확인 후)
    os.makedirs(log_dir, exist_ok=True) # 로그 디렉토리 생성 보장
    file_handler = logging.FileHandler(log_file, encoding='utf-8') # 인코딩 명시
    file_handler.setFormatter(log_formatter)
    logger.addHandler(file_handler)

    # 콘솔 핸들러
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(log_formatter)
    logger.addHandler(stream_handler)

    print(f"Logger '{project_name}' setup complete. Level: {log_level_str}, File: {log_file}")
    return logger

# 로거 설정 실행 (cfg 객체 전달)
logger = setup_logging(cfg)
logger.info("Logger setup complete from utils cell.")

def timeit(func):
    """함수 실행 시간 측정 데코레이터"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        # logger 가 정의된 후에 사용 가능
        logger.debug(f"Function {func.__name__} executed in {end_time - start_time:.4f} seconds")
        return result
    return wrapper

def set_random_seed(seed_value): # 인자 이름 변경 (seed는 모듈 이름과 충돌 가능성)
    """재현성을 위한 랜덤 시드 설정"""
    random.seed(seed_value)
    np.random.seed(seed_value)
    # TensorFlow 시드 설정 (TensorFlow 사용 시)
    try:
        tf.random.set_seed(seed_value)
        # tf.config.experimental.enable_op_determinism() # TF 2.8+ 에서 결정적 연산 활성화 (성능 저하 가능성)
    except NameError:
        logger.debug("TensorFlow not imported or tf variable not defined. Skipping TF seed.")
    except Exception as e:
        logger.warning(f"Could not set TensorFlow seed: {e}")

    # PyTorch 시드 설정 (PyTorch 사용 시)
    # try:
    #     import torch
    #     torch.manual_seed(seed_value)
    #     if torch.cuda.is_available():
    #         torch.cuda.manual_seed_all(seed_value)
    #         # 결정적 연산을 위한 설정 (성능 저하 가능성)
    #         # torch.backends.cudnn.deterministic = True
    #         # torch.backends.cudnn.benchmark = False
    # except ImportError:
    #     logger.debug("PyTorch not installed. Skipping PyTorch seed.")
    # except Exception as e:
    #     logger.warning(f"Could not set PyTorch seed: {e}")

    os.environ['PYTHONHASHSEED'] = str(seed_value)
    # Stable Baselines3 내부 시드 설정은 에이전트 학습 시 별도 처리될 수 있음
    logger.info(f"Global random seeds set to: {seed_value}")

# 프로그램 시작 시 시드 설정 (cfg 객체 사용)
set_random_seed(cfg.RANDOM_SEED)

def reduce_mem_usage(df, verbose=True):
    """데이터프레임 메모리 사용량 줄이는 함수"""
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                # np.iinfo 정보 사용
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                # np.finfo 정보 사용
                # float16은 정밀도 문제 발생 가능성 높아 float32 우선 고려
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    # float16 변환 주석 처리 (필요 시 사용)
                    # if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    #     df[col] = df[col].astype(np.float16)
                    # else:
                        df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64) # float32 범위 벗어나면 float64 유지
    end_mem = df.memory_usage().sum() / 1024**2
    if verbose:
        logger.info(f'Memory usage decreased from {start_mem:.2f} MB to {end_mem:.2f} MB ({100 * (start_mem - end_mem) / start_mem:.1f}% reduction)')
    return df

print("Utils functions defined and logger initialized.")

2025-04-29 07:23:18,984 - AI_Trading_System_Free - INFO - Logger setup complete from utils cell.
INFO:AI_Trading_System_Free:Logger setup complete from utils cell.
2025-04-29 07:23:18,993 - AI_Trading_System_Free - INFO - Global random seeds set to: 42
INFO:AI_Trading_System_Free:Global random seeds set to: 42


Logger 'AI_Trading_System_Free' setup complete. Level: INFO, File: /content/drive/MyDrive/Colab_AI_Trading_System/logs/AI_Trading_System_Free.log
Utils functions defined and logger initialized.


In [17]:
# 셀 5: DataManager 클래스 정의 (최종 수정본 - 생략 없음)
import sqlite3
import pandas as pd
import yfinance as yf
import FinanceDataReader as fdr
import OpenDartReader
import time
import requests
import random
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text

# 이 셀에서는 이전에 생성된 cfg, logger 객체와 utils 함수를 사용합니다.

# --- API 요청 재시도 데코레이터 ---
def retry_api_request(max_retries=3, delay=5, allowed_exceptions=(requests.exceptions.RequestException, TimeoutError, ConnectionError)):
    """API 요청 실패 시 재시도하는 데코레이터"""
    def decorator(func):
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    result = func(*args, **kwargs)
                    return result
                except allowed_exceptions as e:
                    last_exception = e
                    logger.warning(f"Attempt {attempt + 1}/{max_retries}: API error in {func.__name__}: {e}")
                    if attempt < max_retries - 1:
                        wait_time = delay * (2 ** attempt) + random.uniform(0, delay)
                        logger.info(f"Retrying {func.__name__} in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"API call {func.__name__} failed after {max_retries} attempts.")
                        return None
                except Exception as e:
                    logger.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error during {func.__name__}: {e}", exc_info=True)
                    last_exception = e
                    if attempt < max_retries - 1:
                        wait_time = delay * (2 ** attempt) + random.uniform(0, delay)
                        logger.info(f"Retrying {func.__name__} in {wait_time:.2f} seconds...")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"API call {func.__name__} failed unexpectedly after {max_retries} attempts.")
                        return None
            return None
        return wrapper
    return decorator

class DataManager:
    def __init__(self, db_path=None):
        self.db_path = db_path if db_path else cfg.DB_PATH
        self.conn = None
        self.engine = None
        self._connect_db()
        self.dart = None
        if cfg.DART_API_KEY and cfg.DART_API_KEY != "YOUR_OPENDART_API_KEY":
            try:
                self.dart = OpenDartReader(cfg.DART_API_KEY)
                logger.info("OpenDARTReader initialized successfully.")
            except Exception as e:
                logger.error(f"Failed to initialize OpenDARTReader: {e}. Financial data fetching disabled.")
                self.dart = None
        else:
            logger.warning("DART API Key not provided or invalid. Financial data fetching disabled.")

    def _connect_db(self):
        """데이터베이스 연결 (SQLite 및 SQLAlchemy 엔진)"""
        try:
            self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
            logger.info(f"SQLite connection established: {self.db_path}")
            self.engine = create_engine(f'sqlite:///{self.db_path}')
            logger.info(f"SQLAlchemy engine created for: {self.db_path}")
            self._create_tables()
        except sqlite3.Error as e:
            logger.critical(f"SQLite connection failed: {e}")
            self.conn = None
        except Exception as e:
             logger.critical(f"SQLAlchemy engine creation failed: {e}")
             self.engine = None
             if self.conn:
                 try: self.conn.close()
                 except: pass
             self.conn = None

    def _create_tables(self):
        """데이터베이스 테이블 생성 (존재하지 않을 경우)"""
        if not self.conn: return
        try:
            with self.conn:
                # 주가 데이터 테이블
                self.conn.execute("""
                CREATE TABLE IF NOT EXISTS stock_prices (
                    date TEXT NOT NULL, ticker TEXT NOT NULL,
                    open REAL, high REAL, low REAL, close REAL,
                    adj_close REAL, volume INTEGER, source TEXT,
                    PRIMARY KEY (date, ticker)
                )""")
                # 재무 데이터 테이블 (Primary Key 주의)
                self.conn.execute("""
                CREATE TABLE IF NOT EXISTS financial_statements (
                    ticker TEXT NOT NULL, year INTEGER NOT NULL, quarter INTEGER NOT NULL,
                    fs_div TEXT, report_code TEXT, account_id TEXT,
                    account_name TEXT NOT NULL, account_value REAL, currency TEXT,
                    PRIMARY KEY (ticker, year, quarter, fs_div, account_name)
                )""")
            logger.info("Database tables checked/created successfully.")
        except sqlite3.Error as e:
            logger.error(f"Failed to create database tables: {e}")

    def _get_last_date_from_db(self, ticker):
        """특정 종목의 DB 내 마지막 데이터 날짜 조회"""
        if not self.conn: return None
        query = "SELECT MAX(date) FROM stock_prices WHERE ticker = ?"
        try:
            cursor = self.conn.cursor()
            cursor.execute(query, (ticker,))
            result = cursor.fetchone()
            last_date = result[0] if result and result[0] else None
            return last_date
        except sqlite3.Error as e:
            logger.error(f"Error fetching last date for {ticker}: {e}")
            return None

    @retry_api_request()
    def _fetch_yf_data(self, ticker, start_date, end_date):
        """yfinance 데이터 로드 함수"""
        logger.debug(f"Fetching yfinance data for {ticker} from {start_date} to {end_date}")
        try:
            end_date_dt = datetime.strptime(end_date, '%Y-%m-%d')
            end_date_yf = (end_date_dt + timedelta(days=1)).strftime('%Y-%m-%d')
        except ValueError:
             logger.error(f"Invalid end_date format for yfinance: {end_date}. Using original.")
             end_date_yf = end_date
        df = yf.download(ticker, start=start_date, end=end_date_yf, progress=False, auto_adjust=False, actions=False)
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        if 'Adj Close' not in df.columns and 'Close' in df.columns:
             df['Adj Close'] = df['Close']
        return df

    @retry_api_request()
    def _fetch_fdr_data(self, ticker, start_date, end_date):
        """FinanceDataReader 데이터 로드 함수"""
        logger.debug(f"Fetching FDR data for {ticker} from {start_date} to {end_date}")
        df = fdr.DataReader(ticker, start=start_date, end=end_date)
        if 'Adj Close' not in df.columns and 'Close' in df.columns:
            df['Adj Close'] = df['Close']
        return df

    @timeit
    def update_stock_prices(self, tickers=None, start_date=None, end_date=None):
        """모든 대상 종목의 주가 데이터를 최신 상태로 업데이트"""
        # (이 메서드의 전체 로직은 생략 없이 모두 포함됩니다 - 이전과 동일)
        if not self.conn or not self.engine:
            logger.error("Database connection/engine is not available. Cannot update stock prices.")
            return
        target_tickers = tickers if tickers else cfg.TARGET_TICKERS
        start_date = start_date if start_date else cfg.DATA_START_DATE
        end_date = end_date if end_date else cfg.DATA_END_DATE
        logger.info(f"Updating stock prices for {len(target_tickers)} tickers ({start_date} to {end_date})...")
        total_rows_added = 0
        failed_tickers = []
        for ticker in target_tickers:
            last_db_date_str = self._get_last_date_from_db(ticker)
            fetch_start_date = start_date
            if last_db_date_str:
                try:
                    last_db_date = datetime.strptime(last_db_date_str, '%Y-%m-%d')
                    fetch_start_date = (last_db_date + timedelta(days=1)).strftime('%Y-%m-%d')
                except ValueError:
                     logger.warning(f"Invalid date format '{last_db_date_str}' in DB for {ticker}. Fetching from {start_date}.")
                     fetch_start_date = start_date
            if fetch_start_date > end_date:
                logger.info(f"Data for {ticker} is already up to date (Last: {last_db_date_str}). Skipping.")
                continue
            logger.debug(f"Fetching data for {ticker} from {fetch_start_date} to {end_date}")
            df = None
            source = None
            try:
                if len(ticker) == 6 and ticker.isdigit():
                    df = self._fetch_fdr_data(ticker, fetch_start_date, end_date)
                    source = 'fdr'
                else:
                    df = self._fetch_yf_data(ticker, fetch_start_date, end_date)
                    source = 'yfinance'
                if df is None or df.empty:
                    logger.warning(f"No data fetched for {ticker} from {fetch_start_date} to {end_date} using {source}.")
                    time.sleep(random.uniform(0.5, 1.5))
                    continue
                df = df.reset_index()
                df['ticker'] = ticker
                df['source'] = source
                rename_map = {'Date': 'date', 'Datetime': 'date', 'index': 'date','Open': 'open','High': 'high','Low': 'low','Close': 'close','Adj Close': 'adj_close','Volume': 'volume'}
                df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns}, inplace=True)
                if 'date' not in df.columns: logger.error(f"Date column missing for {ticker}. Columns: {df.columns}. Skipping."); continue
                df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')
                if 'adj_close' not in df.columns:
                    if 'close' in df.columns: df['adj_close'] = df['close']
                    else: logger.error(f"Both 'adj_close' and 'close' missing for {ticker}. Skipping."); continue
                final_cols = ['date', 'ticker', 'open', 'high', 'low', 'close', 'adj_close', 'volume', 'source']
                missing_cols = [col for col in final_cols if col not in df.columns and col not in ['open','high','low','volume']]
                if missing_cols: logger.error(f"Missing required columns for {ticker}: {missing_cols}. Skipping."); continue
                if not all(c in df.columns for c in ['open','high','low','volume']): logger.warning(f"OHLV columns partially missing for {ticker}.")
                numeric_cols = ['open', 'high', 'low', 'close', 'adj_close', 'volume']
                for col in numeric_cols:
                    if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce')
                df.dropna(subset=[col for col in numeric_cols if col in df.columns], inplace=True)
                df = df.fillna(0)
                df = df[[col for col in final_cols if col in df.columns]]
                df = reduce_mem_usage(df, verbose=False)
                if not df.empty:
                    try:
                        with self.engine.connect() as connection:
                            with connection.begin():
                                df.to_sql('stock_prices', con=connection, if_exists='append', index=False, chunksize=1000, method=lambda table, conn, keys, data_iter: self._pandas_upsert(table, conn, keys, data_iter, pkey=['date', 'ticker']))
                        rows_added = len(df)
                        total_rows_added += rows_added
                        logger.debug(f"Upserted {rows_added} rows for {ticker} into stock_prices.")
                    except sqlite3.Error as e_sql: logger.error(f"Failed to save stock price data for {ticker} to DB: {e_sql}"); failed_tickers.append(ticker)
                    except Exception as e_exc: logger.error(f"Unexpected error saving stock price data for {ticker}: {e_exc}", exc_info=True); failed_tickers.append(ticker)
                else: logger.debug(f"No valid data rows to save for {ticker} after cleaning.")
                time.sleep(random.uniform(0.5, 1.5))
            except Exception as e:
                logger.error(f"Failed processing/fetching data for {ticker}: {e}", exc_info=True)
                failed_tickers.append(ticker)
                time.sleep(random.uniform(1.0, 2.0))
                continue
        logger.info(f"Stock price update completed. Total new/updated rows: {total_rows_added}")
        if failed_tickers: logger.warning(f"Failed to process/update data for {len(failed_tickers)} tickers: {failed_tickers}")

    def _pandas_upsert(self, table, conn, keys, data_iter, pkey):
        """pandas to_sql 'append' 시 SQLite에서 Upsert (INSERT OR REPLACE) 를 수행하는 메서드."""
        # (이 메서드의 전체 로직은 생략 없이 모두 포함됩니다 - 이전과 동일)
        dbapi_conn = conn.connection
        cursor = dbapi_conn.cursor()
        cols = ', '.join(f'"{k}"' for k in keys)
        placeholders = ', '.join('?' for _ in keys)
        sql = f'INSERT OR REPLACE INTO "{table.name}" ({cols}) VALUES ({placeholders})'
        try:
            cursor.executemany(sql, data_iter)
        except sqlite3.Error as e:
            logger.error(f"Error during upsert executemany for table {table.name}: {e}")
            raise

    @retry_api_request(allowed_exceptions=(requests.exceptions.RequestException, TimeoutError, Exception))
    def _fetch_dart_finstate(self, ticker, year, report_code, fs_div): # 인자는 유지, 실제 API 호출엔 year만 사용
        """DART 재무제표 조회 함수 (finstate_all 사용)"""
        # (이 메서드의 전체 로직은 생략 없이 모두 포함됩니다 - 이전과 동일)
        if not self.dart: return None
        logger.debug(f"Fetching DART finstate_all: ticker={ticker}, year={year} (ignoring report_code/fs_div for API call)")
        try:
            df = self.dart.finstate_all(str(ticker), int(year)) # year는 정수로 변환
            if df is None: return pd.DataFrame()
            return df
        except Exception as e:
            if isinstance(e, TypeError) and "not supported between instances of 'str' and 'int'" in str(e):
                 logger.error(f"TypeError in dart.finstate_all(ticker={ticker}, year={year}). Check year type conversion. Error: {e}", exc_info=True)
            else:
                 logger.error(f"Error calling dart.finstate_all(ticker={ticker}, year={year}): {e}", exc_info=True)
            return pd.DataFrame()

    # ===================================================================
    # === update_financials 메서드 (연간 데이터 처리 방식으로 수정됨) ===
    # ===================================================================
    @timeit
    def update_financials(self, tickers=None, start_year=None, end_year=None):
        """한국 기업의 **연간** 재무 데이터를 최신 상태로 업데이트합니다."""
        if not self.conn or not self.engine or not self.dart:
            logger.warning("DB connection/engine or DART API not available. Skipping annual financial update.")
            return 0

        target_tickers = tickers if tickers else cfg.KR_TICKERS
        kr_tickers_to_update = [t for t in target_tickers if len(t) == 6 and t.isdigit()]
        if not kr_tickers_to_update:
            logger.info("No valid KR tickers found to update annual financials.")
            return 0

        start_y = start_year if start_year else int(cfg.DATA_START_DATE[:4])
        end_y = end_year if end_year else datetime.now().year

        logger.info(f"Updating **annual** financial statements for {len(kr_tickers_to_update)} KR tickers ({start_y} to {end_y})...")
        # --- !! total_rows_added 초기화 위치 !! ---
        total_rows_added = 0
        failed_fetches = []

        annual_report_code = '11011'
        annual_quarter_value = 4 # 연간 데이터를 대표하는 분기 값

        for ticker in kr_tickers_to_update:
            for year in range(start_y, end_y + 1):
                # --- !! 아래 logger.debug 라인이 연도 루프 시작과 같은 들여쓰기 레벨 !! ---
                logger.debug(f"Processing **annual** data for {ticker}, Year: {year}")
                processed_fs = False # 해당 연도 처리 성공 플래그

                raw_df = None
                try:
                    # --- 데이터 조회 (연도별로 한 번만) ---
                    raw_df = self._fetch_dart_finstate(ticker, year, annual_report_code, 'CFS') # fs_div는 형식상

                    if raw_df is None or raw_df.empty:
                         logger.warning(f"No data returned from _fetch_dart_finstate for {ticker} Y{year}.")
                         if (ticker, year) not in failed_fetches: failed_fetches.append((ticker, year))
                         # 연도 루프 마지막에 time.sleep 이 있으므로 여기서는 continue
                         continue

                    # --- 디버깅 출력 ---
                    # print(f"--- Raw DataFrame from finstate_all for {ticker} Y{year} (before filtering/processing) ---")
                    # print(raw_df.head())
                    # print(f"Columns: {raw_df.columns.tolist()}")
                    # print("---------------------------------------------------------------------")

                    # --- CFS, OFS 순으로 처리 시도 ---
                    for fs_div_val in ['CFS', 'OFS']:
                        logger.debug(f"Attempting to filter and process {fs_div_val} data for {ticker} Y{year}")

                        # --- 연간 보고서 데이터 필터링 ('reprt_code' 기준) ---
                        if 'reprt_code' in raw_df.columns:
                            fs_df = raw_df[raw_df['reprt_code'] == annual_report_code].copy()
                            # logger.debug(f"Filtered {len(fs_df)} rows for report_code={annual_report_code}")
                        else:
                            fs_df = pd.DataFrame() # 필터링 불가 시 빈 프레임

                        # === 데이터 처리 (인라인) ===
                        if not fs_df.empty:
                            logger.info(f"Annual data filtered for {ticker} Y{year} (Processing as {fs_div_val})...")

                            # 컬럼 추가/변경 (기존 로직 재활용, quarter/report_code 고정)
                            fs_df['ticker'] = ticker
                            fs_df['year'] = year
                            fs_df['quarter'] = annual_quarter_value # 연간 대표값
                            fs_df['fs_div'] = fs_div_val # CFS 또는 OFS
                            fs_df['report_code'] = annual_report_code # '11011'
                            fs_df['currency'] = 'KRW'

                            rename_fin_map = {
                                'corp_code': 'corp_code_ignore', 'bsns_year': 'bsns_year_ignore',
                                'reprt_code': 'report_code_ignore', 'fs_div': 'fs_div_ignore',
                                'sj_div': 'statement_type', 'account_id': 'account_id',
                                'account_nm': 'account_name', 'thstrm_amount': 'account_value'
                            }
                            fs_df.rename(columns={k: v for k, v in rename_fin_map.items() if k in fs_df.columns}, inplace=True)

                            # 값 정리
                            if 'account_value' in fs_df.columns:
                                fs_df['account_value'] = fs_df['account_value'].astype(str).str.replace(',', '', regex=False).replace('', '0', regex=False)
                                fs_df['account_value'] = pd.to_numeric(fs_df['account_value'], errors='coerce')
                            else:
                                logger.warning(f"'account_value' not found for {ticker} Y{year} ({fs_div_val}). Skip.")
                                continue # 현재 fs_div 시도 중단

                            # 최종 컬럼 선택 및 NaN 처리
                            final_fin_cols = ['ticker', 'year', 'quarter', 'fs_div', 'report_code', 'account_id', 'account_name', 'account_value', 'currency']
                            if 'account_id' not in fs_df.columns: fs_df['account_id'] = None
                            if 'account_name' not in fs_df.columns:
                                 logger.warning(f"'account_name' missing for {ticker} Y{year} ({fs_div_val}). Skip.")
                                 continue
                            fs_df = fs_df[[col for col in final_fin_cols if col in fs_df.columns]]
                            fs_df.dropna(subset=['account_value'])

                            # === 데이터 저장 (인라인) ===
                            if not fs_df.empty:
                                try:
                                    with self.engine.connect() as connection:
                                        with connection.begin():
                                            fs_df.to_sql(
                                                'financial_statements', con=connection, if_exists='append', index=False, chunksize=1000,
                                                # !! DB 기본키와 이 pkey 리스트가 일치해야 함 (quarter=4 포함) !!
                                                method=lambda table, conn, keys, data_iter: self._pandas_upsert(table, conn, keys, data_iter, pkey=['ticker', 'year', 'quarter', 'fs_div', 'account_name'])
                                            )
                                    rows_added = len(fs_df)
                                    total_rows_added += rows_added # <- total_rows_added 업데이트
                                    logger.info(f"Upserted {rows_added} annual financial rows for {ticker} Y{year} ({fs_div_val}).")
                                    processed_fs = True # 성공 플래그
                                    break # CFS/OFS 중 하나 성공 시 내부 루프 탈출

                                except sqlite3.Error as e_sql: logger.error(f"DB Error saving for {ticker} Y{year} ({fs_div_val}): {e_sql}")
                                except Exception as e_exc: logger.error(f"Unexpected Error saving for {ticker} Y{year} ({fs_div_val}): {e_exc}", exc_info=True)
                            else: logger.debug(f"No valid rows to save for {ticker} Y{year} ({fs_div_val}).")
                        else: logger.debug(f"No rows after filtering for {ticker} Y{year} (attempting {fs_div_val}).")

                        # CFS 처리 성공 시 OFS 시도 안 함
                        if processed_fs:
                            break

                except Exception as e: # _fetch_dart_finstate 또는 필터링/처리 중 예외
                    logger.error(f"Outer error processing DART data for {ticker} Y{year}: {e}", exc_info=True)
                    if (ticker, year) not in failed_fetches: failed_fetches.append((ticker, year))
                    # 오류 발생 시 현재 연도 처리는 중단하고 다음 연도로 (아래 time.sleep 후)

                # --- 연도 루프 끝 부분 ---
                if not processed_fs: # CFS, OFS 모두 시도 후에도 성공 못한 경우
                    logger.warning(f"No annual financial data processed/saved for {ticker} Y{year} after trying CFS/OFS.")
                    if (ticker, year) not in failed_fetches:
                        failed_fetches.append((ticker, year))

                # 연도별 루프 API 호출 간격 (오류 발생 여부와 관계없이 실행)
                time.sleep(random.uniform(0.8, 1.8))
            # --- 연도 루프 종료 ---
        # --- 티커 루프 종료 ---

        # ===================================================================
        # === !!! 중요: 아래 로그 라인들의 들여쓰기 확인 !!! ===
        # === 이 라인들은 for ticker 루프 종료 후, 메서드 종료 전에 위치해야 함 ===
        # ===================================================================
        logger.info(f"**Annual** financial statement update completed. Total new/updated rows: {total_rows_added}")
        if failed_fetches:
            logger.warning(f"Failed to fetch/process **annual** financial data for {len(failed_fetches)} ticker-years: {failed_fetches}")

        return total_rows_added # 처리된 행 수 반환
    # ===================================================================
    # === update_financials 메서드 정의 끝 ===
    # ===================================================================

    def get_data(self, table_name, tickers=None, start_date=None, end_date=None, start_year=None, end_year=None, other_conditions=""):
        """DB에서 데이터를 조회하여 DataFrame으로 반환"""
        # (이 메서드의 전체 로직은 생략 없이 모두 포함됩니다 - 이전과 동일)
        if not self.conn: logger.error("Database connection is not available."); return pd.DataFrame()
        query = f"SELECT * FROM {table_name}"
        conditions = []; params = []
        if tickers:
            if isinstance(tickers, str): tickers = [tickers]
            if len(tickers) > 0: placeholders = ', '.join('?' for _ in tickers); conditions.append(f"ticker IN ({placeholders})"); params.extend(tickers)
        if table_name == 'stock_prices':
            if start_date: conditions.append("date >= ?"); params.append(start_date)
            if end_date: conditions.append("date <= ?"); params.append(end_date)
        elif table_name == 'financial_statements':
            yr_start = start_year if start_year else (int(start_date[:4]) if start_date else None)
            yr_end = end_year if end_year else (int(end_date[:4]) if end_date else None)
            if yr_start: conditions.append("year >= ?"); params.append(yr_start)
            if yr_end: conditions.append("year <= ?"); params.append(yr_end)
        if other_conditions: conditions.append(other_conditions)
        if conditions: query += " WHERE " + " AND ".join(conditions)
        if table_name == 'stock_prices': query += " ORDER BY ticker ASC, date ASC"
        elif table_name == 'financial_statements': query += " ORDER BY ticker ASC, year ASC, quarter ASC, fs_div ASC"
        logger.debug(f"Executing SQL query: {query} with params: {params}")
        try:
            df = pd.read_sql_query(query, self.conn, params=params)
            logger.info(f"Loaded {len(df)} rows from table '{table_name}' for tickers: {tickers}")
            if not df.empty:
                if 'date' in df.columns and table_name == 'stock_prices': df['date'] = pd.to_datetime(df['date']); df = df.set_index('date')
                df = reduce_mem_usage(df)
            return df
        except Exception as e:
            logger.error(f"Failed to load data from table '{table_name}': {e}", exc_info=True)
            return pd.DataFrame()

    def close_connection(self):
        """DB 연결 종료"""
        # (이 메서드의 전체 로직은 생략 없이 모두 포함됩니다 - 이전과 동일)
        if self.conn:
            try: self.conn.close(); logger.info("SQLite connection closed.")
            except Exception as e: logger.error(f"Error closing SQLite connection: {e}")
            finally: self.conn = None
        if self.engine:
            try: self.engine.dispose(); logger.info("SQLAlchemy engine disposed.")
            except Exception as e: logger.error(f"Error disposing SQLAlchemy engine: {e}")
            finally: self.engine = None

print("DataManager class defined.")

DataManager class defined.


In [18]:
# 셀 6: 데이터 로딩/업데이트 실행 (Optional)

# 이 셀을 실행하면 설정된 기간과 티커에 대해 데이터 다운로드 및 저장을 시작합니다.
# 시간이 오래 걸릴 수 있으며, API 요청 제한에 유의해야 합니다.

logger.info("--- 데이터 로딩/업데이트 시작 ---")
# DataManager 인스턴스 생성 (기존 인스턴스 사용 또는 새로 생성)
# dm 변수가 이전 셀에서 정의되었는지 확인
try:
    if 'dm' not in locals() or dm is None or dm.conn is None:
         logger.info("DataManager 인스턴스를 새로 생성합니다.")
         dm = DataManager()
    else:
         logger.info("기존 DataManager 인스턴스를 사용합니다.")

    # DB 연결 확인
    if dm.conn and dm.engine:
        # 1. 주가 데이터 업데이트 (전체 기간 업데이트)
        dm.update_stock_prices(tickers=cfg.TARGET_TICKERS, start_date=cfg.DATA_START_DATE, end_date=cfg.DATA_END_DATE)

        # 2. 재무 데이터 업데이트 (한국 주식 대상)
        kr_tickers_to_update = [t for t in cfg.TARGET_TICKERS if len(t) == 6 and t.isdigit()]
        if cfg.DART_API_KEY and cfg.DART_API_KEY != "YOUR_OPENDART_API_KEY" and kr_tickers_to_update:
             start_y = int(cfg.DATA_START_DATE[:4])
             end_y = int(cfg.DATA_END_DATE[:4])
             dm.update_financials(tickers=kr_tickers_to_update, start_year=start_y, end_year=end_y)
        else:
             logger.warning("재무 데이터 업데이트 건너뜀 (API 키 부재 또는 한국 주식 없음).")

        # 3. 데이터 조회 예시 (결과 확인용)
        logger.info("--- DB에서 샘플 데이터 로드 ---")
        sample_ticker = cfg.TARGET_TICKERS[0] if cfg.TARGET_TICKERS else None
        if sample_ticker:
             # 최근 1년치 데이터 조회
             end_dt_str = cfg.DATA_END_DATE
             start_dt_str = (datetime.strptime(end_dt_str, '%Y-%m-%d') - timedelta(days=365)).strftime('%Y-%m-%d')

             stock_data_sample = dm.get_data('stock_prices', tickers=sample_ticker, start_date=start_dt_str, end_date=end_dt_str)
             if not stock_data_sample.empty:
                 print(f"\n{sample_ticker} 주가 데이터 샘플 (최근 5일):")
                 print(stock_data_sample.tail())
             else:
                 print(f"\n{sample_ticker} 주가 데이터 샘플을 로드하지 못했습니다.")

             if sample_ticker in kr_tickers_to_update:
                  fin_data_sample = dm.get_data('financial_statements', tickers=sample_ticker, start_year=end_y-1, end_year=end_y) # 최근 2년치
                  if not fin_data_sample.empty:
                      print(f"\n{sample_ticker} 재무 데이터 샘플 (최근 항목):")
                      print(fin_data_sample.tail())
                  else:
                      print(f"\n{sample_ticker} 재무 데이터 샘플을 로드하지 못했습니다.")

        # 데이터베이스 연결 종료 (필요 시)
        # dm.close_connection()
        # logger.info("데이터 로딩/업데이트 완료 후 DB 연결이 유지됩니다 (다른 셀에서 사용 가능).")

    else:
        logger.error("DataManager 초기화 실패. 데이터 로딩/업데이트를 진행할 수 없습니다.")

except NameError as ne:
     logger.error(f"필요한 객체(dm, cfg, logger 등)가 정의되지 않았습니다: {ne}")
except Exception as e:
     logger.error(f"데이터 로딩/업데이트 중 오류 발생: {e}", exc_info=True)

logger.info("--- 데이터 로딩/업데이트 프로세스 종료 ---")

2025-04-29 07:23:19,102 - AI_Trading_System_Free - INFO - --- 데이터 로딩/업데이트 시작 ---
INFO:AI_Trading_System_Free:--- 데이터 로딩/업데이트 시작 ---
2025-04-29 07:23:19,106 - AI_Trading_System_Free - INFO - 기존 DataManager 인스턴스를 사용합니다.
INFO:AI_Trading_System_Free:기존 DataManager 인스턴스를 사용합니다.
2025-04-29 07:23:19,109 - AI_Trading_System_Free - INFO - Updating stock prices for 10 tickers (2018-01-01 to 2025-04-29)...
INFO:AI_Trading_System_Free:Updating stock prices for 10 tickers (2018-01-01 to 2025-04-29)...
2025-04-29 07:23:24,221 - AI_Trading_System_Free - INFO - Data for 005930 is already up to date (Last: 2025-04-29). Skipping.
INFO:AI_Trading_System_Free:Data for 005930 is already up to date (Last: 2025-04-29). Skipping.
2025-04-29 07:23:24,224 - AI_Trading_System_Free - INFO - Data for 000660 is already up to date (Last: 2025-04-29). Skipping.
INFO:AI_Trading_System_Free:Data for 000660 is already up to date (Last: 2025-04-29). Skipping.
2025-04-29 07:23:24,227 - AI_Trading_System_Free - INFO - Data

reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:23:31,132 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2018 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2018 (Processing as CFS)...
2025-04-29 07:23:31,205 - AI_Trading_System_Free - INFO - Upserted 218 annual financial rows for 005930 Y2018 (CFS).
INFO:AI_Trading_System_Free:Upserted 218 annual financial rows for 005930 Y2018 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:23:39,383 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2019 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2019 (Processing as CFS)...
2025-04-29 07:23:39,431 - AI_Trading_System_Free - INFO - Upserted 210 annual financial rows for 005930 Y2019 (CFS).
INFO:AI_Trading_System_Free:Upserted 210 annual financial rows for 005930 Y2019 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:23:47,861 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2020 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2020 (Processing as CFS)...
2025-04-29 07:23:47,911 - AI_Trading_System_Free - INFO - Upserted 209 annual financial rows for 005930 Y2020 (CFS).
INFO:AI_Trading_System_Free:Upserted 209 annual financial rows for 005930 Y2020 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:23:55,129 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2021 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2021 (Processing as CFS)...
2025-04-29 07:23:55,235 - AI_Trading_System_Free - INFO - Upserted 186 annual financial rows for 005930 Y2021 (CFS).
INFO:AI_Trading_System_Free:Upserted 186 annual financial rows for 005930 Y2021 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:02,469 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2022 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2022 (Processing as CFS)...
2025-04-29 07:24:02,514 - AI_Trading_System_Free - INFO - Upserted 185 annual financial rows for 005930 Y2022 (CFS).
INFO:AI_Trading_System_Free:Upserted 185 annual financial rows for 005930 Y2022 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:09,403 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2023 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2023 (Processing as CFS)...
2025-04-29 07:24:09,444 - AI_Trading_System_Free - INFO - Upserted 176 annual financial rows for 005930 Y2023 (CFS).
INFO:AI_Trading_System_Free:Upserted 176 annual financial rows for 005930 Y2023 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:17,544 - AI_Trading_System_Free - INFO - Annual data filtered for 005930 Y2024 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005930 Y2024 (Processing as CFS)...
2025-04-29 07:24:17,588 - AI_Trading_System_Free - INFO - Upserted 213 annual financial rows for 005930 Y2024 (CFS).
INFO:AI_Trading_System_Free:Upserted 213 annual financial rows for 005930 Y2024 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'




{'status': '013', 'message': '조회된 데이타가 없습니다.'}

reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:24,757 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2018 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2018 (Processing as CFS)...
2025-04-29 07:24:24,801 - AI_Trading_System_Free - INFO - Upserted 152 annual financial rows for 000660 Y2018 (CFS).
INFO:AI_Trading_System_Free:Upserted 152 annual financial rows for 000660 Y2018 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:30,988 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2019 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2019 (Processing as CFS)...
2025-04-29 07:24:31,068 - AI_Trading_System_Free - INFO - Upserted 162 annual financial rows for 000660 Y2019 (CFS).
INFO:AI_Trading_System_Free:Upserted 162 annual financial rows for 000660 Y2019 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:37,207 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2020 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2020 (Processing as CFS)...
2025-04-29 07:24:37,253 - AI_Trading_System_Free - INFO - Upserted 158 annual financial rows for 000660 Y2020 (CFS).
INFO:AI_Trading_System_Free:Upserted 158 annual financial rows for 000660 Y2020 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:43,917 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2021 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2021 (Processing as CFS)...
2025-04-29 07:24:43,970 - AI_Trading_System_Free - INFO - Upserted 161 annual financial rows for 000660 Y2021 (CFS).
INFO:AI_Trading_System_Free:Upserted 161 annual financial rows for 000660 Y2021 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:50,651 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2022 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2022 (Processing as CFS)...
2025-04-29 07:24:50,696 - AI_Trading_System_Free - INFO - Upserted 165 annual financial rows for 000660 Y2022 (CFS).
INFO:AI_Trading_System_Free:Upserted 165 annual financial rows for 000660 Y2022 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:24:57,418 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2023 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2023 (Processing as CFS)...
2025-04-29 07:24:57,463 - AI_Trading_System_Free - INFO - Upserted 171 annual financial rows for 000660 Y2023 (CFS).
INFO:AI_Trading_System_Free:Upserted 171 annual financial rows for 000660 Y2023 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:06,818 - AI_Trading_System_Free - INFO - Annual data filtered for 000660 Y2024 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 000660 Y2024 (Processing as CFS)...
2025-04-29 07:25:06,926 - AI_Trading_System_Free - INFO - Upserted 236 annual financial rows for 000660 Y2024 (CFS).
INFO:AI_Trading_System_Free:Upserted 236 annual financial rows for 000660 Y2024 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'




{'status': '013', 'message': '조회된 데이타가 없습니다.'}

reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:15,195 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2018 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2018 (Processing as CFS)...
2025-04-29 07:25:15,243 - AI_Trading_System_Free - INFO - Upserted 181 annual financial rows for 035720 Y2018 (CFS).
INFO:AI_Trading_System_Free:Upserted 181 annual financial rows for 035720 Y2018 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:22,229 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2019 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2019 (Processing as CFS)...
2025-04-29 07:25:22,268 - AI_Trading_System_Free - INFO - Upserted 193 annual financial rows for 035720 Y2019 (CFS).
INFO:AI_Trading_System_Free:Upserted 193 annual financial rows for 035720 Y2019 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:31,929 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2020 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2020 (Processing as CFS)...
2025-04-29 07:25:31,981 - AI_Trading_System_Free - INFO - Upserted 250 annual financial rows for 035720 Y2020 (CFS).
INFO:AI_Trading_System_Free:Upserted 250 annual financial rows for 035720 Y2020 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:41,006 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2021 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2021 (Processing as CFS)...
2025-04-29 07:25:41,058 - AI_Trading_System_Free - INFO - Upserted 234 annual financial rows for 035720 Y2021 (CFS).
INFO:AI_Trading_System_Free:Upserted 234 annual financial rows for 035720 Y2021 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:25:50,562 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2022 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2022 (Processing as CFS)...
2025-04-29 07:25:50,604 - AI_Trading_System_Free - INFO - Upserted 265 annual financial rows for 035720 Y2022 (CFS).
INFO:AI_Trading_System_Free:Upserted 265 annual financial rows for 035720 Y2022 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:01,812 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2023 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2023 (Processing as CFS)...
2025-04-29 07:26:01,859 - AI_Trading_System_Free - INFO - Upserted 314 annual financial rows for 035720 Y2023 (CFS).
INFO:AI_Trading_System_Free:Upserted 314 annual financial rows for 035720 Y2023 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:11,323 - AI_Trading_System_Free - INFO - Annual data filtered for 035720 Y2024 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 035720 Y2024 (Processing as CFS)...
2025-04-29 07:26:11,368 - AI_Trading_System_Free - INFO - Upserted 231 annual financial rows for 035720 Y2024 (CFS).
INFO:AI_Trading_System_Free:Upserted 231 annual financial rows for 035720 Y2024 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'




{'status': '013', 'message': '조회된 데이타가 없습니다.'}

reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:19,645 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2018 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2018 (Processing as CFS)...
2025-04-29 07:26:19,693 - AI_Trading_System_Free - INFO - Upserted 205 annual financial rows for 051910 Y2018 (CFS).
INFO:AI_Trading_System_Free:Upserted 205 annual financial rows for 051910 Y2018 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:27,006 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2019 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2019 (Processing as CFS)...
2025-04-29 07:26:27,053 - AI_Trading_System_Free - INFO - Upserted 201 annual financial rows for 051910 Y2019 (CFS).
INFO:AI_Trading_System_Free:Upserted 201 annual financial rows for 051910 Y2019 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:34,134 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2020 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2020 (Processing as CFS)...
2025-04-29 07:26:34,179 - AI_Trading_System_Free - INFO - Upserted 193 annual financial rows for 051910 Y2020 (CFS).
INFO:AI_Trading_System_Free:Upserted 193 annual financial rows for 051910 Y2020 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:42,281 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2021 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2021 (Processing as CFS)...
2025-04-29 07:26:42,329 - AI_Trading_System_Free - INFO - Upserted 201 annual financial rows for 051910 Y2021 (CFS).
INFO:AI_Trading_System_Free:Upserted 201 annual financial rows for 051910 Y2021 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:26:50,122 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2022 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2022 (Processing as CFS)...
2025-04-29 07:26:50,192 - AI_Trading_System_Free - INFO - Upserted 202 annual financial rows for 051910 Y2022 (CFS).
INFO:AI_Trading_System_Free:Upserted 202 annual financial rows for 051910 Y2022 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:00,493 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2023 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2023 (Processing as CFS)...
2025-04-29 07:27:00,537 - AI_Trading_System_Free - INFO - Upserted 259 annual financial rows for 051910 Y2023 (CFS).
INFO:AI_Trading_System_Free:Upserted 259 annual financial rows for 051910 Y2023 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:09,274 - AI_Trading_System_Free - INFO - Annual data filtered for 051910 Y2024 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 051910 Y2024 (Processing as CFS)...
2025-04-29 07:27:09,338 - AI_Trading_System_Free - INFO - Upserted 213 annual financial rows for 051910 Y2024 (CFS).
INFO:AI_Trading_System_Free:Upserted 213 annual financial rows for 051910 Y2024 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'




{'status': '013', 'message': '조회된 데이타가 없습니다.'}

reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:19,037 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2018 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2018 (Processing as CFS)...
2025-04-29 07:27:19,078 - AI_Trading_System_Free - INFO - Upserted 240 annual financial rows for 005380 Y2018 (CFS).
INFO:AI_Trading_System_Free:Upserted 240 annual financial rows for 005380 Y2018 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:28,409 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2019 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2019 (Processing as CFS)...
2025-04-29 07:27:28,485 - AI_Trading_System_Free - INFO - Upserted 233 annual financial rows for 005380 Y2019 (CFS).
INFO:AI_Trading_System_Free:Upserted 233 annual financial rows for 005380 Y2019 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:36,930 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2020 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2020 (Processing as CFS)...
2025-04-29 07:27:36,971 - AI_Trading_System_Free - INFO - Upserted 228 annual financial rows for 005380 Y2020 (CFS).
INFO:AI_Trading_System_Free:Upserted 228 annual financial rows for 005380 Y2020 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:45,941 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2021 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2021 (Processing as CFS)...
2025-04-29 07:27:45,993 - AI_Trading_System_Free - INFO - Upserted 218 annual financial rows for 005380 Y2021 (CFS).
INFO:AI_Trading_System_Free:Upserted 218 annual financial rows for 005380 Y2021 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:27:54,778 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2022 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2022 (Processing as CFS)...
2025-04-29 07:27:54,818 - AI_Trading_System_Free - INFO - Upserted 218 annual financial rows for 005380 Y2022 (CFS).
INFO:AI_Trading_System_Free:Upserted 218 annual financial rows for 005380 Y2022 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:28:03,684 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2023 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2023 (Processing as CFS)...
2025-04-29 07:28:03,842 - AI_Trading_System_Free - INFO - Upserted 228 annual financial rows for 005380 Y2023 (CFS).
INFO:AI_Trading_System_Free:Upserted 228 annual financial rows for 005380 Y2023 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:28:16,035 - AI_Trading_System_Free - INFO - Annual data filtered for 005380 Y2024 (Processing as CFS)...
INFO:AI_Trading_System_Free:Annual data filtered for 005380 Y2024 (Processing as CFS)...
2025-04-29 07:28:16,112 - AI_Trading_System_Free - INFO - Upserted 320 annual financial rows for 005380 Y2024 (CFS).
INFO:AI_Trading_System_Free:Upserted 320 annual financial rows for 005380 Y2024 (CFS).


reprt_code='11011', fs_div='CFS' (사업보고서, 연결제무제표)'


2025-04-29 07:28:18,288 - AI_Trading_System_Free - INFO - **Annual** financial statement update completed. Total new/updated rows: 7429
INFO:AI_Trading_System_Free:**Annual** financial statement update completed. Total new/updated rows: 7429
2025-04-29 07:28:18,293 - AI_Trading_System_Free - INFO - --- DB에서 샘플 데이터 로드 ---
INFO:AI_Trading_System_Free:--- DB에서 샘플 데이터 로드 ---
2025-04-29 07:28:18,309 - AI_Trading_System_Free - INFO - Loaded 250 rows from table 'stock_prices' for tickers: ['AAPL']
INFO:AI_Trading_System_Free:Loaded 250 rows from table 'stock_prices' for tickers: ['AAPL']
2025-04-29 07:28:18,321 - AI_Trading_System_Free - INFO - Memory usage decreased from 0.02 MB to 0.01 MB (33.3% reduction)
INFO:AI_Trading_System_Free:Memory usage decreased from 0.02 MB to 0.01 MB (33.3% reduction)
2025-04-29 07:28:18,330 - AI_Trading_System_Free - INFO - --- 데이터 로딩/업데이트 프로세스 종료 ---
INFO:AI_Trading_System_Free:--- 데이터 로딩/업데이트 프로세스 종료 ---


{'status': '013', 'message': '조회된 데이타가 없습니다.'}


AAPL 주가 데이터 샘플 (최근 5일):
           ticker        open        high         low       close   adj_close  \
date                                                                            
2025-04-22   AAPL  196.119995  201.589996  195.970001  199.740005  199.740005   
2025-04-23   AAPL  206.000000  208.000000  202.800003  204.600006  204.600006   
2025-04-24   AAPL  204.889999  208.830002  202.940002  208.369995  208.369995   
2025-04-25   AAPL  206.369995  209.750000  206.199997  209.279999  209.279999   
2025-04-28   AAPL  210.059998  211.500000  207.460007  210.139999  210.139999   

              volume    source  
date                            
2025-04-22  52976400  yfinance  
2025-04-23  52929200  yfinance  
2025-04-24  47311000  yfinance  
2025-04-25  38198600  yfinance  
2025-04-28  37235797  yfinance  


In [19]:
# feature_engineer.py
import pandas as pd
import numpy as np


# TA-Lib 설치 확인 및 임포트 시도
try:
    import talib
    logger.info("TA-Lib library imported successfully.") #
except ImportError:
    logger.critical("TA-Lib not found. Please install it to use technical indicators. Feature engineering capabilities will be limited.") #
    logger.critical("You might need to install TA-Lib dependencies first. See TA-Lib documentation.") #
    # TA-Lib 없이 실행될 수 있도록 talib 변수를 None으로 설정 (선택적)
    talib = None #

class FeatureEngineer:
    """
    주가 데이터로부터 다양한 특징(Feature)을 추출하는 클래스.
    """
    def __init__(self,
                 use_technical_indicators=cfg.USE_TECHNICAL_INDICATORS, #
                 technical_indicator_list=cfg.TECHNICAL_INDICATORS, #
                 use_fundamental_data=cfg.USE_FUNDAMENTAL_DATA, #
                 use_sentiment_data=cfg.USE_SENTIMENT_DATA): #
        """
        FeatureEngineer 초기화

        Args:
            use_technical_indicators (bool): 기술적 지표 사용 여부.
            technical_indicator_list (list): 사용할 기술적 지표 이름 목록.
            use_fundamental_data (bool): 재무 데이터 사용 여부 (추후 구현).
            use_sentiment_data (bool): 감성 데이터 사용 여부 (추후 구현).
        """
        if use_technical_indicators and talib is None: #
            logger.warning("TA-Lib is not available, disabling technical indicators.") #
            self.use_technical_indicators = False #
        else:
            self.use_technical_indicators = use_technical_indicators #
        self.technical_indicator_list = technical_indicator_list #
        self.use_fundamental_data = use_fundamental_data #
        self.use_sentiment_data = use_sentiment_data #
        logger.info(f"FeatureEngineer initialized with settings: " #
                    f"Tech={self.use_technical_indicators}, Funda={self.use_fundamental_data}, " #
                    f"Sent={self.use_sentiment_data}") #
        if self.use_technical_indicators: #
             logger.info(f"Using technical indicators: {self.technical_indicator_list}") #

    @timeit #
    def add_technical_indicators(self, df): #
        """
        주어진 DataFrame에 기술적 지표를 계산하여 추가합니다.

        Args:
            df (pd.DataFrame): 'date'를 인덱스로 하고 'open', 'high', 'low', 'close', 'volume' 컬럼을 포함하는 DataFrame.

        Returns:
            pd.DataFrame: 기술적 지표가 추가된 DataFrame. 초기 NaN 값은 제거될 수 있습니다.
        """
        if not self.use_technical_indicators or talib is None: #
            logger.debug("Skipping technical indicator calculation.") #
            return df #

        logger.debug(f"Adding technical indicators to DataFrame with shape {df.shape}...") #
        df_out = df.copy() #

        # TA-Lib 입력 형식 확인 (필요시 타입 변환)
        required_cols = ['open', 'high', 'low', 'close', 'volume'] #
        for col in required_cols: #
            if col not in df_out.columns: #
                logger.error(f"Required column '{col}' not found in DataFrame. Cannot calculate indicators.") #
                return df # 원본 반환 #
            # 실수형(float)으로 변환 (talib는 float 타입 입력 선호)
            # Explicitly cast to float64
            df_out[col] = pd.to_numeric(df_out[col], errors='coerce').astype('float64') #

            # Impute missing values using the mean for numerical columns
            if df_out[col].isnull().any(): #
                df_out[col] = df_out[col].fillna(df_out[col].mean()) #


        # 필수 컬럼 NaNs 드롭 (talib 계산 불가 방지)
        # Dropping rows with NaNs AFTER type conversion and imputation
        df_out.dropna(subset=required_cols, inplace=True) #
        if df_out.empty: #
             logger.warning("DataFrame became empty after dropping NaNs in required columns.") #
             return df_out #

        # 지표 계산 (config에 정의된 리스트 기반)
        # --- 이동 평균 (MA) ---
        if 'MA5' in self.technical_indicator_list: #
            try: df_out['MA5'] = talib.MA(df_out['close'], timeperiod=5) #
            except Exception as e: logger.warning(f"Could not calculate MA5: {e}") #
        if 'MA10' in self.technical_indicator_list: # 예시 추가 #
            try: df_out['MA10'] = talib.MA(df_out['close'], timeperiod=10) #
            except Exception as e: logger.warning(f"Could not calculate MA10: {e}") #
        if 'MA20' in self.technical_indicator_list: #
            try: df_out['MA20'] = talib.MA(df_out['close'], timeperiod=20) #
            except Exception as e: logger.warning(f"Could not calculate MA20: {e}") #
        if 'MA60' in self.technical_indicator_list: #
            try: df_out['MA60'] = talib.MA(df_out['close'], timeperiod=60) #
            except Exception as e: logger.warning(f"Could not calculate MA60: {e}") #

        # --- RSI ---
        if 'RSI14' in self.technical_indicator_list: #
            try: df_out['RSI14'] = talib.RSI(df_out['close'], timeperiod=14) #
            except Exception as e: logger.warning(f"Could not calculate RSI14: {e}") #

        # --- MACD ---
        if 'MACD' in self.technical_indicator_list: #
            try:
                macd, macdsignal, macdhist = talib.MACD(df_out['close'], fastperiod=12, slowperiod=26, signalperiod=9) #
                df_out['MACD'] = macd #
                df_out['MACD_signal'] = macdsignal # 시그널 선 추가 #
                df_out['MACD_hist'] = macdhist     # 히스토그램 추가 #
            except Exception as e: logger.warning(f"Could not calculate MACD: {e}") #

        # --- 볼린저 밴드 (Bollinger Bands) ---
        if any(bb in self.technical_indicator_list for bb in ['BB_upper', 'BB_middle', 'BB_lower']): #
            try:
                upper, middle, lower = talib.BBANDS(df_out['close'], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0) #
                if 'BB_upper' in self.technical_indicator_list: df_out['BB_upper'] = upper #
                if 'BB_middle' in self.technical_indicator_list: df_out['BB_middle'] = middle # 중간선은 MA20과 동일 #
                if 'BB_lower' in self.technical_indicator_list: df_out['BB_lower'] = lower #
            except Exception as e: logger.warning(f"Could not calculate Bollinger Bands: {e}") #

        # --- ATR (Average True Range) - 변동성 지표 ---
        if 'ATR' in self.technical_indicator_list: #
            try: df_out['ATR'] = talib.ATR(df_out['high'], df_out['low'], df_out['close'], timeperiod=14) #
            except Exception as e: logger.warning(f"Could not calculate ATR: {e}") #

        # --- Stochastic Oscillator (%K, %D) ---
        if any(stoch in self.technical_indicator_list for stoch in ['STOCH_K', 'STOCH_D']): #
             try:
                 slowk, slowd = talib.STOCH(df_out['high'], df_out['low'], df_out['close'], #
                                           fastk_period=14, slowk_period=3, slowk_matype=0, #
                                           slowd_period=3, slowd_matype=0) #
                 if 'STOCH_K' in self.technical_indicator_list: df_out['STOCH_K'] = slowk #
                 if 'STOCH_D' in self.technical_indicator_list: df_out['STOCH_D'] = slowd #
             except Exception as e: logger.warning(f"Could not calculate Stochastic Oscillator: {e}") #

        # --- CCI (Commodity Channel Index) ---
        if 'CCI' in self.technical_indicator_list: #
            try: df_out['CCI'] = talib.CCI(df_out['high'], df_out['low'], df_out['close'], timeperiod=14) #
            except Exception as e: logger.warning(f"Could not calculate CCI: {e}") #

        # --- ADX (Average Directional Movement Index) ---
        if 'ADX' in self.technical_indicator_list: #
             try: df_out['ADX'] = talib.ADX(df_out['high'], df_out['low'], df_out['close'], timeperiod=14) #
             except Exception as e: logger.warning(f"Could not calculate ADX: {e}") #

        # --- OBV (On Balance Volume) ---
        if 'OBV' in self.technical_indicator_list: #
             try: df_out['OBV'] = talib.OBV(df_out['close'], df_out['volume']) #
             except Exception as e: logger.warning(f"Could not calculate OBV: {e}") #


        # 기술적 지표 계산 후 생성된 NaN 값 처리
        # 처음 N개 행은 이동평균 등 계산 불가로 NaN 발생
        # dropna()는 데이터를 너무 많이 제거할 수 있으므로, ffill/bfill 권장
        initial_rows = len(df_out) #
        logger.debug(f"Shape before NaN drop: {df_out.shape}. Dropping rows with NaNs generated by indicators...") #

        # 기술적 지표 계산으로 인해 NaN이 포함된 모든 행 제거
        df_out.dropna(inplace=True) #

        rows_after_drop = len(df_out) #

        if initial_rows > rows_after_drop: #
             logger.info(f"Dropped {initial_rows - rows_after_drop} rows containing NaNs after indicator calculation.") #
        else:
             logger.debug("No rows dropped after indicator calculation (or DataFrame was already empty).") #

        if df_out.empty: #
             logger.warning(f"DataFrame became empty after dropping indicator NaNs for ticker.") #
             # 비어 있으면 더 이상 처리 의미 없음
             return df_out #

        # 남은 값들에 대해 최종 타입 확인 및 0 채우기 (필요시)
        if df_out.isnull().values.any(): #
            logger.warning("Unexpected NaN values remain even after dropna. Filling with 0.") #
            numeric_cols_final = df_out.select_dtypes(include=np.number).columns #
            df_out[numeric_cols_final] = df_out[numeric_cols_final].fillna(0) #
            if df_out.isnull().values.any(): #
                 df_out.fillna(0, inplace=True) #

        logger.info(f"Technical indicators added and NaNs dropped. Resulting shape: {df_out.shape}") #
        return df_out #

    @timeit #
    def add_fundamental_indicators(self, price_df, fin_df): #
        """
        재무 데이터를 주가 데이터에 병합하고 기본 재무 지표를 계산합니다.
        (주의: 연간/분기 재무 데이터를 일별 주가와 병합 시 시점 불일치 고려 필요)

        Args:
            price_df (pd.DataFrame): 주가 데이터 (날짜 인덱스).
            fin_df (pd.DataFrame): 재무 데이터 (DataManager에서 로드).

        Returns:
            pd.DataFrame: 재무 지표가 병합/계산된 DataFrame.
        """
        if not self.use_fundamental_data or fin_df is None or fin_df.empty: #
            logger.debug("Skipping fundamental indicator calculation.") #
            return price_df #

        logger.debug(f"Adding fundamental indicators to DataFrame with shape {price_df.shape}...") #
        df_out = price_df.copy() #
        ticker = df_out['ticker'].iloc[0] if 'ticker' in df_out.columns else None # 티커 확인 (로깅용) #

        try:
            # 필요한 재무 데이터만 필터링 (예: 특정 계정)
            # DART 계정명은 보고서마다 조금씩 다를 수 있어 유연한 매칭 필요
            accounts_needed = ['유동자산', '부채총계', '자본총계', '매출액', '영업이익', '당기순이익'] # 예시 #
            fin_filtered = fin_df[fin_df['account_name'].isin(accounts_needed)].copy() #

            if fin_filtered.empty: #
                logger.warning(f"No required financial accounts found for {ticker} in provided data.") #
                return df_out #

            # Pivot 테이블 생성: 날짜(연도/분기)별 계정 값
            fin_pivot = fin_filtered.pivot_table(index=['year', 'quarter'], columns='account_name', values='account_value') #

            # 연도, 분기 정보 생성 (날짜 인덱스 활용)
            df_out['year'] = df_out.index.year #
            df_out['quarter'] = ((df_out.index.month - 1) // 3) + 1 # 월 -> 분기 변환 #

            # 주가 데이터와 재무 데이터 병합
            # 다음 분기 재무 데이터가 발표되기 전까지 이전 분기 데이터 사용 (ffill)
            df_out = pd.merge(df_out.reset_index(), fin_pivot.reset_index(), on=['year', 'quarter'], how='left').set_index('date') #
            # 재무 데이터 컬럼 Forward Fill
            fin_cols = fin_pivot.columns #
            df_out[fin_cols] = df_out[fin_cols].ffill() #
            # 처음에 NaN이 있을 수 있으므로 bfill도 적용
            df_out[fin_cols] = df_out[fin_cols].bfill() #

            # --- 재무 지표 계산 (예시) ---
            # 발행 주식 수 정보 필요 (OpenDART 'stock_totqy' 등 별도 조회 필요)
            # number_of_shares = get_number_of_shares(ticker, df_out.index.max()) # 가정
            number_of_shares = 1e9 # 임시 값 (매우 부정확!) #

            if '당기순이익' in df_out.columns and number_of_shares > 0: #
                 # 분기 실적을 연환산 (Trailing Twelve Months, TTM) 하는 것이 더 정확할 수 있음
                 # 여기서는 가장 최근 분기 * 4 로 단순화 (매우 부정확한 가정!)
                 df_out['EPS_calculated'] = (df_out['당기순이익'] * 4) / number_of_shares #
                 df_out['PER_calculated'] = df_out['close'] / df_out['EPS_calculated'] #

            if '자본총계' in df_out.columns and number_of_shares > 0: #
                 df_out['BPS_calculated'] = df_out['자본총계'] / number_of_shares #
                 df_out['PBR_calculated'] = df_out['close'] / df_out['BPS_calculated'] #

            if '매출액' in df_out.columns: #
                 df_out['PSR_calculated'] = (df_out['close'] * number_of_shares) / (df_out['매출액'] * 4) # 시가총액 / 연환산 매출액 #

            # 계산된 지표의 무한대 값 처리
            df_out.replace([np.inf, -np.inf], np.nan, inplace=True) #
            # 계산된 지표도 ffill/bfill
            calc_fin_cols = [col for col in df_out.columns if '_calculated' in col] #
            df_out[calc_fin_cols] = df_out[calc_fin_cols].ffill().bfill() #

            logger.info(f"Fundamental indicators added/merged for {ticker}. Resulting shape: {df_out.shape}") #
            # 병합에 사용된 'year', 'quarter' 및 원본 재무 컬럼은 제거 가능
            df_out.drop(columns=['year', 'quarter'] + list(fin_cols), errors='ignore', inplace=True) #

        except Exception as e:
            logger.error(f"Error adding fundamental indicators for {ticker}: {e}", exc_info=True) #
            # 오류 발생 시에도 원본 데이터프레임 구조는 유지하려고 시도
            # 원래 컬럼 복구 (필요 시)
            df_out = price_df.copy() #

        return df_out #


    @timeit #
    def add_sentiment_indicators(self, price_df, sentiment_df): #
        """
        일별 집계된 감성 점수를 주가 데이터에 병합합니다.

        Args:
            price_df (pd.DataFrame): 주가 데이터 (날짜 인덱스).
            sentiment_df (pd.DataFrame): 날짜를 인덱스로 하고 'sentiment_score' 컬럼을 가진 DataFrame.

        Returns:
            pd.DataFrame: 감성 점수가 병합된 DataFrame.
        """
        if not self.use_sentiment_data or sentiment_df is None or sentiment_df.empty: #
            logger.debug("Skipping sentiment indicator calculation.") #
            return price_df #

        logger.debug(f"Adding sentiment indicators to DataFrame with shape {price_df.shape}...") #
        df_out = price_df.copy() #
        ticker = df_out['ticker'].iloc[0] if 'ticker' in df_out.columns else None #

        try:
            # 날짜 인덱스 기준으로 병합
            df_out = pd.merge(df_out, sentiment_df[['sentiment_score']], left_index=True, right_index=True, how='left') #

            # 감성 점수 결측치 처리 (주말 등 뉴스가 없는 날)
            # 1. 0으로 채우기 (중립으로 간주)
            # df_out['sentiment_score'].fillna(0, inplace=True)
            # 2. Forward fill (이전 감성 유지) - 더 일반적일 수 있음
            df_out['sentiment_score'].ffill(inplace=True) #
            df_out['sentiment_score'].bfill(inplace=True) # 시작 부분 NaN 채우기 #

            logger.info(f"Sentiment indicators merged for {ticker}. Resulting shape: {df_out.shape}") #
        except Exception as e:
            logger.error(f"Error adding sentiment indicators for {ticker}: {e}", exc_info=True) #
            if 'sentiment_score' in df_out.columns: # 오류 시 추가된 컬럼 제거 #
                 df_out.drop(columns=['sentiment_score'], inplace=True, errors='ignore') #


        return df_out #


    @timeit #
    def process_features(self, price_df, fin_df=None, sentiment_df=None): #
        """
        주어진 데이터에 대해 모든 특징 공학 단계를 순차적으로 실행합니다.

        Args:
            price_df (pd.DataFrame): 원본 주가 데이터.
            fin_df (pd.DataFrame, optional): 재무 데이터. Defaults to None.
            sentiment_df (pd.DataFrame, optional): 감성 데이터. Defaults to None.

        Returns:
            pd.DataFrame: 모든 특징이 추가되고 처리된 DataFrame.
                         RL 환경 등에 입력으로 사용할 수 있는 형태.
        """
        if price_df is None or price_df.empty: #
            logger.warning("Input price_df is empty. Cannot process features.") #
            return pd.DataFrame() #

        ticker = price_df['ticker'].iloc[0] if 'ticker' in price_df.columns else 'Unknown Ticker' #
        logger.info(f"Starting feature engineering process for ticker: {ticker}...") #

        # 0. 원본 데이터 복사 및 기본 컬럼 유지 확인
        df_processed = price_df.copy() #
        required_cols = ['open', 'high', 'low', 'close', 'volume'] #
        if not all(col in df_processed.columns for col in required_cols): #
             logger.error(f"Input DataFrame for {ticker} must contain {required_cols}.") #
             return pd.DataFrame() #

        # 1. 기술적 지표 추가
        if self.use_technical_indicators: #
            df_processed = self.add_technical_indicators(df_processed) #
            if df_processed.empty: #
                logger.warning(f"DataFrame for {ticker} became empty after technical indicators. Aborting further processing.") #
                return df_processed #

        # 2. 재무 지표 추가 (구현 시)
        if self.use_fundamental_data: #
            df_processed = self.add_fundamental_indicators(df_processed, fin_df) #
            if df_processed.empty: #
                 logger.warning(f"DataFrame for {ticker} became empty after fundamental indicators. Aborting further processing.") #
                 return df_processed #

        # 3. 감성 지표 추가 (구현 시)
        if self.use_sentiment_data: #
            df_processed = self.add_sentiment_indicators(df_processed, sentiment_df) #
            if df_processed.empty: #
                 logger.warning(f"DataFrame for {ticker} became empty after sentiment indicators. Aborting further processing.") #
                 return df_processed #

        # 4. 최종 정리
        #    - 무한대 값 처리 (지표 계산 중 발생 가능성)
        df_processed.replace([np.inf, -np.inf], np.nan, inplace=True) #
        #    - 남은 NaN 처리 (모든 단계 후 최종 확인)
        initial_rows = len(df_processed) #

        # Explicitly select numeric columns for fillna
        numeric_cols = df_processed.select_dtypes(include=np.number).columns #
        # Convert all numeric columns to float64
        df_processed[numeric_cols] = df_processed[numeric_cols].astype('float64') #
        df_processed[numeric_cols] = df_processed[numeric_cols].fillna(method='ffill') #
        df_processed[numeric_cols] = df_processed[numeric_cols].fillna(method='bfill')  # 시작점 NaN 처리 #

        # If NaNs persist in other columns, consider filling with 0 or other appropriate value
        if df_processed.isnull().values.any(): #
            logger.warning(f"NaN values still exist after ffill/bfill for {ticker}. Filling with 0.") #
            df_processed.fillna(0, inplace=True) #

        #    - RL 환경 등에 필요한 컬럼만 선택 (선택적)
        #      예: ['close', 'MA5', 'RSI14', 'MACD', 'ATR', 'volume', 'sentiment_score', ...]
        #      이 단계에서 선택하거나, 환경(Environment)에서 입력 받을 때 선택
        # final_features = cfg.FINAL_FEATURE_COLUMNS # config에 정의된 최종 컬럼 리스트
        # df_processed = df_processed[final_features]

        #    - 메모리 사용량 최적화
        df_processed = reduce_mem_usage(df_processed) #

        logger.info(f"Feature engineering completed for {ticker}. Final shape: {df_processed.shape}") #
        return df_processed #

2025-04-29 07:28:18,389 - AI_Trading_System_Free - INFO - TA-Lib library imported successfully.
INFO:AI_Trading_System_Free:TA-Lib library imported successfully.


In [20]:
# environment.py (셀 8 수정 완료 - 생략 없음)
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from collections import deque
import logging

# 이 셀에서는 이전 셀들에서 정의된 cfg, logger, DataManager, FeatureEngineer, RiskManager 등을 사용합니다.

# --- 실제 환경 클래스 ---
class StockTradingEnv(gym.Env):
    """
    단일 주식을 거래하는 강화학습 환경 (Gymnasium 인터페이스 따름)
    (Stable Baselines3 호환성을 위해 평탄화된 Observation Space 사용)
    """
    metadata = {'render_modes': ['human', 'ansi', 'rgb_array'], 'render_fps': 30}

    def __init__(self, data_manager, feature_engineer, risk_manager, # risk_manager 인자 추가
                 tickers, # 반드시 리스트 형태로 받지만, 내부적으로는 첫 번째 요소만 주로 사용
                 start_date=cfg.TRAIN_START_DATE, end_date=cfg.TRAIN_END_DATE,
                 is_training=True):
        """
        환경 초기화

        Args:
            data_manager (DataManager): 데이터 관리자 인스턴스.
            feature_engineer (FeatureEngineer): 특징 공학자 인스턴스.
            risk_manager (RiskManager): 위험 관리자 인스턴스.
            tickers (list): 거래할 종목 티커 리스트 (반드시 1개 요소만 포함해야 함).
            start_date (str): 환경에서 사용할 데이터 시작 날짜 (YYYY-MM-DD).
            end_date (str): 환경에서 사용할 데이터 종료 날짜 (YYYY-MM-DD).
            is_training (bool): 학습 모드 여부.
        """
        super(StockTradingEnv, self).__init__()
        self.logger = logging.getLogger(self.__class__.__name__) # 클래스 이름으로 로거 가져오기

        self.data_manager = data_manager
        self.feature_engineer = feature_engineer
        self.rm = risk_manager # RiskManager 인스턴스 저장

        # --- 단일 티커 처리 명확화 ---
        if not isinstance(tickers, list) or len(tickers) != 1:
            # Stable Baselines3 와 호환되는 간단한 구조를 위해 단일 티커 환경으로 제한
            raise ValueError("StockTradingEnv (modified version) currently requires exactly one ticker in the 'tickers' list.")
        self.ticker = tickers[0] # 단일 티커 저장
        # self.tickers = tickers # 필요 시 리스트 유지 가능하나, 로직 대부분은 self.ticker 사용
        # --- 단일 티커 처리 명확화 끝 ---

        self.start_date = start_date
        self.end_date = end_date
        self.is_training = is_training

        self.window_size = cfg.ENV_WINDOW_SIZE      # 에이전트가 볼 과거 데이터 기간
        self.initial_balance = cfg.ENV_INITIAL_BALANCE
        self.transaction_cost_pct = cfg.ENV_TRANSACTION_COST_PCT
        self.slippage_pct = cfg.ENV_SLIPPAGE_PCT
        # self.position_max_ratio = cfg.ENV_POSITION_MAX_RATIO # 환경 변수보다는 RiskManager가 관리

        # 데이터 로드 및 특징 추출 (초기화 시)
        self._load_and_process_data()

        # processed_data 유효성 검사
        if not hasattr(self, 'processed_data') or self.ticker not in self.processed_data or self.processed_data[self.ticker].empty:
             raise ValueError(f"No processed data available for the specified ticker: {self.ticker}")

        self.num_features = self.processed_data[self.ticker].shape[1]
        feature_shape = (self.window_size, self.num_features)

        # --- 평탄화된 관찰 공간 정의 ---
        self.observation_space = spaces.Dict({
            # 'features': 특징 데이터 (Window x Features) - 단일 티커용 Box
            'features': spaces.Box(low=-np.inf, high=np.inf, shape=feature_shape, dtype=np.float32),
            # 'portfolio': 포트폴리오 상태 (단일 티커 보유 비율 + 현금 비율) - Box(2,)
            'portfolio': spaces.Box(low=0.0, high=1.0, shape=(1 + 1,), dtype=np.float32)
        })
        # --- 평탄화된 관찰 공간 정의 끝 ---

        # --- 단일 티커 기준 행동 공간 정의 ---
        self.action_space = spaces.Discrete(3) # 0: 매수, 1: 매도, 2: 유지
        # --- 단일 티커 기준 행동 공간 정의 끝 ---

        # 에피소드 상태 변수
        self.balance = 0.0
        self.portfolio_value = 0.0
        self.portfolio_holdings = {self.ticker: 0} # 단일 티커 기준 초기화
        self.current_step = 0       # 전체 데이터에서의 현재 위치 (인덱스)
        self.start_step = self.window_size # 학습 시작 위치
        self.total_steps = len(self.global_dates) # 전체 타임스텝 길이
        self.done = False
        self.info = {} # 추가 정보 저장

        self.logger.info(f"StockTradingEnv initialized for ticker: {self.ticker}.")
        self.logger.info(f"Observation space: {self.observation_space}")
        self.logger.info(f"Action space: {self.action_space}")
        self.logger.info(f"Total steps in data: {self.total_steps}, Start step: {self.start_step}")


    def _load_and_process_data(self):
        """
        DataManager를 통해 데이터를 로드하고 FeatureEngineer로 처리하여 저장합니다.
        (단일 티커 처리 방식으로 수정)
        """
        self.logger.info(f"Loading and processing data for ticker {self.ticker} from {self.start_date} to {self.end_date}...")
        self.raw_data = {}
        self.processed_data = {}
        # common_dates 는 단일 티커이므로 해당 티커의 인덱스가 됨

        # --- 단일 티커 데이터 로드 ---
        df_raw = self.data_manager.get_data('stock_prices', tickers=[self.ticker], # 리스트로 전달
                                             start_date=self.start_date, end_date=self.end_date)
        if df_raw is None or df_raw.empty:
            self.logger.error(f"Failed to load raw data for ticker: {self.ticker}.")
            # 이 경우 환경 초기화 실패 처리 필요 -> __init__ 에서 처리
            return

        if not isinstance(df_raw.index, pd.DatetimeIndex):
             if 'date' in df_raw.columns:
                  df_raw['date'] = pd.to_datetime(df_raw['date'])
                  df_raw = df_raw.set_index('date')
             else:
                  self.logger.error(f"Date index issue for {self.ticker}.")
                  return
        self.raw_data[self.ticker] = df_raw

        # --- 특징 추출 ---
        # 재무/감성 데이터 로딩 로직 추가 필요 시 여기에 구현
        fin_df = None # 예시: 재무 데이터 로딩
        if self.feature_engineer.use_fundamental_data:
             try:
                f_start_y = int(self.start_date[:4])
                f_end_y = int(self.end_date[:4])
                # 연간 데이터 조회 (quarter=4 가정)
                fin_df = self.data_manager.get_data('financial_statements', tickers=[self.ticker],
                                                    start_year=f_start_y, end_year=f_end_y,
                                                    other_conditions="quarter = 4 AND fs_div = 'CFS'") # CFS 우선 조회
                if fin_df is None or fin_df.empty:
                     fin_df = self.data_manager.get_data('financial_statements', tickers=[self.ticker],
                                                        start_year=f_start_y, end_year=f_end_y,
                                                        other_conditions="quarter = 4 AND fs_div = 'OFS'") # 없으면 OFS 조회
             except Exception as e:
                 self.logger.error(f"Error loading fundamental data for {self.ticker}: {e}")
                 fin_df = None

        df_processed = self.feature_engineer.process_features(df_raw.copy(), fin_df=fin_df) # fin_df 전달

        if df_processed is None or df_processed.empty:
            self.logger.error(f"Failed to process features for ticker: {self.ticker}.")
            # __init__ 에서 처리
            return

        if not isinstance(df_processed.index, pd.DatetimeIndex):
             self.logger.error(f"Processed data for {self.ticker} does not have DatetimeIndex.")
             return

        self.processed_data[self.ticker] = df_processed

        # --- 데이터 정렬 및 NaN 처리 ---
        self.global_dates = df_processed.index.sort_values() # 공통 날짜 = 해당 티커의 유효 날짜

        # reindex는 필요 없음 (단일 티커이므로)
        # NaN 처리 (Feature Engineer에서 처리되었어야 하지만 재확인)
        if self.processed_data[self.ticker].isnull().values.any():
             self.logger.warning(f"NaN values found in processed data for {self.ticker} before env use. Filling.")
             # 주의: ffill/bfill 대신 평균값 또는 0으로 채우는 것이 안전할 수 있음
             # self.processed_data[self.ticker].fillna(method='ffill', inplace=True)
             # self.processed_data[self.ticker].fillna(method='bfill', inplace=True)
             numeric_cols = self.processed_data[self.ticker].select_dtypes(include=np.number).columns
             self.processed_data[self.ticker][numeric_cols] = self.processed_data[self.ticker][numeric_cols].fillna(0) # 0으로 채우기

        if self.raw_data[self.ticker].isnull().values.any():
             self.logger.warning(f"NaN values found in raw data for {self.ticker}. Filling.")
             numeric_cols_raw = self.raw_data[self.ticker].select_dtypes(include=np.number).columns
             self.raw_data[self.ticker][numeric_cols_raw] = self.raw_data[self.ticker][numeric_cols_raw].fillna(0) # 0으로 채우기

        # 최종 데이터 길이 확인
        self.total_steps = len(self.global_dates)
        if self.total_steps <= self.window_size:
            self.logger.error(f"Insufficient data length ({self.total_steps}) for window size ({self.window_size}) for ticker {self.ticker}.")
            # __init__ 에서 오류 처리
            return

        self.logger.info(f"Data loaded and processed for {self.ticker}. Total steps: {self.total_steps}")


    def _get_observation(self):
        """현재 스텝 기준 관찰(state) 구성 (수정된 구조)"""
        obs = {'features': None, 'portfolio': np.zeros(2, dtype=np.float32)} # Shape: (2,) = 티커 1개 + 현금 1개

        # 1. 특징 데이터 (단일 티커)
        start_idx = max(0, self.current_step - self.window_size + 1)
        end_idx = self.current_step + 1 # 현재 스텝 포함

        try:
            if end_idx > len(self.processed_data[self.ticker]): # 인덱스 범위 초과 방지
                self.logger.warning(f"Observation index {end_idx} out of bounds ({len(self.processed_data[self.ticker])}) for {self.ticker}. Using last available data.")
                end_idx = len(self.processed_data[self.ticker])
                start_idx = max(0, end_idx - self.window_size) # 시작 인덱스 재조정

            ticker_features = self.processed_data[self.ticker].iloc[start_idx:end_idx].values

            # Padding (데이터 시작 부분 처리)
            if ticker_features.shape[0] < self.window_size:
                padding_size = self.window_size - ticker_features.shape[0]
                padding = np.zeros((padding_size, self.num_features), dtype=np.float32)
                ticker_features = np.vstack((padding, ticker_features))

        except Exception as e:
             self.logger.error(f"Error retrieving feature data for {self.ticker} at step {self.current_step}: {e}", exc_info=True)
             ticker_features = np.zeros((self.window_size, self.num_features), dtype=np.float32) # 오류 시 0으로 채움

        obs['features'] = ticker_features.astype(np.float32)

        # 2. 포트폴리오 상태 (단일 티커 보유 비율 + 현금 비율)
        portfolio_state = np.zeros(2, dtype=np.float32) # [티커비율, 현금비율]
        current_total_value = self._calculate_portfolio_value()
        if current_total_value > 1e-9: # 0으로 나누기 방지 (매우 작은 값 포함)
            # 현금 비율 (Index 1)
            cash_ratio = self.balance / current_total_value
            portfolio_state[1] = cash_ratio

            # 티커 보유 비율 (Index 0)
            current_price = self._get_current_price(self.ticker) # 현재 스텝 가격
            holding_quantity = self.portfolio_holdings.get(self.ticker, 0)
            holding_value = holding_quantity * current_price
            portfolio_state[0] = holding_value / current_total_value

            # 비율 합계가 1에 가깝도록 조정 및 범위 제한
            portfolio_state = np.clip(portfolio_state, 0.0, 1.0)
            state_sum = portfolio_state.sum()
            if state_sum > 1e-9: # 0으로 나누기 방지
                 portfolio_state /= state_sum # 합계 1로 정규화
            else:
                 # 합계가 0이면 현금만 100% 로 가정
                 portfolio_state[1] = 1.0
                 portfolio_state[0] = 0.0

        else:
            # 포트폴리오 가치가 0이면 현금 비율 100% (초기 상태 등)
             portfolio_state[1] = 1.0
             portfolio_state[0] = 0.0


        obs['portfolio'] = portfolio_state.astype(np.float32)

        return obs


    def _get_current_price(self, ticker):
        """현재 스텝의 종가 반환"""
        try:
            # raw_data 사용 보장 (미래 정보 누수 방지)
            if self.current_step < len(self.global_dates):
                return self.raw_data[ticker]['close'].iloc[self.current_step]
            else: # 에피소드 종료 후 등 예외 상황
                self.logger.warning(f"Attempting get price after end for {ticker}. Return last.")
                return self.raw_data[ticker]['close'].iloc[-1]
        except (KeyError, IndexError, AttributeError) as e:
            self.logger.error(f"Get current price failed for {ticker} at step {self.current_step}: {e}. Return 0.")
            return 0.0


    def _calculate_portfolio_value(self):
        """현재 시점의 포트폴리오 총 가치 계산"""
        value = self.balance
        # 단일 티커에 대해서만 계산
        quantity = self.portfolio_holdings.get(self.ticker, 0)
        if quantity > 0:
            current_price = self._get_current_price(self.ticker)
            value += quantity * current_price
        return max(value, 0.0) # 0 이하 방지


    def reset(self, seed=None, options=None):
        """환경 초기화"""
        super().reset(seed=seed)
        self.balance = self.initial_balance
        self.portfolio_holdings = {self.ticker: 0} # 단일 티커 초기화
        self.portfolio_value = self.initial_balance
        self.current_step = self.start_step # window 채우기 위한 시작 스텝
        self.done = False
        self.info = {'step_rewards': []} # 정보 초기화
        self.logger.debug(f"Environment reset for {self.ticker}. Start step: {self.current_step}")
        self.portfolio_value = self._calculate_portfolio_value() # 리셋 시점 가치 계산
        observation = self._get_observation()
        info = self._get_info() # 초기 정보 반환
        return observation, info


    def step(self, action): # action은 단일 값 (0, 1, 2)
        """환경 스텝 진행 (단일 action 처리)"""
        if self.done:
             self.logger.warning("Env done. Call reset().")
             obs = self._get_observation(); info = self._get_info(); return obs, 0.0, False, True, info # truncated=True

        prev_portfolio_value = self._calculate_portfolio_value()
        trades_executed = {'buy': [], 'sell': [], 'hold': []}
        ticker = self.ticker # 명시적으로 단일 티커 사용

        current_price = self._get_current_price(ticker)
        if current_price <= 0:
            self.logger.warning(f"Invalid price ({current_price}) for {ticker} at step {self.current_step}. Holding.")
            trades_executed['hold'].append(ticker)
        else:
            # --- 단일 action 처리 ---
            current_quantity = self.portfolio_holdings.get(ticker, 0)

            # 1. 매수 (Action == 0)
            if action == 0:
                 # RiskManager 활용하여 구매 수량 결정
                 size_to_buy = self.rm.calculate_position_size(
                     ticker=ticker,
                     portfolio_value=self.portfolio_value,
                     current_price=current_price,
                     available_cash=self.balance,
                     current_holdings=current_quantity
                 )
                 if size_to_buy > 0:
                    effective_buy_price = current_price * (1 + self.transaction_cost_pct + self.slippage_pct)
                    cost = size_to_buy * effective_buy_price
                    if self.balance >= cost: # 구매 가능 확인
                       self.balance -= cost
                       self.portfolio_holdings[ticker] = current_quantity + size_to_buy
                       trades_executed['buy'].append(f"{ticker}({size_to_buy} @ {current_price:.2f})")
                    else: # 잔고 부족
                         logger.debug(f"Insufficient balance for buy order {ticker}: Have {self.balance:.2f}, Need {cost:.2f}")
                         trades_executed['hold'].append(ticker)
                 else: # RiskManager가 0 반환 (예: 최대 비율 도달)
                     trades_executed['hold'].append(ticker)

            # 2. 매도 (Action == 1)
            elif action == 1:
                 if current_quantity > 0: # 보유하고 있을 때만 매도
                      quantity_to_sell = current_quantity # 현재는 전체 매도 가정
                      effective_sell_price = max(0, current_price * (1 - self.transaction_cost_pct - self.slippage_pct))
                      proceeds = quantity_to_sell * effective_sell_price
                      self.balance += proceeds
                      self.portfolio_holdings[ticker] = 0 # 보유량 0으로
                      trades_executed['sell'].append(f"{ticker}({quantity_to_sell} @ {current_price:.2f})")
                 else: # 보유량 없으면 홀드
                      trades_executed['hold'].append(ticker)

            # 3. 유지 (Action == 2)
            else:
                 trades_executed['hold'].append(ticker)

        # 다음 스텝 이동 및 상태 업데이트
        self.current_step += 1
        self.portfolio_value = self._calculate_portfolio_value() # 거래 후 포트폴리오 가치 재계산

        # 보상 계산 (포트폴리오 가치 변화율)
        if prev_portfolio_value > 1e-9: # 0으로 나누기 방지
             reward = (self.portfolio_value - prev_portfolio_value) / prev_portfolio_value
        else: reward = 0.0
        self.info.setdefault('step_rewards', []).append(reward)

        # 종료 조건 확인
        terminated = False # 조기 종료 조건 (예: 자산 고갈)
        # if self.portfolio_value <= self.initial_balance * 0.5: terminated = True; self.done = True

        truncated = False # 시간 제한 도달
        if self.current_step >= self.total_steps - 1:
            self.done = True; truncated = True
            self.logger.info(f"Episode finished for {self.ticker} at step {self.current_step}. Final value: {self.portfolio_value:.2f}")

        # 반환값 준비
        observation = self._get_observation()
        self.info['trades_executed'] = trades_executed # 현재 스텝 거래 정보 저장
        info = self._get_info()

        return observation, reward, terminated, truncated, info


    def _get_info(self):
       """보조 정보 반환 (단일 티커 기준)"""
       safe_step = min(self.current_step, len(self.global_dates) - 1) if hasattr(self, 'global_dates') and len(self.global_dates) > 0 else -1
       info = {
           "current_step": self.current_step,
           "global_date": self.global_dates[safe_step].strftime('%Y-%m-%d') if safe_step >= 0 else 'N/A',
           "balance": self.balance,
           "portfolio_value": self.portfolio_value,
           "portfolio_holdings": self.portfolio_holdings.copy(), # {ticker: qty} 형태
           "step_reward": self.info.get('step_rewards', [])[-1] if self.info.get('step_rewards') else 0.0,
           "trades_executed": self.info.get('trades_executed', {'buy':[],'sell':[],'hold':[]}) # step에서 저장된 정보
       }
       return info


    def render(self, mode='human'):
        """환경 상태 시각화 (단일 티커용)"""
        if mode == 'human' or mode == 'ansi':
            info = self._get_info()
            output = f"--- Step: {info['current_step']} | Date: {info['global_date']} ({self.ticker}) ---\n"
            output += f"Balance: {info['balance']:,.2f}\n"
            output += f"Portfolio Value: {info['portfolio_value']:,.2f}\n"
            output += "Holdings:\n"
            holdings_str = []
            quantity = info['portfolio_holdings'].get(self.ticker, 0)
            if quantity > 0:
                 current_price = self._get_current_price(self.ticker)
                 holdings_str.append(f"  - {self.ticker}: {quantity} shares @ {current_price:.2f} (Value: {quantity * current_price:,.2f})")
            if not holdings_str: output += "  (None)\n"
            else: output += "\n".join(holdings_str) + "\n"

            output += f"Last Step Reward: {info.get('step_reward', 0.0):.4f}\n"
            trades = info.get('trades_executed', {'buy':[],'sell':[],'hold':[]})
            output += f"Trades: Buy: {trades['buy']}, Sell: {trades['sell']}, Hold: {len(trades['hold'])}\n"
            print(output)
            return output if mode == 'ansi' else None
        elif mode == 'rgb_array':
            self.logger.warning("render(mode='rgb_array') not implemented.")
            return None
        else:
            # super() 호출 시 mode 전달 필요 가능성
            # super(StockTradingEnv, self).render(mode=mode)
            pass # Gymnasium 기본 render는 보통 없음


    def close(self):
        """환경 자원 해제"""
        self.logger.info(f"Closing StockTradingEnv for {self.ticker}.")
        # 특별히 해제할 자원 없음

print("StockTradingEnv class defined.")

StockTradingEnv class defined.


In [21]:
# risk_manager.py
import numpy as np #


class RiskManager: #
    """
    트레이딩 위험 관리 로직을 담당하는 클래스.
    포지션 크기 결정, 손절매 조건 확인 등의 기능을 제공합니다.
    """
    def __init__(self): #
        """RiskManager 초기화. 설정 파일(cfg)에서 파라미터를 로드합니다."""
        self.max_pos_ratio = cfg.RISK_MAX_POSITION_RATIO #
        self.stop_loss_pct = cfg.RISK_STOP_LOSS_PCT #
        self.use_trailing_stop = cfg.RISK_USE_TRAILING_STOP #
        self.trailing_stop_pct = cfg.RISK_TRAILING_STOP_PCT #
        self.max_system_drawdown = cfg.RISK_MAX_SYSTEM_DRAWDOWN # 시스템 최대 손실률 (백테스터에서 활용) #

        # 켈리 기준 관련 설정
        self.use_kelly_criterion = cfg.RISK_USE_KELLY_CRITERION #
        self.kelly_fraction = cfg.RISK_KELLY_FRACTION # 사용할 켈리 비율 #
        self.kelly_win_rate = cfg.RISK_KELLY_WIN_RATE # 예상 승률 #
        self.kelly_payoff_ratio = cfg.RISK_KELLY_PAYOFF_RATIO # 예상 손익비 (평균 수익 / 평균 손실) #

        # ML 예측기 사용 여부 (추후 확장용)
        # self.use_ml_risk_predictor = False
        # self.risk_predictor_model = None # load_ml_risk_predictor() 등으로 로드

        logger.info("RiskManager initialized with settings:") #
        logger.info(f"  - Max Position Ratio per Stock: {self.max_pos_ratio:.2%}") #
        logger.info(f"  - Static Stop Loss: {self.stop_loss_pct:.2%}") #
        logger.info(f"  - Trailing Stop Loss: {'Enabled' if self.use_trailing_stop else 'Disabled'} ({self.trailing_stop_pct:.2%})") #
        logger.info(f"  - Kelly Criterion for Sizing: {'Enabled' if self.use_kelly_criterion else 'Disabled'}") #
        if self.use_kelly_criterion: #
            logger.info(f"    - Kelly Fraction: {self.kelly_fraction:.2%}") #
            logger.info(f"    - Kelly Est. Win Rate: {self.kelly_win_rate:.2%}") #
            logger.info(f"    - Kelly Est. Payoff Ratio: {self.kelly_payoff_ratio:.2f}") #


    def calculate_position_size(self, ticker, portfolio_value, current_price, available_cash, current_holdings=0, signal_strength=1.0): #
        """
        현재 포트폴리오 가치, 가격, 가용 현금 등을 고려하여 매수할 포지션 크기(주식 수)를 결정합니다.

        Args:
            ticker (str): 대상 종목 티커.
            portfolio_value (float): 현재 총 포트폴리오 가치 (현금 + 주식 평가액).
            current_price (float): 현재 주가.
            available_cash (float): 현재 보유 현금.
            current_holdings (int, optional): 현재 해당 티커 보유량. Defaults to 0.
            signal_strength (float, optional): 매수 신호의 강도 (0~1). Defaults to 1.0.

        Returns:
            int: 매수할 주식 수 (0 이상).
        """
        if current_price <= 0 or portfolio_value <= 0: #
            logger.warning(f"{ticker}: Cannot calculate position size with zero/negative price or portfolio value.") #
            return 0 #

        # 1. 최대 투자 가능 금액 결정 (종목당, 현재 보유분 제외하고 추가 투자분 기준)
        max_investment_value_per_stock = portfolio_value * self.max_pos_ratio #
        current_holding_value = current_holdings * current_price #
        # 추가로 투자할 수 있는 최대 금액
        max_additional_investment = max(0, max_investment_value_per_stock - current_holding_value) #

        if max_additional_investment <= 0: #
            logger.debug(f"{ticker}: Already at or above max position ratio ({self.max_pos_ratio:.1%}). No additional buy.") #
            return 0 #

        # 2. 포지션 크기 결정 전략 선택
        # 사용할 투자 비율 결정 (포트폴리오 전체 가치 기준)
        target_fraction = self.max_pos_ratio # 기본값: 최대 비율까지 채우도록 시도 #

        if self.use_kelly_criterion: #
            try:
                # 켈리 기준 계산
                b = self.kelly_payoff_ratio #
                p = self.kelly_win_rate * signal_strength # 신호 강도에 따라 승률 조정 가능 #
                q = 1.0 - p #

                if b <= 0: #
                    logger.warning(f"{ticker}: Kelly Criterion - Payoff ratio ({b:.2f}) is not positive. Sizing set to 0.") #
                    kelly_f = 0.0 #
                elif p < 0 or p > 1: #
                     logger.warning(f"{ticker}: Kelly Criterion - Win rate ({p:.2f}) is invalid. Sizing set to 0.") #
                     kelly_f = 0.0 #
                else:
                    kelly_f = (p * b - q) / b #
                    if not (0 < kelly_f): # 0 이하의 켈리값은 투자하지 않음 #
                        logger.debug(f"{ticker}: Kelly Criterion calculated non-positive fraction ({kelly_f:.4f}). Setting to 0.") #
                        kelly_f = 0.0 #
                    else:
                         # 소수점 자리수 제한 등 안정화 로직 추가 가능
                         kelly_f = round(kelly_f, 4) #

                # 최종 사용할 투자 비율 (켈리 비율 적용)
                adjusted_kelly_fraction = kelly_f * self.kelly_fraction #
                target_fraction = min(adjusted_kelly_fraction, self.max_pos_ratio) # 최대 비율 넘지 않도록 #
                logger.debug(f"{ticker}: Kelly sizing: f={kelly_f:.4f}, adjusted_f={target_fraction:.4f} (KellyFrac:{self.kelly_fraction:.2%}, Signal:{signal_strength:.2f}, MaxRatio:{self.max_pos_ratio:.2f})") #

            except ZeroDivisionError:
                 logger.error(f"{ticker}: Error calculating Kelly Criterion - Division by zero (Payoff Ratio might be 0?). Falling back.") #
                 target_fraction = self.max_pos_ratio # 오류 시 기본값 사용 #
            except Exception as e:
                logger.error(f"{ticker}: Error calculating Kelly Criterion: {e}. Falling back.") #
                target_fraction = self.max_pos_ratio #

        # 3. 최종 추가 투자 금액 및 주식 수 계산
        # 목표 보유 금액
        target_holding_value = portfolio_value * target_fraction #
        # 추가로 매수할 금액
        additional_investment_target = max(0, target_holding_value - current_holding_value) #

        # 실제 투자 가능 금액 제한 (추가 가능 금액과 가용 현금 중 작은 값)
        affordable_investment = min(max_additional_investment, available_cash, additional_investment_target) #

        if affordable_investment <= 0: #
            logger.debug(f"{ticker}: No affordable investment amount (Max Add: {max_additional_investment:.0f}, Cash: {available_cash:.0f}, Target Add: {additional_investment_target:.0f})") #
            return 0 #

        # 수수료/슬리피지 고려한 실제 구매 가격
        effective_buy_price = current_price * (1 + cfg.ENV_TRANSACTION_COST_PCT + cfg.ENV_SLIPPAGE_PCT) #
        if effective_buy_price <= 0: #
            logger.warning(f"{ticker}: Effective buy price is zero or negative.") #
            return 0 #

        # 구매할 주식 수 계산
        position_size = int(affordable_investment / effective_buy_price) #

        # 로그 개선: 어떤 제한(최대비율, 현금, 켈리)에 걸렸는지 명시하면 좋음
        limit_reason = "" #
        if affordable_investment == max_additional_investment: limit_reason = "Max Position Ratio" #
        elif affordable_investment == available_cash: limit_reason = "Available Cash" #
        elif affordable_investment == additional_investment_target: limit_reason = "Kelly/Target Fraction" #

        logger.info(f"{ticker}: Calculated position size to buy: {position_size} shares. " #
                    f"(Affordable Investment: {affordable_investment:,.0f} based on {limit_reason}, " #
                    f"Target Fraction: {target_fraction:.2%}, Price: {current_price:.2f})") #

        return max(0, position_size) # 최종적으로 0 이상 반환 #


    def check_stop_loss(self, ticker, current_price, entry_price, high_water_mark, position_size): #
        """
        현재 가격을 기준으로 손절매 조건을 확인합니다.

        Args:
            ticker (str): 대상 종목 티커.
            current_price (float): 현재 주가.
            entry_price (float): 해당 포지션 진입 가격.
            high_water_mark (float): 해당 포지션 진입 후 최고가 (추적 손절매용).
            position_size (int): 현재 보유 수량 (0보다 커야 함).

        Returns:
            bool: 손절매 조건이 충족되면 True, 아니면 False.
        """
        if position_size <= 0 or entry_price <= 0 or current_price <= 0: #
            return False # 포지션이 없거나 가격이 유효하지 않으면 손절매 없음 #

        stop_loss_triggered = False #
        trigger_reason = "" #

        # 1. 고정 손절매 확인
        static_stop_price = entry_price * (1.0 - self.stop_loss_pct) #
        if current_price <= static_stop_price: #
            stop_loss_triggered = True #
            trigger_reason = f"Static Stop Loss ({self.stop_loss_pct:.2%})" #
            logger.info(f"{ticker}: {trigger_reason} triggered at {current_price:.2f} (Entry: {entry_price:.2f}, Stop Price: {static_stop_price:.2f})") #

        # 2. 추적 손절매 확인 (활성화 시 및 고정 손절 발동 안했을 때)
        if not stop_loss_triggered and self.use_trailing_stop: #
            # high_water_mark는 최소한 진입 가격 이상이어야 함
            effective_high_water_mark = max(high_water_mark, entry_price) #
            trailing_stop_price = effective_high_water_mark * (1.0 - self.trailing_stop_pct) #

            if current_price <= trailing_stop_price: #
                stop_loss_triggered = True #
                trigger_reason = f"Trailing Stop Loss ({self.trailing_stop_pct:.2%})" #
                logger.info(f"{ticker}: {trigger_reason} triggered at {current_price:.2f} (High: {effective_high_water_mark:.2f}, Stop Price: {trailing_stop_price:.2f})") #

        return stop_loss_triggered #

In [22]:
# rl_agent.py
import os
import numpy as np
import pandas as pd
import logging
from stable_baselines3 import PPO, DQN, A2C  # 사용할 알고리즘 임포트
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold, CheckpointCallback
from stable_baselines3.common.monitor import Monitor

# 로거 및 cfg 객체는 외부 스코프(예: main.py)에서 주입된다고 가정
# logger = logging.getLogger(__name__)
# cfg = Config()

class RLAgent:
    """
    Stable Baselines3를 사용하여 강화학습 에이전트를 관리하는 클래스.
    모델 학습, 저장, 로드, 예측 기능을 제공합니다.
    """
    def __init__(self, ticker, env_factory):
        """
        RLAgent 초기화

        Args:
            ticker (str): 이 에이전트가 담당하는 티커.
            env_factory (callable): 인자 없이 호출 시 StockTradingEnv 인스턴스를 반환하는 함수.
                                  (Stable Baselines3의 VecEnv 생성에 필요)
        """
        global logger, cfg # 전역 logger, cfg 사용 명시
        self.logger = logger
        self.cfg = cfg

        self.ticker = ticker
        self.env_factory = env_factory
        self.model_type = self.cfg.AGENT_TYPE.upper() # PPO, DQN 등
        self.policy_type = self.cfg.AGENT_POLICY # MlpPolicy, CnnPolicy 등
        self.model_path = os.path.join(self.cfg.MODEL_DIR, f"{self.ticker}_{self.model_type.lower()}_agent.zip")

        # 모델 인스턴스 초기화 (None)
        self.model = None
        # 환경 인스턴스 (필요 시 생성하여 사용)
        self.env = None

        # --- 모델 로드 시도 (선택적) ---
        # 초기화 시 바로 모델 로드
        self.load_model()

        # --- VecEnv 생성 (학습/평가에 필요) ---
        # 학습 또는 백테스트 시작 전에 생성됨
        self.vec_env = None
        self.eval_vec_env = None

        self.logger.info(f"RLAgent initialized for ticker '{self.ticker}'. Model type: {self.model_type}, Policy: {self.policy_type}")
        if self.model:
            self.logger.info(f"  > Model loaded successfully from: {self.model_path}")
        else:
            self.logger.info(f"  > No pre-trained model found at: {self.model_path}. Need to train.")

    def _get_model_class(self):
        """설정된 모델 타입에 맞는 Stable Baselines3 클래스 반환"""
        if self.model_type == 'PPO':
            return PPO
        elif self.model_type == 'DQN':
            return DQN
        elif self.model_type == 'A2C':
            return A2C
        # 필요시 다른 알고리즘 추가
        else:
            raise ValueError(f"Unsupported RL algorithm type: {self.model_type}")

    def _get_model_hyperparams(self):
        """설정된 모델 타입에 맞는 하이퍼파라미터 딕셔너리 반환"""
        if self.model_type == 'PPO':
            return {
                'n_steps': self.cfg.PPO_N_STEPS,
                'batch_size': self.cfg.PPO_BATCH_SIZE,
                'n_epochs': self.cfg.PPO_N_EPOCHS,
                'gamma': self.cfg.PPO_GAMMA,
                'gae_lambda': self.cfg.PPO_GAE_LAMBDA,
                'clip_range': self.cfg.PPO_CLIP_RANGE,
                'ent_coef': self.cfg.PPO_ENT_COEF,
                'vf_coef': self.cfg.PPO_VF_COEF,
                'learning_rate': self.cfg.PPO_LEARNING_RATE,
                'max_grad_norm': self.cfg.PPO_MAX_GRAD_NORM,
                'policy_kwargs': dict(net_arch=[dict(pi=[64, 64], vf=[64, 64])]) # 예시 신경망 구조
                # 필요 시 다른 하이퍼파라미터 추가
            }
        elif self.model_type == 'DQN':
            return {
                'buffer_size': self.cfg.DQN_BUFFER_SIZE,
                'learning_rate': self.cfg.DQN_LEARNING_RATE,
                'batch_size': self.cfg.DQN_BATCH_SIZE,
                'gamma': self.cfg.DQN_GAMMA,
                'tau': self.cfg.DQN_TAU,
                'train_freq': self.cfg.DQN_TRAIN_FREQ,
                'gradient_steps': self.cfg.DQN_GRADIENT_STEPS,
                'learning_starts': self.cfg.DQN_LEARNING_STARTS,
                'exploration_fraction': self.cfg.DQN_EXPLORATION_FRACTION,
                'exploration_final_eps': self.cfg.DQN_EXPLORATION_FINAL_EPS,
                'target_update_interval': self.cfg.DQN_TARGET_UPDATE_INTERVAL,
                # 'policy_kwargs': ... # 필요 시 신경망 구조 등 정의
            }
        # 필요시 다른 알고리즘 하이퍼파라미터 추가
        else:
            # 기본 PPO 하이퍼파라미터 반환 또는 에러
            self.logger.warning(f"Hyperparameters not explicitly defined for {self.model_type}. Using PPO defaults.")
            ppo_params = self._get_model_hyperparams() # PPO 파라미터 재호출
            ppo_params.pop('policy_kwargs', None) # 다른 알고리즘에 맞지 않을 수 있음
            return ppo_params


    @timeit
    def train(self, total_timesteps, eval_env_factory=None, n_eval_episodes=5, eval_freq=10000):
        """
        RL 에이전트 학습
        """
        self.logger.info(f"--- Starting Training for {self.ticker} ({self.model_type}) ---")
        self.logger.info(f"Total timesteps: {total_timesteps}")

        # 1. Vectorized 환경 생성 (env_factory 사용)
        # Monitor 래핑은 env_factory 내에서 이미 처리되었거나, 여기서 make_vec_env 에서 래핑할 수도 있음
        # self.vec_env = make_vec_env(self.env_factory, n_envs=1, vec_env_cls=DummyVecEnv, monitor_dir=self.cfg.LOG_DIR, monitor_kwargs={'filename': f"{self.ticker}_train"})
        # 또는 __init__에서 생성된 self.env 재사용 (만약 __init__에서 생성했다면)
        if self.env is None: # __init__에서 환경 생성이 안된 경우 대비
             monitored_env_factory = lambda: Monitor(self.env_factory(), filename=os.path.join(self.cfg.LOG_DIR, f"{self.ticker}_train_monitor.csv"))
             self.vec_env = DummyVecEnv([monitored_env_factory])
             self.logger.info(f"Training VecEnv created inside train method for {self.ticker}")
        else:
             self.vec_env = self.env # __init__ 에서 생성된 환경 사용
             self.logger.info(f"Using existing VecEnv created during initialization for {self.ticker}")

        # --- 아래 Monitor 중복 적용 라인 제거 ---
        # self.vec_env = Monitor(self.vec_env, filename=os.path.join(self.cfg.LOG_DIR, f"{self.ticker}_train_monitor.csv")) # <--- 이 라인 삭제!

        self.logger.info(f"Training VecEnv set up for {self.ticker}")


        # 3. 모델 인스턴스 생성 또는 로드
        ModelClass = self._get_model_class()
        model_hyperparams = self._get_model_hyperparams()

        if self.model is None: # 기존에 로드된 모델이 없으면 새로 생성
             self.logger.info(f"Creating a new {self.model_type} model for {self.ticker}...")
             # TensorBoard 로깅 설정
             tensorboard_log_path = os.path.join(self.cfg.LOG_DIR, f"{self.ticker}_tensorboard")
             os.makedirs(tensorboard_log_path, exist_ok=True)

             self.model = ModelClass(
                 self.policy_type,
                 self.vec_env,
                 verbose=1, # 학습 진행 상황 출력 레벨 (0: 없음, 1: 기본, 2: 디버그)
                 tensorboard_log=tensorboard_log_path,
                 seed=self.cfg.RANDOM_SEED, # 시드 설정
                 device='auto', # 'cuda' or 'cpu' or 'auto'
                 **model_hyperparams # 하이퍼파라미터 언패킹
             )
        else:
             self.logger.info(f"Continue training existing {self.model_type} model for {self.ticker}...")
             # 환경 재설정 (중요!)
             self.model.set_env(self.vec_env)
             # 필요시 러닝 레이트 등 재설정 가능
             # self.model.learning_rate = new_lr

        # 4. 학습 실행
        self.logger.info(f"Starting model.learn() for {total_timesteps} timesteps...")
        try:
            self.model.learn(
                total_timesteps=total_timesteps,
                callback=callbacks if callbacks else None,
                log_interval=10, # 로그 출력 빈도 (에피소드 기준)
                tb_log_name=f"{self.ticker}_{self.model_type}", # TensorBoard 로그 이름
                reset_num_timesteps=False # 이어서 학습 시 False
            )
            self.logger.info(f"Training finished for {self.ticker}.")

            # 5. 최종 모델 저장
            self.save_model()

        except Exception as e:
             self.logger.error(f"Error during model training for {self.ticker}: {e}", exc_info=True)
             # 학습 중 에러 발생 시 현재까지의 모델 저장 시도 (선택적)
             try:
                 error_save_path = os.path.join(self.cfg.MODEL_DIR, f"{self.ticker}_{self.model_type.lower()}_error_save.zip")
                 self.model.save(error_save_path)
                 self.logger.info(f"Model saved due to error at: {error_save_path}")
             except Exception as save_e:
                 self.logger.error(f"Could not save model after training error: {save_e}")

        finally:
             # 학습 완료 또는 에러 발생 후 VecEnv 닫기
             if self.vec_env:
                 try: self.vec_env.close(); self.logger.info("Training VecEnv closed.")
                 except Exception as e: self.logger.error(f"Error closing training VecEnv: {e}")
                 self.vec_env = None
             if self.eval_vec_env:
                 try: self.eval_vec_env.close(); self.logger.info("Evaluation VecEnv closed.")
                 except Exception as e: self.logger.error(f"Error closing evaluation VecEnv: {e}")
                 self.eval_vec_env = None


    def save_model(self):
        """현재 모델을 파일로 저장"""
        if self.model:
            try:
                self.model.save(self.model_path)
                self.logger.info(f"Model for {self.ticker} saved successfully to {self.model_path}")
            except Exception as e:
                self.logger.error(f"Failed to save model for {self.ticker} to {self.model_path}: {e}")
        else:
            self.logger.warning(f"No model instance exists for {self.ticker} to save.")

    def load_model(self):
        """파일에서 모델 로드"""
        if os.path.exists(self.model_path):
            try:
                ModelClass = self._get_model_class()
                self.model = ModelClass.load(
                    self.model_path,
                    # custom_objects={'learning_rate': 0.0} # 필요시 로드 시 파라미터 변경
                    device='auto' # 로드 시 디바이스 설정
                )
                self.logger.info(f"Model for {self.ticker} loaded successfully from {self.model_path}")
                # 로드 후 환경 설정 (predict 전 필요할 수 있음)
                # self.model.set_env(self.env_factory()) # 예측만 할 경우 꼭 필요하지 않을 수 있음
            except Exception as e:
                self.logger.error(f"Failed to load model for {self.ticker} from {self.model_path}: {e}")
                self.model = None
        else:
            # self.logger.debug(f"Model file not found for {self.ticker} at {self.model_path}")
            self.model = None

    def predict(self, observation, deterministic=True):
        """
        주어진 관찰(상태)에 대해 에이전트의 행동 예측

        Args:
            observation (dict): 환경에서 받은 관찰 값 (StockTradingEnv의 observation_space 형식).
            deterministic (bool): 결정론적 예측 사용 여부 (True 권장).

        Returns:
            tuple: (action, state) 예측된 행동과 내부 상태 (LSTM 등 사용 시).
                   행동은 모델 타입에 따라 다름 (예: PPO는 단일 값 또는 배열).
        """
        if self.model:
            try:
                action, state = self.model.predict(observation, deterministic=deterministic)
                return action, state
            except Exception as e:
                self.logger.error(f"Error during prediction for {self.ticker}: {e}. Returning default action (Hold).")
                # 오류 시 기본 행동 반환 (예: Hold=2)
                # 환경의 action_space 에 따라 기본값 조정 필요
                # Discrete(3) 공간이라면 2가 Hold 일 가능성 높음
                num_tickers = len(observation['portfolio']) - 1 # 포트폴리오 상태에서 티커 수 추론
                default_action = 2 # Hold 가정
                # 모델 출력 형식에 맞게 반환 (PPO는 보통 단일 array 반환)
                # DQN 은 하나의 값 반환
                if self.model_type == 'DQN':
                    return default_action, None
                else: # PPO, A2C 등 (보통 배열 반환)
                    # 환경의 action_space가 Dict 형태면 어떻게 반환해야 할지 확인 필요
                    # 여기서는 단일 티커 환경 기준으로 가정하고 action 반환
                    return np.array([default_action]), None

        else:
            self.logger.warning(f"No model loaded for {self.ticker}. Returning default action (Hold).")
            # 모델 없을 시 기본 행동 반환
            num_tickers = len(observation['portfolio']) - 1
            default_action = 2
            if self.model_type == 'DQN':
                 return default_action, None
            else:
                 return np.array([default_action]), None

In [23]:
# backtester.py
import backtrader as bt #
import pandas as pd #
import numpy as np #
from datetime import datetime #
import os #

# --- Backtrader 데이터 피드 정의 (특징 데이터 포함) ---
class PandasDataWithFeatures(bt.feeds.PandasData): #
    """
    Pandas DataFrame에서 OHLCV 외에 추가 특징(feature) 컬럼들을
    라인(line)으로 사용할 수 있도록 확장한 데이터 피드 클래스.
    """
    # lines 튜플은 params 에서 동적으로 설정됨
    lines = () #

    # params 튜플에 feature_cols 추가
    params = ( #
        ('feature_cols', []), # 특징 컬럼 이름 리스트를 파라미터로 받음 #
    )

    # datafields 리스트는 params를 기반으로 __init__에서 동적으로 완성됨
    # datafields = bt.feeds.PandasData.datafields + [...] # 이렇게 하지 않음

    def __init__(self): #
        # params에서 feature_cols 가져오기
        feature_cols = self.p.feature_cols #
        # lines 튜플에 feature_cols 추가
        self.lines = tuple(feature_cols) #
        # datafields 리스트 업데이트 (명시적으로 라인 추가)
        for line_name in self.lines: #
             # backtrader 1.9.76.123 기준, linealias 로 자동 설정되는 경우도 있음
             # 명시적으로 선언하여 호환성 확보 시도
             setattr(self.lines, line_name, None) # 라인 이름 선언 #
             # 또는 self.lines.add_line(line_name) # 예전 방식일 수 있음

        super(PandasDataWithFeatures, self).__init__() # 상위 클래스 초기화 #
        # logger.info(f"PandasDataWithFeatures feed created with custom lines: {self.lines._fields}")


# --- Backtrader 전략 클래스 ---
class BacktraderStrategy(bt.Strategy): #
    """
    RL 에이전트와 RiskManager를 사용하여 Backtrader에서 실행되는 전략 클래스.
    """
    params = ( #
        ('agent', None),            # 학습된 RLAgent 인스턴스 #
        ('risk_manager', None),     # RiskManager 인스턴스 #
        # ('feature_engineer', None), # 특징 추출은 run_backtest에서 수행하므로 전략에는 불필요
        ('ticker', None),           # 현재 전략이 적용되는 티커 #
        ('all_tickers', []),        # 전체 티커 리스트 (상태 구성 시 필요) #
        ('window_size', 30),        # 에이전트 관찰 기간 (기본값) #
        ('num_features', 0),        # 특징 개수 (관찰 구성 시 필요) #
        # ('processed_data', None),   # 전체 데이터를 전달하는 대신, 라인으로 접근
    )

    def __init__(self): #
        """전략 초기화"""
        if not self.p.agent or not self.p.risk_manager or not self.p.ticker or not self.p.all_tickers or self.p.num_features <= 0: #
            raise ValueError("Agent, RiskManager, Ticker, All Tickers list, and Num Features must be provided in params") #

        self.agent = self.p.agent #
        self.risk_manager = self.p.risk_manager #
        self.ticker = self.p.ticker #
        self.all_tickers = self.p.all_tickers # 전체 티커 리스트 저장 #
        self.window_size = self.p.window_size #
        self.num_features = self.p.num_features #

        # 데이터 라인 별칭 설정
        self.dt = self.datas[0].datetime #
        self.open = self.datas[0].open #
        self.high = self.datas[0].high #
        self.low = self.datas[0].low #
        self.close = self.datas[0].close #
        self.volume = self.datas[0].volume #
        # 특징 라인 접근용 딕셔너리 생성 (PandasDataWithFeatures에서 정의된 lines 사용)
        self.feature_lines = {line_name: getattr(self.datas[0].lines, line_name) for line_name in self.datas[0].lines._fields if line_name not in bt.feeds.PandasData.lines._fields} #

        self.order = None #
        self.buy_price = 0.0 #
        self.buy_comm = 0.0 #
        self.high_water_mark = 0.0 #
        self.peak_portfolio_value = self.broker.getvalue() #
        self.bar_executed = 0 # 실행된 바 카운트

        # logger 는 전역 변수 사용 (utils.py 에서 설정됨)
        # self.log = logger

        logger.info(f"Strategy initialized for {self.ticker}. Data available: {len(self.datas[0])} bars.") #
        logger.info(f"Number of features expected: {self.num_features}. Features available in feed: {list(self.feature_lines.keys())}") #


    def log(self, txt, dt=None): #
        ''' 표준 로깅 함수 '''
        # self.datas[0].datetime.date(0) 는 현재 bar의 날짜 객체를 반환
        dt_obj = bt.num2date(self.dt[0]) # 현재 bar의 datetime 객체 얻기 #
        # 로그 출력 시 logger 직접 사용
        logger.info(f'{dt_obj.strftime("%Y-%m-%d")} - [{self.ticker}] {txt}') #


    def notify_order(self, order): #
        """주문 상태 변경 시 호출"""
        if order.status in [order.Submitted, order.Accepted]: #
            return #

        if order.status in [order.Completed]: #
            executed_size = order.executed.size #
            # buy/sell 에 따라 부호가 다를 수 있음, 절대값 사용 또는 부호 확인
            if order.isbuy(): #
                self.log(f'BUY EXECUTED @{order.executed.price:.2f}, Cost:{order.executed.value:.2f}, Comm:{order.executed.comm:.2f}, Size:{executed_size}') #
                # 평균 매수 단가 업데이트 로직 필요 시 추가 (여기서는 마지막 매수가격만 기록)
                self.buy_price = order.executed.price #
                self.buy_comm = order.executed.comm #
                self.high_water_mark = self.high[0] # 현재 바의 고가로 초기화 #
            elif order.issell(): #
                # 매도 시 executed.size 는 음수일 수 있음
                self.log(f'SELL EXECUTED @{order.executed.price:.2f}, Cost:{order.executed.value:.2f}, Comm:{order.executed.comm:.2f}, Size:{executed_size}') #
            self.order = None #

        elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]: #
            self.log(f'ORDER FAILED/CANCELLED: Status {order.getstatusname()}, Ref: {order.ref}') #
            self.order = None #

    def notify_trade(self, trade): #
        """거래(매수-매도 사이클) 완료 시 호출"""
        if not trade.isclosed: #
            return #
        self.log(f'TRADE CLOSED - PNL Gross:{trade.pnl:.2f}, Net:{trade.pnlcomm:.2f}') #
        self.buy_price = 0.0 #
        self.buy_comm = 0.0 #
        self.high_water_mark = 0.0 #


    def _get_current_observation(self): #
        """현재 스텝 기준 RL 에이전트용 관찰(상태) 구성"""
        obs_dict = {'features': {}, 'portfolio': {}} #

        # 1. 특징 데이터 (과거 window_size 만큼)
        feature_data_np = np.zeros((self.window_size, self.num_features), dtype=np.float32) #
        # 현재 bar 포함하여 window_size 만큼의 데이터 추출
        for i in range(self.window_size): #
            idx = -i # 현재 bar는 0, 이전 bar는 -1, ... #
            try:
                # 각 feature line 에서 과거 값 가져오기
                feature_values = [self.feature_lines[fname][idx] for fname in self.feature_lines] #
                # window_size 만큼 채워넣기 (과거 -> 현재 순서로)
                feature_data_np[self.window_size - 1 - i, :] = feature_values #
            except IndexError:
                # 데이터 시작 부분에서 window_size 만큼 데이터가 없을 경우 0으로 채워짐 (np.zeros 초기화)
                # logger.debug(f"Index out of bounds at relative index {idx} for ticker {self.ticker}. Using zeros.")
                pass # 이미 0으로 초기화되어 있음 #

        obs_dict['features'][self.ticker] = feature_data_np #

        # 2. 포트폴리오 상태
        current_portfolio_value = self.broker.getvalue() #
        current_cash = self.broker.getcash() #
        portfolio_state = np.zeros(len(self.all_tickers) + 1, dtype=np.float32) #

        if current_portfolio_value > 1e-6: # 0으로 나누기 방지 #
            cash_ratio = np.clip(current_cash / current_portfolio_value, 0.0, 1.0) # 0~1 사이 값 #
            portfolio_state[-1] = cash_ratio #

            try:
                ticker_index = self.all_tickers.index(self.ticker) #
                current_price = self.close[0] #
                current_position = self.getposition(self.datas[0]) # 현재 포지션 객체 #
                position_size = current_position.size # 보유 수량 #
                holding_value = position_size * current_price #
                holding_ratio = np.clip(holding_value / current_portfolio_value, 0.0, 1.0) #
                portfolio_state[ticker_index] = holding_ratio #
            except ValueError:
                 logger.error(f"Ticker {self.ticker} not found in all_tickers list.") #
            except Exception as e:
                 logger.error(f"Error calculating portfolio state for {self.ticker}: {e}") #


            # 합계 1로 정규화 (포트폴리오 상태 벡터의 합이 1이 되도록)
            state_sum = portfolio_state.sum() #
            if state_sum > 1e-6: # 0으로 나누기 방지 #
                 portfolio_state /= state_sum #
            else:
                 # 합계가 0에 가까우면 현금 비율만 1로 설정 (예외 처리)
                 portfolio_state[-1] = 1.0 #


        obs_dict['portfolio'] = portfolio_state #

        return obs_dict #


    def next(self): #
        """매 시점(bar)마다 호출되는 핵심 로직"""
        self.bar_executed += 1 # 실행된 바 카운트 증가 #

        # 데이터가 window_size 만큼 쌓이기 전에는 거래하지 않음
        if self.bar_executed < self.window_size: #
            return #

        current_portfolio_value = self.broker.getvalue() #
        current_cash = self.broker.getcash() #
        current_date = bt.num2date(self.dt[0]) # datetime 객체 #
        current_price = self.close[0] # 현재 봉 종가 #
        current_position = self.getposition(self.datas[0]) # 현재 포지션 객체 #
        position_size = current_position.size # 현재 보유 수량 #

        # 최대 낙폭 업데이트 및 체크
        self.peak_portfolio_value = max(self.peak_portfolio_value, current_portfolio_value) #
        # drawdown = (self.peak_portfolio_value - current_portfolio_value) / self.peak_portfolio_value
        # if drawdown >= self.risk_manager.max_system_drawdown:
        #      self.log(f"CRITICAL: System Drawdown Limit ({self.risk_manager.max_system_drawdown:.1%}) Reached! Current DD: {drawdown:.1%}. Closing position.")
        #      if position_size != 0: self.order = self.close()
        #      # 여기서 cerebro 엔진을 멈추는 방법은 직접적으로 없음, 전략 실행 중단만 가능
        #      # self.env.close() # backtrader 환경에는 env 없음
        #      return

        if self.order: return # 진행 중인 주문 있으면 스킵 #

        # 1. 현재 상태 생성
        current_observation = self._get_current_observation() #

        # 2. RL 에이전트 행동 예측
        predicted_action = 2 # 기본값: Hold #
        try:
            action_raw, _ = self.agent.predict(current_observation, deterministic=True) #
            # action_raw 는 보통 array 형태 (예: array([0]))
            predicted_action = int(action_raw.item()) if isinstance(action_raw, np.ndarray) else int(action_raw) #
            if predicted_action not in [0, 1, 2]: #
                 logger.warning(f"Agent predicted invalid action: {predicted_action}. Defaulting to Hold (2).") #
                 predicted_action = 2 #
        except Exception as e:
            logger.error(f"Error getting prediction from agent: {e}. Defaulting to Hold (2).") #
            predicted_action = 2 #

        # 3. 거래 실행 로직
        if position_size == 0: # 포지션 없음 #
            if predicted_action == 0: # 매수 신호 #
                size_to_buy = self.risk_manager.calculate_position_size( #
                    ticker=self.ticker, #
                    portfolio_value=current_portfolio_value, #
                    current_price=current_price, #
                    available_cash=current_cash, #
                    current_holdings=0 #
                )
                if size_to_buy > 0: #
                    self.log(f'BUY SIGNAL: Action={predicted_action}. Creating order for {size_to_buy} shares at ~{current_price:.2f}') #
                    self.order = self.buy(size=size_to_buy) #
                # else: self.log(f'BUY SIGNAL: Action={predicted_action}, but calculated size is 0. Holding.')
            # else: 매도/유지 신호 시 아무것도 안 함

        elif position_size > 0: # 롱 포지션 보유 중 #
            self.high_water_mark = max(self.high_water_mark, self.high[0]) #
            stop_loss_triggered = self.risk_manager.check_stop_loss( #
                ticker=self.ticker, current_price=current_price, #
                entry_price=current_position.price, # backtrader 포지션 객체의 평균 매입 단가 사용 #
                high_water_mark=self.high_water_mark, #
                position_size=position_size #
            )

            if predicted_action == 1 or stop_loss_triggered: # 매도 신호 또는 손절매 발동 #
                sell_reason = "Agent Sell Signal" if predicted_action == 1 else "Stop Loss Triggered" #
                self.log(f'SELL CREATE ({sell_reason}): Closing position of {position_size} shares at ~{current_price:.2f}') #
                self.order = self.close() # 포지션 전체 청산 #
            # else: 매수/유지 신호 시 홀드 (추가 매수 로직은 복잡도 증가로 일단 제외)

        # else: position_size < 0 (숏 포지션) - 현재 로직은 롱만 가정

    def stop(self): #
        """백테스트 종료 시 호출"""
        final_value = self.broker.getvalue() #
        self.log(f'--- Strategy Stop for {self.ticker} ---') #
        self.log(f'Starting Portfolio Value: {self.broker.startingcash:,.2f}') #
        self.log(f'Final Portfolio Value:    {final_value:,.2f}') #
        self.log(f'Total PnL:                {final_value - self.broker.startingcash:,.2f}') #
        self.log(f'Peak Portfolio Value:     {self.peak_portfolio_value:,.2f}') #
        drawdown = (self.peak_portfolio_value - final_value) / self.peak_portfolio_value if self.peak_portfolio_value > 0 else 0 #
        self.log(f'Final Drawdown:           {drawdown:.2%}') #
        self.log(f'--------------------------------------') #

@timeit #
def run_backtest(ticker, dm, fe, agent, rm): #
    """
    단일 종목에 대한 Backtrader 백테스트를 실행합니다.
    """
    logger.info(f"--- Starting Backtest for {ticker} ---") #
    cerebro = bt.Cerebro(stdstats=False) # 기본 분석기 사용 안 함 (수동 추가) #

    # 1. 데이터 로드 및 전처리
    logger.info(f"Loading and processing data for backtest period: {cfg.TEST_START_DATE} to {cfg.TEST_END_DATE}") #
    price_data_raw = dm.get_data('stock_prices', tickers=ticker, #
                                 start_date=cfg.TEST_START_DATE, end_date=cfg.TEST_END_DATE) #
    if price_data_raw is None or price_data_raw.empty or len(price_data_raw) <= cfg.ENV_WINDOW_SIZE: #
        logger.error(f"Not enough price data found for {ticker} in the backtest period ({len(price_data_raw)} rows <= window {cfg.ENV_WINDOW_SIZE}). Skipping.") #
        return None, None #

    processed_data = fe.process_features(price_data_raw) #
    if processed_data is None or processed_data.empty or len(processed_data) <= cfg.ENV_WINDOW_SIZE: #
        logger.error(f"Feature processing failed or resulted in insufficient data for {ticker} ({len(processed_data)} rows <= window {cfg.ENV_WINDOW_SIZE}). Skipping.") #
        return None, None #

    # Backtrader 피드용 컬럼 확인 및 준비
    required_bt_cols = ['open', 'high', 'low', 'close', 'volume'] #
    if not all(col in processed_data.columns for col in required_bt_cols): #
        logger.warning(f"Processed data for {ticker} missing OHLCV columns. Merging raw data.") #
        # 인덱스가 datetime 객체인지 확인
        if not isinstance(processed_data.index, pd.DatetimeIndex): #
             processed_data.index = pd.to_datetime(processed_data.index) #
        if not isinstance(price_data_raw.index, pd.DatetimeIndex): #
             price_data_raw.index = pd.to_datetime(price_data_raw.index) #

        # merge 대신 join 사용 (인덱스 기준)
        processed_data = processed_data.join(price_data_raw[required_bt_cols], how='left', lsuffix='_feat') #
        # merge 후에도 없으면 에러 처리
        if not all(col in processed_data.columns for col in required_bt_cols): #
              logger.error(f"Could not merge required OHLCV columns for {ticker}. Skipping.") #
              return None, None #

    # 사용할 특징 컬럼 리스트 추출
    # 주의: OHLCV 및 adj_close 는 모델 학습 시 특징으로 사용되었다면 포함해야 함.
    #       여기서는 모델 입력 특징과 피드 구성 특징을 분리하여 생각.
    #       피드에는 OHLCV + 계산된 지표들 모두 포함.
    #       전략에서는 self.feature_lines 로 계산된 지표들에 접근 가능.
    #       _get_current_observation 에서는 필요한 모든 특징을 수집해야 함.
    feature_columns = [col for col in processed_data.columns if col not in #
                       ['open', 'high', 'low', 'close', 'volume', 'ticker', 'source', 'date']] # 기본 OHLCV 등 제외 #
    # 만약 adj_close 가 특징으로 사용되었다면 위 리스트에서 제외하면 안됨!
    # 예: final_feature_list = ['MA5', 'RSI14', 'adj_close', ...] # 모델이 학습한 특징 리스트
    # feature_columns = [col for col in processed_data.columns if col in final_feature_list]

    if not feature_columns: #
         logger.warning(f"No feature columns identified for {ticker}. Check FeatureEngineer output.") #
         # 특징 없이 진행할 수도 있으나, RL 에이전트가 작동하지 않을 수 있음

    processed_data['openinterest'] = 0.0 #

    # --- PandasDataWithFeatures 피드 생성 ---
    data_feed = PandasDataWithFeatures( #
        dataname=processed_data, #
        fromdate=datetime.strptime(cfg.TEST_START_DATE, '%Y-%m-%d'), #
        todate=datetime.strptime(cfg.TEST_END_DATE, '%Y-%m-%d'), #
        # datetime=None, # 인덱스가 datetime 이면 자동 인식
        open='open', high='high', low='low', close='close', volume='volume', #
        openinterest='openinterest', #
        feature_cols=feature_columns # 사용자 정의 라인으로 추가될 컬럼들 #
    )

    cerebro.adddata(data_feed, name=ticker) #
    logger.info(f"Data feed for {ticker} added. Length: {len(processed_data)}, Num Features for Strategy: {len(feature_columns)}") #

    # 2. 전략 추가
    num_features = len(feature_columns) #
    cerebro.addstrategy(BacktraderStrategy, #
                        agent=agent, risk_manager=rm, ticker=ticker, #
                        all_tickers=cfg.TARGET_TICKERS, # 전체 티커 리스트 전달 #
                        window_size=cfg.ENV_WINDOW_SIZE, #
                        num_features=num_features) # 특징 개수 전달 #

    # 3. 브로커 설정
    cerebro.broker.setcash(cfg.BACKTEST_INITIAL_CASH) #
    cerebro.broker.setcommission(commission=cfg.BACKTEST_COMMISSION_PCT, commtype=bt.CommInfoBase.COMM_PERC) #
    cerebro.broker.set_slippage_perc(perc=cfg.BACKTEST_SLIPPAGE_PCT) #

    # 4. 분석기 추가
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', timeframe=bt.TimeFrame.Days, annualization=252, riskfreerate=0.0) #
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns', timeframe=bt.TimeFrame.Days) #
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') #
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades') # 이름 변경 #
    cerebro.addanalyzer(bt.analyzers.SQN, _name='sqn') #

    # 5. 백테스트 실행
    logger.info(f"Running Cerebro backtest for {ticker}...") #
    try:
        results = cerebro.run() #
        strat_results = results[0] #
    except IndexError:
        logger.error(f"Cerebro run returned empty results for {ticker}.") #
        return None, None #
    except Exception as e:
        logger.error(f"Error during Cerebro run for {ticker}: {e}", exc_info=True) #
        return None, None #

    # 6. 결과 분석 및 로깅
    logger.info(f"--- Backtest Results for {ticker} ---") #
    final_value = cerebro.broker.getvalue() #
    initial_cash = cfg.BACKTEST_INITIAL_CASH #
    logger.info(f"Starting Portfolio Value: {initial_cash:,.2f}") #
    logger.info(f"Final Portfolio Value:    {final_value:,.2f}") #
    total_return_pct = (final_value / initial_cash - 1) * 100 if initial_cash > 0 else 0 #
    logger.info(f"Total Net Return:         {total_return_pct:.2f}%") #

    analysis = {} #
    try:
        analysis['sharpe'] = strat_results.analyzers.sharpe.get_analysis().get('sharperatio', None) #
        analysis['returns'] = strat_results.analyzers.returns.get_analysis().get('rtot', None) # Total compounded return #
        analysis['max_drawdown'] = strat_results.analyzers.drawdown.get_analysis().max.drawdown #
        analysis['trade_analysis'] = strat_results.analyzers.trades.get_analysis() #
        analysis['sqn'] = strat_results.analyzers.sqn.get_analysis().get('sqn', None) #

        logger.info(f"Annualized Sharpe Ratio:  {analysis['sharpe']:.3f}" if analysis['sharpe'] is not None else "Sharpe Ratio: N/A") #
        logger.info(f"Max Drawdown:             {analysis['max_drawdown']:.2f}%") #
        logger.info(f"System Quality Number:    {analysis['sqn']:.2f}" if analysis['sqn'] is not None else "SQN: N/A") #

        ta = analysis['trade_analysis'] #
        if ta and ta.total.total > 0: #
            logger.info(f"Total Closed Trades:      {ta.total.closed}") #
            win_rate = (ta.won.total / ta.total.closed * 100) if ta.total.closed > 0 else 0 #
            logger.info(f"Win Rate:                 {win_rate:.2f}%") #
            logger.info(f"Avg Winning PNL:          {ta.won.pnl.average:.2f}") #
            logger.info(f"Avg Losing PNL:           {ta.lost.pnl.average:.2f}") #
            profit_factor = abs(ta.won.pnl.total / ta.lost.pnl.total) if ta.lost.pnl.total != 0 else "inf" #
            logger.info(f"Profit Factor:            {profit_factor}") #
        else:
            logger.info("No trades were executed during the backtest.") #

    except KeyError as e: logger.error(f"KeyError accessing analysis results: {e}") #
    except Exception as e: logger.error(f"Error processing analysis results: {e}", exc_info=True) #

    # 7. 차트 그리기 (Colab에서는 파일 저장 권장)
    try:
        plot_file = os.path.join(cfg.LOG_DIR, f"{ticker}_backtest_plot.png") #
        # iplot=False 로 설정해야 non-interactive 환경에서 실행 가능
        # plotstyle='candlestick' 또는 'bar' 등
        cerebro.plot(style='candlestick', barup='green', bardown='red', volume=True, iplot=False, savefig=True, figfilename=plot_file, dpi=150) #
        logger.info(f"Backtest plot saved to {plot_file}") #
    except ImportError: logger.warning("matplotlib not found. Skipping plot generation.") #
    except Exception as e: logger.error(f"Error generating plot for {ticker}: {e}", exc_info=True) #

    logger.info(f"--- Backtest finished for {ticker} ---\n") #
    return final_value, analysis #

In [24]:
# main.py (셀 12)
import argparse #
import os #
from datetime import datetime #
import pandas as pd #
import logging # 로깅 임포트 #

# --- 필요한 클래스 및 함수 임포트 ---
# 이 셀들이 모두 실행된 상태이므로 별도 import 필요 없음

# --- 전역 설정 및 로거 객체 확인 ---
# cfg 와 logger 객체가 이전 셀에서 생성되어 있어야 함
if 'cfg' not in globals() or 'logger' not in globals():
     raise NameError("Config object 'cfg' or 'logger' is not defined. Please run previous cells.")

# --- 메인 파이프라인 함수 (dm_instance 인자 추가됨) ---
@timeit #
def run_main_pipeline(dm_instance, mode='backtest', tickers=None, start_date=None, end_date=None, train_steps=None): # <- dm_instance 추가
    """
    AI 트레이딩 시스템의 메인 파이프라인을 실행합니다.
    (DataManager 인스턴스를 인자로 받습니다)
    """
    logger.info(f"--- Starting Main Pipeline in '{mode}' mode ---") #
    set_random_seed(cfg.RANDOM_SEED) # 재현성 위한 시드 설정 #

    # --- 전달받은 dm_instance 유효성 검사 ---
    if dm_instance is None or not hasattr(dm_instance, 'conn') or dm_instance.conn is None:
        logger.error("A valid DataManager instance (dm_instance) must be provided. Exiting.")
        return
    dm = dm_instance # 함수 내에서 사용할 변수에 할당

    target_tickers = tickers if tickers else cfg.TARGET_TICKERS #
    if not target_tickers: #
        logger.error("No target tickers specified in config or arguments. Exiting.") #
        return #

    logger.info(f"Target Tickers: {target_tickers}") #

    try:
        # --- FeatureEngineer, RiskManager 인스턴스 생성 ---
        # (이들은 DataManager 와 달리 상태를 유지할 필요가 적어 함수 내 생성 가능)
        fe = FeatureEngineer() #
        rm = RiskManager() #

        # === 데이터 업데이트 모드 ===
        if mode == 'data': #
            logger.info("--- Running Data Update ---") #
            data_start = start_date if start_date else cfg.DATA_START_DATE #
            data_end = end_date if end_date else cfg.DATA_END_DATE #
            dm.update_stock_prices(tickers=target_tickers, start_date=data_start, end_date=data_end) # 전달받은 dm 사용
            kr_tickers_in_target = [t for t in target_tickers if len(t) == 6 and t.isdigit()] #
            if cfg.DART_API_KEY and cfg.DART_API_KEY != "YOUR_OPENDART_API_KEY" and kr_tickers_in_target: #
                funda_start_year = int(data_start[:4]) #
                funda_end_year = int(data_end[:4]) #
                logger.info(f"Updating financial data for KR tickers: {kr_tickers_in_target}") #
                dm.update_financials(tickers=kr_tickers_in_target, start_year=funda_start_year, end_year=funda_end_year) # 전달받은 dm 사용
            else:
                logger.warning("Skipping financial data update (No DART key/valid key or no KR tickers).") #

        # === 에이전트 학습 모드 ===
        elif mode == 'train': #
            logger.info("--- Running Agent Training ---") #
            total_timesteps = train_steps if train_steps else cfg.TOTAL_TRAINING_TIMESTEPS #
            if total_timesteps <= 0: logger.error("Total training timesteps must be positive."); return #

            for ticker in target_tickers: #
                logger.info(f"--- Training Agent for {ticker} ---") #
                try:
                    # 학습/평가 환경 팩토리 (전달받은 dm 사용)
                    train_env_factory = lambda: StockTradingEnv(data_manager=dm, feature_engineer=fe, risk_manager=rm, tickers=[ticker], start_date=cfg.TRAIN_START_DATE, end_date=cfg.TRAIN_END_DATE, is_training=True) # rm 추가
                    eval_env_factory = lambda: StockTradingEnv(data_manager=dm, feature_engineer=fe, risk_manager=rm, tickers=[ticker], start_date=cfg.VALIDATION_START_DATE, end_date=cfg.VALIDATION_END_DATE, is_training=False) # rm 추가
                    agent = RLAgent(ticker=ticker, env_factory=train_env_factory) # RLAgent 인스턴스 생성
                    agent.train(total_timesteps=total_timesteps, eval_env_factory=eval_env_factory) # 학습 시작
                    logger.info(f"--- Finished Training for {ticker} ---") #
                except ValueError as ve: logger.error(f"Value error training {ticker}: {ve}. Skip."); continue #
                except Exception as e: logger.error(f"Error training {ticker}: {e}", exc_info=True); continue #

        # === 백테스팅 모드 ===
        elif mode == 'backtest': #
            logger.info("--- Running Backtesting ---") #
            backtest_results = {}; portfolio_values = {} #
            backtest_start = start_date if start_date else cfg.TEST_START_DATE #
            backtest_end = end_date if end_date else cfg.TEST_END_DATE #
            logger.info(f"Backtesting Period: {backtest_start} to {backtest_end}") #

            for ticker in target_tickers: #
                logger.info(f"--- Backtesting Agent for {ticker} ---") #
                try:
                    # 에이전트 로드 시 환경 팩토리 (전달받은 dm 사용)
                    load_env_factory = lambda: StockTradingEnv(data_manager=dm, feature_engineer=fe, risk_manager=rm, tickers=[ticker], start_date=backtest_start, end_date=backtest_end, is_training=False) # rm 추가
                    agent = RLAgent(ticker=ticker, env_factory=load_env_factory) # 모델 로드 시도
                    if agent.model is None: logger.warning(f"No trained model for {ticker}. Skip backtest."); continue #
                    # run_backtest 함수 호출 (전달받은 dm 사용)
                    final_value, analysis = run_backtest(ticker, dm, fe, agent, rm) #
                    if final_value is not None: backtest_results[ticker] = analysis; portfolio_values[ticker] = final_value #
                    else: logger.warning(f"Backtest returned None for {ticker}.") #
                except ValueError as ve: logger.error(f"Value error backtesting {ticker}: {ve}. Skip."); continue #
                except Exception as e: logger.error(f"Error backtesting {ticker}: {e}", exc_info=True); continue #

            # --- 전체 백테스트 결과 요약 ---
            if backtest_results: #
                logger.info("--- Overall Backtest Summary ---") #
                total_final_value = sum(portfolio_values.values()); num_backtested = len(portfolio_values) #
                total_initial_value = cfg.BACKTEST_INITIAL_CASH * num_backtested if num_backtested > 0 else 0.0 #
                if total_initial_value > 0: #
                    overall_return = (total_final_value / total_initial_value - 1) #
                    logger.info(f"Tickers Backtested: {num_backtested}"); logger.info(f"Total Initial: {total_initial_value:,.2f}") #
                    logger.info(f"Total Final:   {total_final_value:,.2f}"); logger.info(f"Overall Return: {overall_return:.2%}") #
                else: logger.info("Could not calculate overall return.") #
                logger.info("--- Detailed Ticker Results ---") #
                results_df = pd.DataFrame(backtest_results).T; results_df['Final Value'] = pd.Series(portfolio_values) #
                initial_cash = cfg.BACKTEST_INITIAL_CASH #
                results_df['Return (%)'] = ((results_df['Final Value'] / initial_cash - 1) * 100).round(2) if initial_cash > 0 else 0.0 #
                summary_cols = ['Final Value', 'Return (%)', 'sharpe', 'max_drawdown', 'sqn'] #
                valid_cols = [col for col in summary_cols if col in results_df.columns] #
                print(results_df[valid_cols].to_string(float_format='{:,.2f}'.format)) #
            else: logger.info("No backtest results to summarize.") #

        # === 잘못된 모드 ===
        else:
            logger.error(f"Invalid mode: '{mode}'. Use 'data', 'train', or 'backtest'.") #

    except Exception as e:
        logger.critical(f"Critical error in main pipeline: {e}", exc_info=True) #
    finally:
        # --- 자원 정리 (DB 연결은 유지) ---
        # dm 객체 자체는 인자로 받았으므로 여기서 close 호출 X (호출 측에서 관리)
        # if dm and hasattr(dm, 'close_connection'):
        #     # dm.close_connection() # 필요 시 호출 측에서
        logger.info(f"--- Main Pipeline Finished ('{mode}' mode) ---\n") #

# ==============================================================
# --- !!! 함수 호출 방식 변경 필요 !!! ---
# ==============================================================
# 아래 예시들은 수정된 함수 정의에 맞게 변경해야 합니다.
# 'dm' 객체를 첫 번째 인자로 전달해야 합니다.

# --- 스크립트 실행 진입점 (Colab에서는 직접 함수 호출) ---
# 아래 코드는 Colab 환경에서는 직접 실행하기보다는,
# run_main_pipeline 함수를 필요한 인자와 함께 직접 호출하는 것이 더 편리합니다.

# 예시: 데이터 업데이트 실행 (셀 6 실행으로 대체 가능)
# >> 수정된 호출 방식: run_main_pipeline(dm, mode='data')

# 예시: AAPL 종목 학습 실행 (10000 타임스텝)
run_main_pipeline(dm, mode='train', tickers=['AAPL'], train_steps=10000)

# 예시: 전체 설정된 종목 백테스트 실행
# >> 수정된 호출 방식: run_main_pipeline(dm, mode='backtest')

2025-04-29 07:28:18,585 - AI_Trading_System_Free - INFO - --- Starting Main Pipeline in 'train' mode ---
INFO:AI_Trading_System_Free:--- Starting Main Pipeline in 'train' mode ---
2025-04-29 07:28:18,592 - AI_Trading_System_Free - INFO - Global random seeds set to: 42
INFO:AI_Trading_System_Free:Global random seeds set to: 42
2025-04-29 07:28:18,595 - AI_Trading_System_Free - INFO - Target Tickers: ['AAPL']
INFO:AI_Trading_System_Free:Target Tickers: ['AAPL']
2025-04-29 07:28:18,597 - AI_Trading_System_Free - INFO - FeatureEngineer initialized with settings: Tech=True, Funda=True, Sent=False
INFO:AI_Trading_System_Free:FeatureEngineer initialized with settings: Tech=True, Funda=True, Sent=False
2025-04-29 07:28:18,599 - AI_Trading_System_Free - INFO - Using technical indicators: ['MA5', 'MA10', 'MA20', 'MA60', 'RSI14', 'MACD', 'MACD_signal', 'MACD_hist', 'BB_upper', 'BB_middle', 'BB_lower', 'ATR', 'STOCH_K', 'STOCH_D', 'CCI', 'ADX', 'OBV']
INFO:AI_Trading_System_Free:Using technical in

Using cpu device


In [25]:
run_main_pipeline(dm, mode='backtest')

2025-04-29 07:36:33,218 - AI_Trading_System_Free - INFO - --- Starting Main Pipeline in 'backtest' mode ---
INFO:AI_Trading_System_Free:--- Starting Main Pipeline in 'backtest' mode ---
2025-04-29 07:36:33,224 - AI_Trading_System_Free - INFO - Global random seeds set to: 42
INFO:AI_Trading_System_Free:Global random seeds set to: 42
2025-04-29 07:36:33,226 - AI_Trading_System_Free - INFO - Target Tickers: ['AAPL', 'MSFT', 'NVDA', 'GOOGL', 'AMZN', '005930', '000660', '035720', '051910', '005380']
INFO:AI_Trading_System_Free:Target Tickers: ['AAPL', 'MSFT', 'NVDA', 'GOOGL', 'AMZN', '005930', '000660', '035720', '051910', '005380']
2025-04-29 07:36:33,228 - AI_Trading_System_Free - INFO - FeatureEngineer initialized with settings: Tech=True, Funda=True, Sent=False
INFO:AI_Trading_System_Free:FeatureEngineer initialized with settings: Tech=True, Funda=True, Sent=False
2025-04-29 07:36:33,230 - AI_Trading_System_Free - INFO - Using technical indicators: ['MA5', 'MA10', 'MA20', 'MA60', 'RSI14