<a href="https://colab.research.google.com/github/csieung/ml/blob/main/stock_ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install boto3

Collecting botocore<1.40.0,>=1.39.9 (from boto3)
  Using cached botocore-1.39.9-py3-none-any.whl.metadata (5.7 kB)
Using cached botocore-1.39.9-py3-none-any.whl (13.9 MB)
Installing collected packages: botocore
  Attempting uninstall: botocore
    Found existing installation: botocore 1.38.46
    Uninstalling botocore-1.38.46:
      Successfully uninstalled botocore-1.38.46
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.23.1 requires botocore<1.38.47,>=1.38.40, but you have botocore 1.39.9 which is incompatible.[0m[31m
[0mSuccessfully installed botocore-1.39.9


# 1. 전처리 → S3 저장 (feature table)

In [None]:
import pandas as pd
import numpy as np
import boto3
from datetime import datetime, timedelta
import io
from google.colab import userdata


def load_data(target_date: str) -> pd.DataFrame:
    """Load and concatenate every raw Parquet file for one date partition from S3.

    Reads all ``*.parquet`` objects under
    ``s3://de6-team7-bucket/raw_stock/stock_dt=<target_date>/`` using AWS
    credentials stored in Colab user secrets.

    Args:
        target_date: Partition value used in the S3 prefix (e.g. ``'2025-07-11'``).

    Returns:
        All rows of all Parquet files concatenated with a fresh RangeIndex, or an
        empty DataFrame when listing fails, the prefix holds no objects, or none
        of the objects are Parquet files. Errors while reading an individual
        object are not caught and propagate to the caller.
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=userdata.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=userdata.get('AWS_SECRET_ACCESS_KEY'),
        region_name='us-east-1',
    )

    bucket_name = 'de6-team7-bucket'
    prefix = f"raw_stock/stock_dt={target_date}/"
    print(f"S3 경로에서 데이터를 불러옵니다: s3://{bucket_name}/{prefix}")

    # A single list_objects_v2 call returns at most 1,000 keys; the paginator
    # follows continuation tokens so large partitions are not silently truncated.
    all_keys = []
    try:
        paginator = s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            all_keys.extend(obj['Key'] for obj in page.get('Contents', []))
    except Exception as e:
        print(f"S3 파일 목록을 가져오는 중 오류 발생: {e}")
        return pd.DataFrame()

    if not all_keys:
        print(f"오류: 경로 '{prefix}'에 파일이 없습니다.")
        return pd.DataFrame()

    file_keys = [key for key in all_keys if key.endswith('.parquet')]
    if not file_keys:
        print(f"경로 '{prefix}'에서 Parquet 파일을 찾지 못했습니다.")
        return pd.DataFrame()
    print(f"총 {len(file_keys)}개의 Parquet 파일을 발견했습니다.")

    df_list = []
    for key in file_keys:
        s3_object = s3_client.get_object(Bucket=bucket_name, Key=key)
        # Read the whole object into memory; pyarrow needs a seekable buffer.
        with io.BytesIO(s3_object['Body'].read()) as buffer:
            df_list.append(pd.read_parquet(buffer))

    # file_keys is non-empty here, so df_list is too.
    raw_combined = pd.concat(df_list, ignore_index=True)
    print(f"총 {len(raw_combined):,}개의 로우(데이터)를 성공적으로 불러왔습니다.")
    return raw_combined

def feature_table(raw_df: pd.DataFrame, target_date: str) -> pd.DataFrame:
    """Aggregate raw stock snapshots into hourly (1h) feature rows per stock.

    Each ``stock_code`` is resampled into 1-hour bins, which produce:
      - OHLC of ``trade_price``;
      - sums of ``antc_volume`` / ``antc_amount``;
      - hourly means of the level-1 spread and the level-1 / total order-book
        imbalance.

    NaN feature values (e.g. OHLC and means for hours with no rows) are
    forward-filled within each stock. ``vwap_1h`` (amount / volume) and 5h / 20h
    moving averages of the hourly close are then derived.

    Args:
        raw_df: Raw rows; must contain every column listed in ``required_cols``.
            Not modified.
        target_date: Unused here; kept so the signature matches the other
            pipeline steps.

    Returns:
        One row per (stock_code, hour) with the columns in ``final_cols``,
        sorted by ``stock_code`` then ``timestamp`` (naive Asia/Seoul time).
        Returns an empty DataFrame if a required column is missing or there is
        nothing to aggregate.
    """
    required_cols = ['timestamp_ms', 'stock_code', 'stock_name', 'trade_price', 'antc_volume', 'antc_amount',
                     'ask_price_1', 'bid_price_1', 'bid_volume_1', 'ask_volume_1',
                     'total_bid_volume', 'total_ask_volume']
    # Validate before touching any column, so a missing column is reported
    # instead of raising KeyError during timestamp conversion.
    missing = [col for col in required_cols if col not in raw_df.columns]
    if missing:
        print(f"🚨 오류: 필수 컬럼이 누락되었습니다 - {missing}")
        return pd.DataFrame()

    df = raw_df.copy()
    # NOTE(review): despite its name, ``timestamp_ms`` is parsed as epoch
    # *seconds* (unit='s'); confirm against the producer if that changes.
    df['timestamp'] = (
        pd.to_datetime(df['timestamp_ms'], unit='s', utc=True)
        .dt.tz_convert('Asia/Seoul')
        .dt.tz_localize(None)
    )
    df = df.sort_values('timestamp')

    df['spread'] = df['ask_price_1'] - df['bid_price_1']
    # Zero denominators are mapped to NaN so the ratio is NaN rather than inf.
    df['orderbook_imbalance'] = df['bid_volume_1'] / (df['bid_volume_1'] + df['ask_volume_1']).replace(0, np.nan)
    df['total_book_imbalance'] = df['total_bid_volume'] / (df['total_bid_volume'] + df['total_ask_volume']).replace(0, np.nan)

    hourly_groups = []
    for code, group_df in df.groupby('stock_code'):
        resampler = group_df.set_index('timestamp').resample('1h')
        hourly_df = pd.concat([
            resampler['trade_price'].ohlc(),
            resampler['antc_volume'].sum().to_frame('volume_1h'),
            resampler['antc_amount'].sum().to_frame('amount_1h'),
            resampler['spread'].mean().to_frame('spread_mean_1h'),
            resampler['orderbook_imbalance'].mean().to_frame('orderbook_imbalance_1h'),
            resampler['total_book_imbalance'].mean().to_frame('total_book_imbalance_1h'),
        ], axis=1).rename(columns={
            'open': 'open_price_1h', 'high': 'high_price_1h',
            'low': 'low_price_1h', 'close': 'close_price_1h',
        })
        # groupby never yields an empty group, so iloc[0] is safe.
        hourly_df['stock_name'] = group_df['stock_name'].iloc[0]
        hourly_df['stock_code'] = code
        hourly_groups.append(hourly_df)

    if not hourly_groups:
        print("처리할 데이터가 없습니다.")
        return pd.DataFrame()

    # The resampled index is named 'timestamp' and becomes a column here.
    final_df = pd.concat(hourly_groups).reset_index(drop=False)

    cols_to_ffill = final_df.columns.difference(['timestamp', 'stock_code', 'stock_name']).tolist()
    final_df[cols_to_ffill] = final_df.groupby('stock_code')[cols_to_ffill].ffill()

    final_df['vwap_1h'] = final_df['amount_1h'] / final_df['volume_1h'].replace(0, np.nan)

    # Rows of each stock are contiguous and in timestamp order (resample output),
    # so per-group rolling windows run chronologically.
    close_by_stock = final_df.groupby('stock_code')['close_price_1h']
    final_df['ma_5h'] = close_by_stock.transform(lambda s: s.rolling(5, min_periods=1).mean())
    final_df['ma_20h'] = close_by_stock.transform(lambda s: s.rolling(20, min_periods=1).mean())

    # Only leading hours (before a stock's first trade) can still lack a close.
    final_df = final_df.dropna(subset=['close_price_1h'])

    final_cols = [
        'timestamp', 'stock_code', 'stock_name', 'open_price_1h', 'high_price_1h',
        'low_price_1h', 'close_price_1h', 'volume_1h', 'vwap_1h', 'spread_mean_1h',
        'orderbook_imbalance_1h', 'total_book_imbalance_1h', 'ma_5h', 'ma_20h'
    ]
    existing_cols = [col for col in final_cols if col in final_df.columns]

    return (
        final_df[existing_cols]
        .sort_values(['stock_code', 'timestamp'])
        .reset_index(drop=True)
    )

def save_to_s3(df: pd.DataFrame, target_date: str):
    """Write the feature table to S3 as one Parquet object for ``target_date``.

    The destination is
    ``s3://de6-team7-bucket/processed_stock/stock_dt=<target_date>/features.parquet``.
    Credentials come from Colab user secrets. Failures are printed rather than
    raised, and an empty ``df`` is skipped.
    """
    if df.empty:
        print("저장된 데이터가 없습니다.")
        return

    bucket = 'de6-team7-bucket'
    object_key = f"processed_stock/stock_dt={target_date}/features.parquet"
    destination = f"s3://{bucket}/{object_key}"

    print("\n전처리된 피처 테이블을 S3에 저장합니다...")
    print(f"경로: {destination}")

    try:
        credentials = dict(
            key=userdata.get('AWS_ACCESS_KEY_ID'),
            secret=userdata.get('AWS_SECRET_ACCESS_KEY'),
        )
        df.to_parquet(destination, engine='pyarrow', index=False, storage_options=credentials)
    except Exception as err:
        print(f"🚨 S3 저장 중 오류 발생: {err}")
    else:
        print("✅ 저장 완료!")

if __name__ == "__main__":
    # Preprocessing driver: raw S3 parquet -> hourly feature table -> S3.
    # `feature_df` and `target_date` stay as notebook globals for the next cell.
    target_date = '2025-07-11'
    print(f"--- 주식 데이터 처리 시작 ({target_date}) ---")

    raw_data = load_data(target_date)
    if raw_data.empty:
        print(f"{target_date}에 처리할 데이터가 없습니다.")
    else:
        feature_df = feature_table(raw_data, target_date)
        print(feature_df.head())
        save_to_s3(feature_df, target_date)

    print(f"--- 주식 데이터 처리 완료 ({target_date}) ---")


--- 주식 데이터 처리 시작 (2025-07-11) ---
S3 경로에서 데이터를 불러옵니다: s3://de6-team7-bucket/raw_stock/stock_dt=2025-07-11/
총 20개의 Parquet 파일을 발견했습니다.
총 13,327개의 로우(데이터)를 성공적으로 불러왔습니다.
            timestamp stock_code stock_name  open_price_1h  high_price_1h  \
0 2025-07-11 10:00:00     000020       동화약품         6950.0         6950.0   
1 2025-07-11 11:00:00     000020       동화약품         6910.0         6910.0   
2 2025-07-11 12:00:00     000020       동화약품         6930.0         6940.0   
3 2025-07-11 13:00:00     000020       동화약품         6910.0         6910.0   
4 2025-07-11 14:00:00     000020       동화약품         6910.0         6910.0   

   low_price_1h  close_price_1h  volume_1h  vwap_1h  spread_mean_1h  \
0        6940.0          6940.0       1836   6890.0            10.0   
1        6910.0          6910.0        612   6890.0            20.0   
2        6930.0          6940.0       1224   6890.0            15.0   
3        6910.0          6910.0        612   6890.0            20.0   
4        6910.

In [None]:
!pip install s3fs



In [None]:
!pip install snowflake-connector-python[pandas]

Collecting snowflake-connector-python[pandas]
  Downloading snowflake_connector_python-3.16.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (71 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.8/71.8 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python[pandas])
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Downloading asn1crypto-1.5.1-py2.py3-none-any.whl (105 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m105.0/105.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading snowflake_connector_python-3.16.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m44.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: asn1crypto, snowflake-connector-python
Successfully installed asn1crypto-1.5.1 snowflake-connector-pytho

# 2. 1시간 뒤 가격 예측(회귀)


In [None]:
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from concurrent.futures import ThreadPoolExecutor


def get_snowflake_connection():
    """Open a Snowflake connection configured from Colab user secrets.

    Each connection argument is read from the correspondingly named
    ``SNOWFLAKE_*`` secret.
    """
    secret_for_arg = {
        'user': 'SNOWFLAKE_USER',
        'password': 'SNOWFLAKE_PASSWORD',
        'account': 'SNOWFLAKE_ACCOUNT',
        'warehouse': 'SNOWFLAKE_WAREHOUSE',
        'database': 'SNOWFLAKE_DATABASE',
        'schema': 'SNOWFLAKE_SCHEMA',
    }
    connect_kwargs = {arg: userdata.get(secret) for arg, secret in secret_for_arg.items()}
    return snowflake.connector.connect(**connect_kwargs)

# --- 마켓별 가격 예측 모델 학습 (회귀) ---
def train_stock_regression_model(market_df):
    """Train a LightGBM regressor that predicts the next row's hourly close.

    ``market_df`` is expected to hold one stock's hourly feature rows (the
    caller splits the feature table by ``stock_code``). The target
    ``future_1h_price`` is the next row's ``close_price_1h`` after sorting by
    ``timestamp``.

    Rows are split chronologically:
      - first 80% for training, last 20% for testing;
      - when the training part has at least 5 rows, its last 20% is held out
        as the early-stopping validation set, so the test rows used for the
        reported MAE are never seen during fitting.

    Features are every column except the identifiers, the target, and columns
    that are entirely NaN.

    Returns:
        ``(mae, result_df)`` where ``result_df`` has columns timestamp,
        stock_code, stock_name, actual_price, predicted_price for the test rows.
        Returns ``(None, None)`` when there are too few rows to form a non-empty
        training set.
    """
    # Sort so shift(-1) really is "the next hour" even if the caller's order differs.
    df = market_df.sort_values('timestamp', kind='mergesort').copy()

    # Target: the following row's close. The last row has no target and is dropped.
    df['future_1h_price'] = df['close_price_1h'].shift(-1)
    df = df.dropna(subset=['future_1h_price'])

    split_idx = int(len(df) * 0.8)
    if split_idx < 1:
        # 0 or 1 usable rows: the training set would be empty and LightGBM would raise.
        return None, None

    cols_to_drop = df.columns[df.isna().all()].tolist()
    feature_cols = df.columns.difference([
        'stock_code', 'stock_name', 'timestamp', 'future_1h_price'
    ] + cols_to_drop)

    X, y = df[feature_cols], df['future_1h_price']
    X_train, X_test = X.iloc[:split_idx].copy(), X.iloc[split_idx:].copy()
    y_train, y_test = y.iloc[:split_idx].copy(), y.iloc[split_idx:].copy()

    model = lgb.LGBMRegressor(objective='regression_l1', random_state=42)

    # Early stopping must not look at the test rows; otherwise the reported MAE is optimistic.
    n_val = len(X_train) // 5
    if n_val >= 1:
        X_fit, X_val = X_train.iloc[:-n_val], X_train.iloc[-n_val:]
        y_fit, y_val = y_train.iloc[:-n_val], y_train.iloc[-n_val:]
        model.fit(X_fit, y_fit,
                  eval_set=[(X_val, y_val)],
                  callbacks=[lgb.early_stopping(stopping_rounds=50, verbose=False)])
    else:
        model.fit(X_train, y_train)

    predictions = model.predict(X_test)
    mae = mean_absolute_error(y_test, predictions)

    result_df = pd.DataFrame({
        'timestamp': df.loc[y_test.index, 'timestamp'],
        'stock_code': df.loc[y_test.index, 'stock_code'],
        'stock_name': df.loc[y_test.index, 'stock_name'],
        'actual_price': y_test,
        'predicted_price': predictions
    })

    return mae, result_df

def train_and_predict_regression(df):
    """Fit one regression model per stock in parallel and merge the test predictions.

    Each ``stock_code`` group is trained via ``train_stock_regression_model`` on a
    4-worker thread pool. Results are collected in submission order, and
    per-stock failures are printed and skipped. The five lowest-MAE stocks are
    printed as a summary.

    Returns:
        The concatenated prediction frames, or an empty DataFrame when ``df`` is
        empty or no stock produced predictions.
    """
    print("\n--- ML 가격 예측(회귀) 모델 학습 시작 ---")
    if df.empty:
        print("학습할 데이터가 없습니다.")
        return pd.DataFrame()

    per_stock_frames = [frame.copy() for _, frame in df.groupby('stock_code')]
    prediction_frames = []
    mae_by_code = {}

    with ThreadPoolExecutor(max_workers=4) as pool:
        jobs = [
            (frame['stock_code'].iloc[0], pool.submit(train_stock_regression_model, frame))
            for frame in per_stock_frames
        ]
        for code, job in jobs:
            try:
                mae, res_df = job.result()
            except Exception as exc:
                print(f'{code} 모델 학습 중 오류 발생: {exc}')
                continue
            if res_df is None:
                continue
            prediction_frames.append(res_df)
            mae_by_code[code] = mae

    if mae_by_code:
        print("\n--- 상위 5개 모델 성능 (MAE가 낮은 순) ---")
        ranked = sorted(mae_by_code.items(), key=lambda pair: pair[1])
        for code, mae in ranked[:5]:
            print(f"종목 코드: {code}, MAE: {mae:.4f}")
        print("------------------------------------")

    if not prediction_frames:
        print("예측 데이터가 없습니다.")
        return pd.DataFrame()

    combined = pd.concat(prediction_frames, ignore_index=True)
    print("✅ 학습 및 예측 완료")
    return combined

def upload_to_snowflake(df, table_name, target_date):
    """Replace one processed date's rows in a Snowflake table with ``df``.

    Steps:
      1. Write a copy of ``df`` plus a ``processed_dt`` column to
         ``<TABLE_NAME>_STAGING``, recreating that table
         (``auto_create_table=True, overwrite=True``).
      2. In one transaction, delete rows of ``<TABLE_NAME>`` whose
         ``PROCESSED_DT`` equals the normalized date, then
         ``INSERT ... SELECT *`` from staging. If either statement fails, the
         transaction is rolled back, so existing rows are not lost.

    Args:
        df: Rows to upload; must contain a ``timestamp`` column. Not modified.
        table_name: Target table name (upper-cased in SQL). It is interpolated
            into SQL, so it must be a trusted identifier.
        target_date: Any value accepted by ``pd.to_datetime``; normalized to
            ``'YYYY-MM-DD'`` for both the inserted ``processed_dt`` and the
            DELETE filter.

    Errors are printed, not raised (best-effort, as before).
    """
    if df.empty:
        print("업로드할 데이터가 없습니다.")
        return

    staging_table_name = f"{table_name}_STAGING"
    processed_dt = pd.to_datetime(target_date).strftime('%Y-%m-%d')

    # Work on a copy so the caller's DataFrame keeps its columns and dtypes.
    upload_df = df.copy()
    upload_df['processed_dt'] = processed_dt
    upload_df['timestamp'] = pd.to_datetime(upload_df['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')

    conn = None
    try:
        print("\n--- Snowflake에 결과 적재 시작 ---")
        conn = get_snowflake_connection()
        write_pandas(conn, upload_df, staging_table_name.upper(), auto_create_table=True, overwrite=True)
        print(f"✅ Staging 테이블({staging_table_name}) 적재 성공: {upload_df.shape[0]} 행")

        cursor = conn.cursor()
        try:
            # Without an explicit transaction, autocommit would persist the DELETE
            # even if the INSERT then failed, losing that date's data.
            cursor.execute("BEGIN")
            cursor.execute(
                f"DELETE FROM {table_name.upper()} WHERE PROCESSED_DT = %s",
                (processed_dt,),
            )
            print(f"✅ 본 테이블({table_name})에서 기존 데이터 삭제 완료: {cursor.rowcount} 행")

            cursor.execute(f"INSERT INTO {table_name.upper()} SELECT * FROM {staging_table_name.upper()}")
            cursor.execute("COMMIT")
            print("✅ 본 테이블로 데이터 이동 완료!")
        except Exception:
            cursor.execute("ROLLBACK")
            raise
        finally:
            cursor.close()
    except Exception as e:
        print(f"🚨 Snowflake 적재 오류: {e}")
    finally:
        if conn is not None:
            conn.close()

if __name__ == "__main__":
    # Regression driver: relies on `feature_df` / `target_date` produced by the
    # preprocessing cell; trains per-stock models and uploads predictions.
    if 'feature_df' not in locals() or feature_df.empty:
        print("오류: 'feature_df'가 준비되지 않았습니다. 이전 셀을 먼저 실행해주세요.")
    else:
        predictions_df = train_and_predict_regression(feature_df)
        if not predictions_df.empty:
            print("\n--- 최종 예측 결과 (상위 5개) ---")
            print(predictions_df.head())
            print("--------------------------------")
            upload_to_snowflake(predictions_df, 'STOCK_PRICE_PREDICTION', target_date)


--- ML 가격 예측(회귀) 모델 학습 시작 ---

--- 상위 5개 모델 성능 (MAE가 낮은 순) ---
종목 코드: 000020, MAE: 0.0000
종목 코드: 000300, MAE: 0.0000
종목 코드: 000430, MAE: 0.0000
종목 코드: 000520, MAE: 0.0000
종목 코드: 000650, MAE: 0.0000
------------------------------------
✅ 학습 및 예측 완료

--- 최종 예측 결과 (상위 5개) ---
            timestamp stock_code stock_name  actual_price  predicted_price
0 2025-07-11 13:00:00     000020       동화약품        6910.0           6910.0
1 2025-07-11 13:00:00     000040      KR모터스         430.0            431.0
2 2025-07-11 13:00:00     000050         경방        8150.0           8190.0
3 2025-07-11 13:00:00     000070      삼양홀딩스      100100.0          99500.0
4 2025-07-11 13:00:00     000080      하이트진로       21600.0          21675.0
--------------------------------

--- Snowflake에 결과 적재 시작 ---
✅ Staging 테이블(STOCK_PRICE_PREDICTION_STAGING) 적재 성공: 2770 행
✅ 본 테이블(STOCK_PRICE_PREDICTION)에서 기존 데이터 삭제 완료: 25693 행
✅ 본 테이블로 데이터 이동 완료!


In [None]:
# Leftover sanity-check cell: prints "forkinme".
print("".join(("forkin", "me")))

forkinme
