In [2]:
# db.py
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 환경변수에서 DATABASE_URL 가져오고, 없으면 로컬 기본값 사용
DATABASE_URL = os.getenv(
    "DATABASE_URL", "postgresql://postgres:password@3.37.207.16:5432/postgres"
)

# SQLAlchemy 엔진 생성
engine = create_engine(DATABASE_URL)

# 세션 팩토리
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base 클래스
Base = declarative_base()

  Base = declarative_base()


In [4]:
import sys
import os

# fastapi 폴더가 있는 디렉토리 절대경로를 sys.path에 추가
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), "../../"))  # notebooks의 상위
sys.path.append(BASE_DIR)

In [5]:
from fastapi.models.news import NewsModel_v2, NewsModel_v2_Metadata

# main.py 또는 Jupyter Notebook에서 실행

# ✅ DB URL 확인
print("🔗 연결된 DB URL:", engine.url)

# ✅ 세션 생성
db = SessionLocal()

# ✅ 뉴스 5건 조회
results = (
    db.query(NewsModel_v2.news_id, NewsModel_v2.wdate, NewsModel_v2_Metadata.stock_list)
    .join(NewsModel_v2, NewsModel_v2.news_id == NewsModel_v2_Metadata.news_id)
    .all()
)

# ✅ 결과 출력
# for row in results:
    # print(f"{row.news_id} | {row.wdate} |{row.stock_list[0]}")

# ✅ 세션 종료
db.close()

🔗 연결된 DB URL: postgresql://postgres:***@3.37.207.16:5432/postgres


In [6]:
results[:2]

[('20250523_0002', datetime.datetime(2025, 5, 23, 18, 52), [{'stock_id': '377300', 'stock_name': '카카오페이'}]),
 ('20250523_0004', datetime.datetime(2025, 5, 23, 18, 33), [{'stock_id': '005930', 'stock_name': '삼성전자'}])]

In [3]:
import pandas as pd
import ast
import numpy as np
import os
from pykrx import stock
import requests
from datetime import timedelta
from tqdm import tqdm

from dotenv import load_dotenv

load_dotenv()

True

