In [1]:
import yfinance as yf
import pandas as pd
import os
from typing import List, Dict, Optional, Tuple, Union
from datetime import datetime, timedelta

class MovingAverageCalculator:
    """이동평균 및 기울기 계산을 위한 클래스"""
    
    @staticmethod
    def calculate_ma(data: pd.Series, window: int) -> pd.Series:
        """
        이동 평균을 계산
        
        Args:
            data: 계산할 시계열 데이터
            window: 이동평균 기간
            
        Returns:
            이동평균 시리즈
        """
        return data.rolling(window=window, min_periods=1).mean()
    
    @staticmethod
    def calculate_slope(data: pd.Series) -> pd.Series:
        """
        이동 평균선의 기울기(전일 대비 변화량)를 계산
        
        Args:
            data: 이동평균 시리즈
            
        Returns:
            기울기 시리즈
        """
        return data.diff() / 2  # (현재값 - 이전값) / 2


class StockDataProcessor:
    """주식 데이터 처리 및 저장을 위한 클래스"""
    
    def __init__(self, project_dir: str = 'csv'):
        """
        초기화
        
        Args:
            project_dir: 데이터 저장 디렉토리
        """
        self.project_dir = project_dir
        os.makedirs(project_dir, exist_ok=True)
        self.ma_calculator = MovingAverageCalculator()
        
    def get_stock_data(self, 
                       ticker: str, 
                       start_date: str, 
                       end_date: str, 
                       interval: str = '1d',
                       ma_periods: Optional[List[int]] = None) -> pd.DataFrame:
        """
        주식 데이터를 다운로드하고 이동평균 및 기울기 계산
        
        Args:
            ticker: 주식 티커 심볼
            start_date: 시작일
            end_date: 종료일
            interval: 데이터 간격 ('1d':일봉, '1wk':주봉, '1mo':월봉)
            ma_periods: 이동평균 기간 리스트
            
        Returns:
            처리된 주식 데이터 DataFrame
        """
        # 기본 이동평균 기간 설정
        if ma_periods is None:
            if interval == '1d':
                # 일봉 데이터의 경우 5일부터 700일까지
                ma_periods = [5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 60] + list(range(70, 710, 10))
            elif interval == '1wk':
                # 주봉 데이터의 경우 5주부터 365주까지
                ma_periods = list(range(5, 366, 5))
            elif interval == '1mo':
                # 월봉 데이터의 경우 5개월부터 365개월까지
                ma_periods = list(range(5, 366, 5))
        
        # 간격에 따른 접두사 설정 (칼럼명 구분을 위해)
        prefix = {
            '1d': 'D',   # 일봉
            '1wk': 'W',  # 주봉
            '1mo': 'M'   # 월봉
        }.get(interval, '')
        
        # 데이터 다운로드
        data = yf.download(ticker, start=start_date, end=end_date, interval=interval)
        
        # 빈 데이터 확인
        if data.empty:
            print(f"데이터가 없습니다. 티커: {ticker}, 기간: {start_date} ~ {end_date}, 간격: {interval}")
            return pd.DataFrame()
        
        # 필요한 컬럼만 선택하고 인덱스 재설정
        data = data[['Close', 'Volume']].reset_index()
        data.columns = ['Date', f'{prefix}_Close', f'{prefix}_Volume']
        
        # 이동평균 및 기울기 계산
        ma_columns = {}
        slope_columns = {}
        
        for period in ma_periods:
            # 가격 이동평균 (간격 접두사 포함)
            ma_price = self.ma_calculator.calculate_ma(data[f'{prefix}_Close'], period)
            ma_columns[f'{prefix}_SMA_{period}'] = ma_price
            slope_columns[f'{prefix}_Slope_SMA_{period}'] = self.ma_calculator.calculate_slope(ma_price)
            
            # 거래량 이동평균 (간격 접두사 포함)
            ma_volume = self.ma_calculator.calculate_ma(data[f'{prefix}_Volume'], period)
            ma_columns[f'{prefix}_VMA_{period}'] = ma_volume
            slope_columns[f'{prefix}_Slope_VMA_{period}'] = self.ma_calculator.calculate_slope(ma_volume)
        
        # 데이터프레임 생성 및 병합
        ma_df = pd.DataFrame(ma_columns)
        slope_df = pd.DataFrame(slope_columns)
        result_df = pd.concat([data[['Date']], data[[f'{prefix}_Close', f'{prefix}_Volume']], ma_df, slope_df], axis=1)
        
        # NaN 값 제거
        result_df.dropna(inplace=True)
        
        return result_df
    
    def merge_interval_data(self, 
                           daily_data: pd.DataFrame, 
                           weekly_data: pd.DataFrame, 
                           monthly_data: pd.DataFrame) -> pd.DataFrame:
        """
        일봉, 주봉, 월봉 데이터를 하나로 병합
        
        Args:
            daily_data: 일봉 데이터
            weekly_data: 주봉 데이터
            monthly_data: 월봉 데이터
            
        Returns:
            병합된 데이터프레임
        """
        if daily_data.empty:
            print("일봉 데이터가 없습니다. 병합을 진행할 수 없습니다.")
            return pd.DataFrame()
            
        # 일봉 데이터의 날짜를 기준으로 데이터 병합 준비
        result = daily_data.copy()
        
        # 주봉 데이터 병합
        if not weekly_data.empty:
            # 날짜 컬럼을 인덱스로 설정
            weekly_indexed = weekly_data.set_index('Date')
            
            # 리샘플링하여 일별 데이터에 맞춤 (forward fill 사용)
            weekly_resampled = weekly_indexed.resample('D').ffill()
            
            # 병합할 데이터프레임으로 변환
            weekly_to_merge = weekly_resampled.reset_index()
            
            # 일봉 데이터와 병합
            result = pd.merge(result, weekly_to_merge, on='Date', how='left')
        
        # 월봉 데이터 병합
        if not monthly_data.empty:
            # 날짜 컬럼을 인덱스로 설정
            monthly_indexed = monthly_data.set_index('Date')
            
            # 리샘플링하여 일별 데이터에 맞춤 (forward fill 사용)
            monthly_resampled = monthly_indexed.resample('D').ffill()
            
            # 병합할 데이터프레임으로 변환
            monthly_to_merge = monthly_resampled.reset_index()
            
            # 기존 데이터와 병합
            result = pd.merge(result, monthly_to_merge, on='Date', how='left')
        
        # 병합된 데이터에서 NaN 값 처리
        result.ffill(inplace=True)
        
        return result
    
    def save_data(self, data: pd.DataFrame, filename: str) -> None:
        """
        데이터를 CSV 파일로 저장
        
        Args:
            data: 저장할 데이터프레임
            filename: 저장할 파일명
        """
        if data.empty:
            print(f"저장할 데이터가 없습니다: {filename}")
            return
            
        full_path = os.path.join(self.project_dir, filename)
        data.to_csv(full_path, index=False)
        print(f"데이터가 저장되었습니다: {full_path} (행 수: {len(data)})")


