In [4]:
!pip install s3fs
!pip install numpy
!pip install pandas



In [5]:
import os
import math
from typing import List

import numpy as np
import pandas as pd

print("pandas version:", pd.__version__)

# s3 기본 설정
BUCKET = "sagemaker-us-west-2-327784329358"
S3_BASE = f"s3://{BUCKET}"

# 입력 데이터: S3에서 바로 읽기
PRICE_OHLCV_PATH = f"{S3_BASE}/prepared_data/price_ohlcv.csv"
VQ_CODE_CSV_PATH = f"{S3_BASE}/vq_vae_outputs/vq_codes_spy.csv"
NEWS_CSV_PATH = f"{S3_BASE}/prepared_data/sp500_headlines_2008_2024.csv"

# 출력 JSONL: 로컬에 만들고 S3로 업로드
OUTPUT_JSONL_LOCAL_PATH = "/tmp/market_commentary_train.jsonl"
OUTPUT_JSONL_S3_KEY = "prepared_data/market_commentary_train.jsonl"


pandas version: 2.3.3


In [6]:
df_price = pd.read_csv(PRICE_OHLCV_PATH)

required_cols = {"date", "close"}
if not required_cols.issubset(df_price.columns):
    raise ValueError("prices_ohlcv.csv에 date, close 컬럼이 필요합니다.")

df_price["date"] = pd.to_datetime(df_price["date"])
df_price = df_price.sort_values("date").reset_index(drop=True)

# 여기서 수익률, 변동성 등 파생 피처 생성
df_price["ret_1d"] = df_price["close"].pct_change()
df_price["log_ret"] = np.log(df_price["close"]).diff()
df_price["close_ma_5"] = df_price["close"].rolling(window=5).mean()
df_price["close_ma_20"] = df_price["close"].rolling(window=20).mean()
df_price["vol_5"] = df_price["log_ret"].rolling(window=5).std()
df_price["vol_20"] = df_price["log_ret"].rolling(window=20).std()

# NaN 제거
df_price = df_price.dropna().reset_index(drop=True)
print("[PRICE] head:\n", df_price.head())


# VQ-VAE 코드와 price join
df_vq = pd.read_csv(VQ_CODE_CSV_PATH)
df_vq["date"] = pd.to_datetime(df_vq["date"])

df_merged = pd.merge(df_vq, df_price, on="date", how="inner")
df_merged = df_merged.sort_values("date").reset_index(drop=True)

# News Headline join
df_news = pd.read_csv("NEWS_CSV_PATH")
df_news["Date"] = pd.to_datetime(df_news["Date"])

news_grouped = (
    df_news.groupby("Date")["Title"]
    .apply(list)
    .reset_index()
    .rename(columns={"Date": "date", "Title": "titles"})
)

#merge all
df_all = pd.merge(
    df_merged,                # VQ 코드 + 가격 요약
    news_grouped[["date", "news_summary_raw"]],  # 뉴스 요약
    on="date",
    how="left",
)


FileNotFoundError: sagemaker-us-west-2-327784329358/prepared_data/price_ohlcv.csv

In [None]:
def build_prompt(code_value: int, ohlcv_summary: str):
    prompt = (
        "### 코드\n" + str(code_value) + "\n\n"
        "### 시계열 OHLCV\n" + ohlcv_summary + "\n\n"

        "위의 Code 벡터 정보와 OHLCV 패턴을 바탕으로, 해당 구간의 시장 움직임을 설명할 수 있는 한국어 뉴스 요약을 2~3 문장으로 작성하라."
        "데이터에 기반한 보수적 설명만 사용하고, 입력에 없는 사건을 새로 만들어내지 마라."
    )
    return prompt


# 프롬프트 컬럼 생성 (레이블은 placeholder)
df_all["prompt"] = df_all.apply(
    lambda row: build_prompt(
        code_value=row["code"],
        ohlcv_summary=row["ohlcv_summary"],
        news_summary=row["news_summary_raw"],
    ),
    axis=1,
)

df_all["completion"] = df_all.apply(
    lambda row:
        df_all["completion"] = row["news_summary_raw"]
    axis=1,
)
# 실제 프로젝트에서는 아래 completion을 사람/teacher LLM으로 채워넣어야 함
# silver label을 llm api로 사용하거나 사람 손으로 gole label을 붙여 넣어야 함
df_all["completion"] = "<COMMENTARY_HERE>"

print("[ALL with prompt/completion] head:\n",
      df_all[["date", "code", "prompt", "completion"]].head())


In [None]:
import boto3
import json

def export_jsonl(df: pd.DataFrame, out_path: str) -> None:
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for row in df.itertuples():
            rec = {
                "prompt": getattr(row, "prompt"),
                "completion": getattr(row, "completion"),
                "date": str(getattr(row, "date")),
            }
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    print(f"Saved JSONL to {out_path}")

with 
export_jsonl(df_all, OUTPUT_JSONL_LOCAL_PATH)

#upload to s3 bucket
s3 = boto3.client("s3")
s3.upload_file(OUTPUT_JSONL_LOCAL_PATH, BUCKET, OUTPUT_JSONL_S3_KEY)

print(f"Uploaded to s3://{BUCKET}/{OUTPUT_JSONL_S3_KEY}")