In [8]:
class NewsMarketPipeline:

    def __init__(self, news_list, df_base_rate):
        self.api_key = os.getenv("KOREA_BANK_API_KEY")

        self.df = pd.DataFrame(news_list)
        self.ticker_name_map = None
        self.trading_days = None
        self.ohlcv_dict = {}
        self.trading_dict = {}
        self.fx_df = None
        self.bond_df = None
        self.rate_df = df_base_rate

    def get_df(self):
        return self.df

    def extract_stock_name(self):
        if "stock_list" not in self.df.columns:
            raise Exception(
                "stock_list 컬럼이 없습니다. 실제 컬럼: "
                + str(self.df.columns.tolist())
            )

        def get_last_stock_name(x):
            try:
                items = ast.literal_eval(x) if isinstance(x, str) else x
                return items[-1]["stock_name"] if items else None
            except:
                return None

        self.df["stock_name"] = self.df["stock_list"].apply(get_last_stock_name)

    def add_news_date(self):
        if "wdate" in self.df.columns:
            self.df["wdate"] = pd.to_datetime(self.df["wdate"])
            self.df["news_date"] = self.df["wdate"].dt.normalize()
        elif "news_date" in self.df.columns:
            self.df["news_date"] = pd.to_datetime(self.df["news_date"])
        else:
            raise Exception(
                "wdate/news_date 컬럼이 없습니다. 실제 컬럼: "
                + str(self.df.columns.tolist())
            )

    def get_ticker_name_map(self, recent_date="2025-05-30"):
        kospi_tickers = stock.get_market_ticker_list(date=recent_date, market="KOSPI")
        return {
            stock.get_market_ticker_name(ticker): ticker for ticker in kospi_tickers
        }

    def add_ticker(self):
        if self.ticker_name_map is None:
            self.ticker_name_map = self.get_ticker_name_map()

        self.df["ticker"] = self.df["stock_name"].apply(
            lambda name: self.ticker_name_map.get(name) if pd.notna(name) else None
        )

    def get_trading_days(self, start_year=2022, end_year=2026):
        days = []
        for y in range(start_year, end_year + 1):
            for m in range(1, 13):
                try:
                    days_this_month = stock.get_previous_business_days(year=y, month=m)
                    days.extend(days_this_month)
                except:
                    pass
        return pd.to_datetime(sorted(set(days)))

    def adjust_to_nearest_trading_day(self, date):
        idx = self.trading_days.searchsorted(date, side="right") - 1
        if idx >= 0:
            return self.trading_days[idx]
        return pd.NaT

    def add_trading_dates(self):
        if self.trading_days is None:
            self.trading_days = self.get_trading_days()

        self.df["d_day_date"] = self.df["news_date"].apply(
            self.adjust_to_nearest_trading_day
        )

        offsets = {
            "d_minus_5_date": -5,
            "d_minus_4_date": -4,
            "d_minus_3_date": -3,
            "d_minus_2_date": -2,
            "d_minus_1_date": -1,
            "d_day_date": 0,
        }

        def fill_offsets(row):
            d_day = row["d_day_date"]
            if not pd.isna(d_day):
                weekday = d_day.weekday()
                if weekday == 5:
                    d_day = self.adjust_to_nearest_trading_day(
                        d_day - timedelta(days=1)
                    )
                elif weekday == 6:
                    d_day = self.adjust_to_nearest_trading_day(
                        d_day - timedelta(days=2)
                    )

            res = {}
            if pd.isna(d_day):
                for k in offsets:
                    res[k] = pd.NaT
                return pd.Series(res)

            idx = self.trading_days.searchsorted(d_day)
            for k, v in offsets.items():
                i = idx + v
                res[k] = (
                    self.trading_days[i] if 0 <= i < len(self.trading_days) else pd.NaT
                )
            return pd.Series(res)

        df_offsets = self.df.apply(fill_offsets, axis=1)
        self.df = pd.concat(
            [self.df.reset_index(drop=True), df_offsets.reset_index(drop=True)], axis=1
        )

    def fetch_ohlcv_and_trading(self):
        offsets = [f"d_minus_{i}_date" for i in range(1, 6)]
        all_dates = (
            pd.concat([self.df[col] for col in offsets], ignore_index=True)
            .dropna()
            .unique()
        )
        all_dates_str = sorted(
            [pd.to_datetime(d).strftime("%Y%m%d") for d in all_dates]
        )
        tickers = self.df["ticker"].dropna().unique().tolist()

        for ticker in tickers:
            try:
                self.ohlcv_dict[ticker] = stock.get_market_ohlcv_by_date(
                    min(all_dates_str), max(all_dates_str), ticker
                )
            except:
                pass
            try:
                self.trading_dict[ticker] = stock.get_market_trading_value_by_date(
                    min(all_dates_str), max(all_dates_str), ticker
                )
            except:
                pass

    def add_ohlcv_and_trading(self):
        offsets = [f"d_minus_{i}_date" for i in range(1, 6)]

        all_ohlcv_rows = []
        for ticker, df in self.ohlcv_dict.items():
            df = df.reset_index().rename(columns={"날짜": "date"})
            df["ticker"] = ticker
            all_ohlcv_rows.append(df[["date", "ticker", "종가", "거래량"]])
        df_ohlcv_all = pd.concat(all_ohlcv_rows) if all_ohlcv_rows else pd.DataFrame()

        all_trading_rows = []
        for ticker, df in self.trading_dict.items():
            df = df.reset_index().rename(columns={"날짜": "date"})
            df["ticker"] = ticker
            df = df[["date", "ticker", "외국인합계", "기관합계", "개인"]]
            all_trading_rows.append(df)
        df_trading_all = (
            pd.concat(all_trading_rows) if all_trading_rows else pd.DataFrame()
        )

        for col in offsets:
            self.df = (
                self.df.merge(
                    df_ohlcv_all,
                    how="left",
                    left_on=[col, "ticker"],
                    right_on=["date", "ticker"],
                )
                .rename(columns={"종가": f"{col}_close", "거래량": f"{col}_volume"})
                .drop(columns="date")
            )
            self.df = (
                self.df.merge(
                    df_trading_all,
                    how="left",
                    left_on=[col, "ticker"],
                    right_on=["date", "ticker"],
                )
                .rename(
                    columns={
                        "외국인합계": f"{col}_foreign",
                        "기관합계": f"{col}_institution",
                        "개인": f"{col}_individual",
                    }
                )
                .drop(columns="date")
            )

    def fetch_fx(self, start_date, end_date):
        if self.fx_df is not None:
            return self.fx_df
        url = f"https://ecos.bok.or.kr/api/StatisticSearch/{self.api_key}/json/kr/1/1000/731Y001/D/{start_date}/{end_date}/0000001/"
        resp = requests.get(url).json()
        if "StatisticSearch" not in resp or "row" not in resp["StatisticSearch"]:
            return pd.DataFrame()
        df = pd.DataFrame(resp["StatisticSearch"]["row"])
        df["date"] = pd.to_datetime(df["TIME"], format="%Y%m%d")
        df["usdkrw"] = pd.to_numeric(df["DATA_VALUE"], errors="coerce")
        self.fx_df = df[["date", "usdkrw"]].sort_values("date")
        return self.fx_df

    def fetch_bond10y(self, start_date, end_date):
        if self.bond_df is not None:
            return self.bond_df
        url = f"https://ecos.bok.or.kr/api/StatisticSearch/{self.api_key}/json/kr/1/1000/817Y002/D/{start_date}/{end_date}/010200000/"
        resp = requests.get(url).json()
        if "StatisticSearch" not in resp or "row" not in resp["StatisticSearch"]:
            return pd.DataFrame()
        df = pd.DataFrame(resp["StatisticSearch"]["row"])
        df["date"] = pd.to_datetime(df["TIME"], format="%Y%m%d")
        df["bond10y"] = pd.to_numeric(df["DATA_VALUE"], errors="coerce")
        self.bond_df = df[["date", "bond10y"]].sort_values("date")
        return self.bond_df

    def add_external_vars(self):
        self.df = self.df.sort_values("news_date")
        if self.trading_days is None:
            self.trading_days = self.get_trading_days()
        raw_start = self.df["news_date"].min() - timedelta(days=1)
        raw_end = self.df["news_date"].max() - timedelta(days=1)
        start_date = self.adjust_to_nearest_trading_day(raw_start)
        end_date = self.adjust_to_nearest_trading_day(raw_end)
        if pd.isna(start_date) or pd.isna(end_date):
            return

        start_str, end_str = start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
        fx_df = self.fetch_fx(start_str, end_str)
        bond_df = self.fetch_bond10y(start_str, end_str)

        if not fx_df.empty:
            self.df = pd.merge_asof(
                self.df,
                fx_df.rename(columns={"date": "news_date", "usdkrw": "fx"}),
                on="news_date",
                direction="backward",
            )
        if not bond_df.empty:
            self.df = pd.merge_asof(
                self.df,
                bond_df.rename(columns={"date": "news_date"}),
                on="news_date",
                direction="backward",
            )
        if self.rate_df is not None and not self.rate_df.empty:
            self.df = pd.merge_asof(
                self.df,
                self.rate_df.rename(columns={"date": "news_date", "rate": "base_rate"}),
                on="news_date",
                direction="backward",
            )

    def run(self):
        steps = [
            ("extract_stock_name", self.extract_stock_name),
            ("add_news_date", self.add_news_date),
            ("add_ticker", self.add_ticker),
            ("add_trading_dates", self.add_trading_dates),
            ("fetch_ohlcv_and_trading", self.fetch_ohlcv_and_trading),
            ("add_ohlcv_and_trading", self.add_ohlcv_and_trading),
            ("add_external_vars", self.add_external_vars),
        ]

        for step_name, func in steps:
            try:
                func()
            except Exception as e:
                print(f"[ERROR] Step '{step_name}' failed: {e}")

        try:
            self.df = self.df.drop(
                columns=["wdate", "stock_list", "stock_name", "news_date", "ticker"]
                + [f"d_minus_{i}_date" for i in range(1, 6)]
                + ["d_day_date"],
                errors="ignore",
            )
        except Exception as e:
            print(f"[WARN] Drop columns failed: {e}")

        return self.df.to_dict(orient="records")