class StockDataManager:
    """주식 데이터 관리 클래스"""
    
    def __init__(self, project_dir: str = 'csv'):
        """
        초기화
        
        Args:
            project_dir: 데이터 저장 디렉토리
        """
        self.processor = StockDataProcessor(project_dir)
    
    def process_ticker(self, 
                     ticker: str, 
                     start_date: str, 
                     end_date: str,
                     train_test_split_date: Optional[str] = None) -> None:
        """
        특정 티커의 일/주/월 데이터를 통합 파일로 처리하고 저장
        
        Args:
            ticker: 주식 티커 심볼
            start_date: 시작일
            end_date: 종료일
            train_test_split_date: 학습/테스트 데이터 분리 날짜 (None이면 분리하지 않음)
        """
        ticker_name = ticker.replace('^', '')  # 특수문자 제거
        
        if train_test_split_date:
            # 학습 데이터 - 각 간격별 데이터 가져오기
            daily_train = self.processor.get_stock_data(ticker, start_date, train_test_split_date, interval='1d')
            weekly_train = self.processor.get_stock_data(ticker, start_date, train_test_split_date, interval='1wk')
            monthly_train = self.processor.get_stock_data(ticker, start_date, train_test_split_date, interval='1mo')
            
            # 데이터 병합
            combined_train = self.processor.merge_interval_data(daily_train, weekly_train, monthly_train)
            
            # 병합된 데이터 저장
            if not combined_train.empty:
                self.processor.save_data(combined_train, f"{ticker_name}_combined_train_data.csv")
            
            # 테스트 데이터 - 각 간격별 데이터 가져오기
            daily_test = self.processor.get_stock_data(ticker, train_test_split_date, end_date, interval='1d')
            weekly_test = self.processor.get_stock_data(ticker, train_test_split_date, end_date, interval='1wk')
            monthly_test = self.processor.get_stock_data(ticker, train_test_split_date, end_date, interval='1mo')
            
            # 데이터 병합
            combined_test = self.processor.merge_interval_data(daily_test, weekly_test, monthly_test)
            
            # 병합된 데이터 저장
            if not combined_test.empty:
                self.processor.save_data(combined_test, f"{ticker_name}_combined_test_data.csv")
        else:
            # 전체 데이터 - 각 간격별 데이터 가져오기
            daily_data = self.processor.get_stock_data(ticker, start_date, end_date, interval='1d')
            weekly_data = self.processor.get_stock_data(ticker, start_date, end_date, interval='1wk')
            monthly_data = self.processor.get_stock_data(ticker, start_date, end_date, interval='1mo')
            
            # 데이터 병합
            combined_data = self.processor.merge_interval_data(daily_data, weekly_data, monthly_data)
            
            # 병합된 데이터 저장
            if not combined_data.empty:
                self.processor.save_data(combined_data, f"{ticker_name}_combined_data.csv")
    
    def process_multiple_tickers(self, 
                                tickers: List[str], 
                                start_date: str, 
                                end_date: str,
                                train_test_split_date: Optional[str] = None) -> None:
        """
        여러 티커에 대해 데이터를 처리
        
        Args:
            tickers: 주식 티커 심볼 리스트
            start_date: 시작일
            end_date: 종료일
            train_test_split_date: 학습/테스트 데이터 분리 날짜 (None이면 분리하지 않음)
        """
        for ticker in tickers:
            print(f"\n처리 중: {ticker}")
            self.process_ticker(ticker, start_date, end_date, train_test_split_date)


def main():
    """메인 함수"""
    # 설정
    project_dir = 'csv'
    start_date = '2005-01-01'
    train_test_split_date = '2023-03-01'
    end_date = '2025-03-18'
    
    # 관리자 인스턴스 생성
    manager = StockDataManager(project_dir)
    
    # S&P 500 데이터 처리
    print("S&P 500 데이터 처리 중...")
    manager.process_ticker(
        '^GSPC', start_date, end_date, train_test_split_date
    )
    
    # # 추가 주식 처리
    # additional_tickers = ['AAPL', 'TSLA', 'GOOGL', 'MSFT', 'AMZN']
    # print(f"\n추가 티커 {len(additional_tickers)}개 처리 중...")
    # manager.process_multiple_tickers(
    #     additional_tickers, start_date, end_date, train_test_split_date
    # )
    
    print("\n모든 데이터 처리 완료!")


if __name__ == "__main__":
    main()

S&P 500 데이터 처리 중...
YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


데이터가 저장되었습니다: csv\GSPC_combined_train_data.csv (행 수: 4569)


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


데이터가 저장되었습니다: csv\GSPC_combined_test_data.csv (행 수: 512)

모든 데이터 처리 완료!