In [9]:
news_list = results.copy()

In [10]:
len(news_list)

13886

In [11]:
df_base_rate = pd.read_csv("../../automation/db/korea_base_rate_daily.csv")

In [12]:
df_base_rate.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6361 entries, 0 to 6360
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   date    6361 non-null   object 
 1   rate    6361 non-null   float64
dtypes: float64(1), object(1)
memory usage: 99.5+ KB


In [13]:
df_base_rate["date"] = pd.to_datetime(df_base_rate["date"])

In [14]:
news_list[:2]

[('20250523_0002', datetime.datetime(2025, 5, 23, 18, 52), [{'stock_id': '377300', 'stock_name': '카카오페이'}]),
 ('20250523_0004', datetime.datetime(2025, 5, 23, 18, 33), [{'stock_id': '005930', 'stock_name': '삼성전자'}])]

In [15]:
news_list_converted = [
    {
        "news_id": news_id,
        "wdate": wdate.strftime("%Y-%m-%d %H:%M:%S"),  # ✅ datetime → str
        "stock_list": stock_list,
    }
    for news_id, wdate, stock_list in news_list
]

In [16]:
news_list_converted[0]

{'news_id': '20250523_0002',
 'wdate': '2025-05-23 18:52:00',
 'stock_list': [{'stock_id': '377300', 'stock_name': '카카오페이'}]}

In [379]:
from tqdm import tqdm

pipeline = NewsMarketPipeline(news_list_converted[:1], df_base_rate)

In [380]:
test = pipeline.run()

In [17]:
test[0]

NameError: name 'test' is not defined

In [18]:
df1 = pd.DataFrame(test[0])

NameError: name 'test' is not defined

In [19]:
df1.head(4)

NameError: name 'df1' is not defined

In [20]:
df2 = pd.DataFrame(test[1])

NameError: name 'test' is not defined

In [21]:
df2.head(4)

NameError: name 'df2' is not defined

In [22]:
df = df[
    [
        "news_id",
        "d_minus_5_date_close",
        "d_minus_5_date_volume",
        "d_minus_5_date_foreign",
        "d_minus_5_date_institution",
        "d_minus_5_date_individual",
        "d_minus_4_date_close",
        "d_minus_4_date_volume",
        "d_minus_4_date_foreign",
        "d_minus_4_date_institution",
        "d_minus_4_date_individual",
        "d_minus_3_date_close",
        "d_minus_3_date_volume",
        "d_minus_3_date_foreign",
        "d_minus_3_date_institution",
        "d_minus_3_date_individual",
        "d_minus_2_date_close",
        "d_minus_2_date_volume",
        "d_minus_2_date_foreign",
        "d_minus_2_date_institution",
        "d_minus_2_date_individual",
        "d_minus_1_date_close",
        "d_minus_1_date_volume",
        "d_minus_1_date_foreign",
        "d_minus_1_date_institution",
        "d_minus_1_date_individual",
        "fx",
        "bond10y",
        "base_rate",
    ]
]

NameError: name 'df' is not defined

In [23]:
df.head()

NameError: name 'df' is not defined

In [24]:
df[]

SyntaxError: invalid syntax (1951995059.py, line 1)

In [None]:
def extract_d_minus_1_info_single(news: dict) -> dict:
    from pykrx import stock
    import pandas as pd
    from datetime import datetime, timedelta

    # 날짜 처리
    news_date = pd.to_datetime(news["wdate"]).normalize()
    year, month = news_date.year, news_date.month

    # 전달 계산
    if month == 1:
        prev_year, prev_month = year - 1, 12
    else:
        prev_year, prev_month = year, month - 1

    # 거래일 수집 (해당 월 + 전달)
    trading_days = []
    for y, m in [(prev_year, prev_month), (year, month)]:
        try:
            days = stock.get_previous_business_days(year=y, month=m)
            trading_days.extend(days)
        except:
            continue
    trading_days = pd.to_datetime(sorted(set(trading_days)))

    # D-day (뉴스일 기준 가장 가까운 거래일)
    d_day_idx = trading_days.searchsorted(news_date, side="right") - 1
    if d_day_idx < 0:
        return {}

    d_day = trading_days[d_day_idx]
    d_minus_1_idx = d_day_idx - 1
    if d_minus_1_idx < 0:
        return {}

    d_minus_1 = trading_days[d_minus_1_idx]

    # Ticker 추출
    stock_list = news.get("stock_list", [])
    if not stock_list or not isinstance(stock_list, list):
        return {}

    ticker = str(stock_list[-1]["stock_id"]).zfill(6)

    # d-1 및 fallback 날짜 문자열 생성
    fallback_dates = [d_minus_1 - timedelta(days=i) for i in range(0, 10)]
    fallback_dates_str = [d.strftime("%Y%m%d") for d in fallback_dates]

    # OHLCV 수집
    try:
        ohlcv = stock.get_market_ohlcv_by_date(
            min(fallback_dates_str), max(fallback_dates_str), ticker
        ).reset_index()
        ohlcv.rename(columns={"날짜": "date"}, inplace=True)
        ohlcv["ticker"] = ticker
    except:
        ohlcv = pd.DataFrame()

    # 수급 데이터 수집
    try:
        trade = stock.get_market_trading_value_by_date(
            min(fallback_dates_str), max(fallback_dates_str), ticker
        ).reset_index()
        trade.rename(columns={"날짜": "date"}, inplace=True)
        trade["ticker"] = ticker
    except:
        trade = pd.DataFrame()

    # fallback: 가장 가까운 날짜의 값
    def get_latest(source_df, cols):
        for d in fallback_dates:
            row = source_df[(source_df["date"] == d) & (source_df["ticker"] == ticker)]
            if not row.empty:
                return row.iloc[0][cols].to_dict()
        return {col: None for col in cols}

    ohlcv_vals = get_latest(ohlcv, ["종가", "거래량"])
    trade_vals = get_latest(trade, ["개인", "기관합계", "외국인합계"])

    return {
        "news_id": news["news_id"],
        "d_minus_1_close": ohlcv_vals["종가"],
        "d_minus_1_volume": ohlcv_vals["거래량"],
        "d_minus_1_individual": trade_vals["개인"],
        "d_minus_1_institution": trade_vals["기관합계"],
        "d_minus_1_foreign": trade_vals["외국인합계"],
    }

In [46]:
news = {
    "news_id": "20250523_0002",
    "wdate": "2025-05-23 18:52:00",
    "stock_list": [{"stock_id": "377300", "stock_name": "카카오페이"}],
}

In [44]:
extract_d_minus_1_info_single(news)

{'news_id': '20250523_0002',
 'd_minus_1_date': '2025-05-22',
 'd_minus_1_close': 30300,
 'd_minus_1_volume': 197238,
 'd_minus_1_individual': -1916639300,
 'd_minus_1_institution': 1166288875,
 'd_minus_1_foreign': 574800375}