In [None]:
import sys

# Point Python at the project root explicitly (presumably so that project-local
# packages such as `modules` can be imported — verify against the repo layout).
# NOTE(review): this is a machine-specific absolute path; adjust it per machine.
PROJECT_ROOT = r"c:\Users\tjrrj\vscode\doritest"

# Guard the insert so re-running this cell does not stack duplicate entries.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

In [11]:
import pandas as pd
from modules import add_surrogate_key   

# Read the export with the sheet's second row as the header, drop the first
# row below that header, and keep only the four order columns used later.
df = (
    pd.read_excel(r"D:\Download\coupang_eats_2025-11.xlsx", header=1)
    .iloc[1:]
    .loc[:, ["주문번호", "일자", "시간", "주문금액"]]
)

  warn("Workbook contains no default style, apply openpyxl's default")


In [12]:
# Reduce the date column to plain `datetime.date` values before it is used
# as part of the natural key.
df = df.assign(**{"일자": pd.to_datetime(df["일자"]).dt.date})

# NOTE(review): `.head()` is applied to the frame that is assigned back, so only
# the first five rows continue downstream — confirm this truncation is intended
# and not a leftover preview.
df = add_surrogate_key(df, natural_key_cols=["주문번호", "일자", "시간"]).head()

# Cast the amount to int, then expose the generated `key` column as `order_id`.
df = df.astype({"주문금액": int}).rename(columns={"key": "order_id"})
df

Unnamed: 0,order_id,주문번호,일자,시간,주문금액
1,25501f4c34e41650,2DEKBH,2025-11-01,2025-11-01 13:41:28,24100
2,6c8f45025bbee74e,0YEE8K,2025-11-01,2025-11-01 14:49:24,56100
3,046b9cb1a9cc8dbe,29F98W,2025-11-01,2025-11-01 16:51:33,16100
4,dd24c62c2bbc9d22,1TY694,2025-11-01,2025-11-01 17:39:05,21400
5,45116497b2250554,28MJJK,2025-11-01,2025-11-01 18:25:45,16000


In [13]:
# Switch the Korean column headers to English names; labels that are not
# present in the frame are ignored by `rename`.
df = df.rename(
    columns={
        "key": "order_id",
        "주문번호": "order_number",
        "일자": "order_date",
        "시간": "order_time",
        "주문금액": "order_amount",
    }
)

In [None]:
"""
DB 저장 모듈 (load/db_writer.py)
- Airflow Task에서 단일 함수로 호출 가능
"""

from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime
from typing import Literal


def postgre_db_save(
    df: pd.DataFrame,
    table: str,
    schema: str = 'public',
    pk_col: str = 'order_id',
    add_timestamp: bool = True,
    timestamp_col: str = 'created_at',
    if_exists: Literal['append', 'replace'] = 'append',
    db_config: dict = None
) -> dict:
    """
    Save a DataFrame to PostgreSQL, skipping rows whose key is already stored.

    Intended to be callable as a single function (e.g. from an Airflow task).

    Args:
        df: Rows to persist.
        table: Target table name.
        schema: Target schema name.
        pk_col: Column used to detect duplicates in append mode.
        add_timestamp: When True, add ``timestamp_col`` filled with
            ``datetime.now()`` unless the frame already has that column.
        timestamp_col: Name of the timestamp column.
        if_exists: ``'replace'`` overwrites the table with ``df``; any other
            value appends only the rows whose ``pk_col`` is not stored yet.
        db_config: Optional connection settings (``user``, ``password``,
            ``host``, ``port``, ``database``). Missing keys fall back to the
            built-in defaults.

    Returns:
        ``{'inserted': int, 'duplicated': int, 'total': int}`` where
        ``duplicated`` counts rows skipped because their key was already in
        the table or repeated earlier in ``df``.

    Raises:
        KeyError: In append mode, when ``pk_col`` is not a column of ``df``.
        Exception: Database errors (connection, read, write) propagate
            instead of being treated as "table is empty".
    """
    # NOTE(review): credentials are hard-coded as defaults; prefer passing
    # `db_config` built from environment variables or a secrets backend.
    default_config = {
        'user': 'postgres',
        'password': '3040',
        'host': 'postgres',
        'port': '5432',
        'database': 'postgres'
    }
    # Merge so that a partial db_config overrides only the keys it provides.
    config = {**default_config, **(db_config or {})}

    # Nothing to do for an empty frame; return before touching the database.
    if df.empty:
        print("[WARN] 저장할 데이터 없음")
        return {'inserted': 0, 'duplicated': 0, 'total': 0}

    replace_mode = if_exists == 'replace'

    # Fail fast (before connecting) when the dedup key is missing.
    if not replace_mode and pk_col not in df.columns:
        raise KeyError(
            f"pk_col '{pk_col}' not found in DataFrame columns: {list(df.columns)}"
        )

    # Imported here so the cheap guard paths above work without a DB driver.
    from sqlalchemy import inspect
    from sqlalchemy.engine import URL

    # URL.create escapes special characters (e.g. '@' in a password) that
    # would corrupt a hand-formatted connection string.
    engine = create_engine(
        URL.create(
            drivername="postgresql+psycopg2",
            username=config['user'],
            password=config['password'],
            host=config['host'],
            port=int(config['port']),
            database=config['database'],
        )
    )

    total_count = len(df)

    try:
        # Replace mode: overwrite the table with the whole frame.
        if replace_mode:
            df_to_save = df.copy()
            if add_timestamp and timestamp_col not in df_to_save.columns:
                df_to_save[timestamp_col] = datetime.now()
            df_to_save.to_sql(table, con=engine, schema=schema, if_exists='replace', index=False)
            print(f"[OK] {schema}.{table} 덮어쓰기: {total_count}건")
            return {'inserted': total_count, 'duplicated': 0, 'total': total_count}

        # Append mode: only a genuinely missing table means "no existing keys".
        # Any other failure is raised rather than leading to a blind insert.
        if inspect(engine).has_table(table, schema=schema):
            existing_df = pd.read_sql_table(table, con=engine, schema=schema, columns=[pk_col])
            existing_ids = set(existing_df[pk_col].tolist())
            print(f"[DEBUG] 기존 데이터 {len(existing_ids)}건 발견")
        else:
            print(f"[INFO] 기존 데이터 없음: {schema}.{table} 테이블이 아직 없음")
            existing_ids = set()

        # Drop keys repeated inside the batch, then keys already stored.
        unique_df = df.drop_duplicates(subset=pk_col, keep='first')
        new_df = unique_df[~unique_df[pk_col].isin(existing_ids)].copy()
        dup_count = total_count - len(new_df)

        if len(new_df) > 0:
            if add_timestamp and timestamp_col not in new_df.columns:
                new_df[timestamp_col] = datetime.now()
            new_df.to_sql(table, con=engine, schema=schema, if_exists='append', index=False)
            print(f"[OK] {schema}.{table}: 신규 {len(new_df)}건, 중복 {dup_count}건")
        else:
            print(f"[WARN] {schema}.{table}: 신규 없음 (중복 {dup_count}건)")

        return {'inserted': len(new_df), 'duplicated': dup_count, 'total': total_count}
    finally:
        # Release pooled connections; the engine is local to this call.
        engine.dispose()

In [22]:
# Append the prepared rows to doridang.baemin_sales; the returned summary
# dict is the cell's displayed value.
# NOTE(review): the frame was read from a Coupang Eats export but is written
# to a table named 'baemin_sales' — confirm the target table.
postgre_db_save(df=df, table='baemin_sales', schema='doridang')

[WARN] doridang.baemin_sales: 신규 없음 (중복 5건)


{'inserted': 0, 'duplicated': 5, 'total': 5}

# 호환성 체크


# 설치
pip install pandas sqlalchemy psycopg2-binary

# ⚠️ 설치 후 반드시 커널 재시작 필요!
# Jupyter: Kernel → Restart Kernel
# VS Code: 상단 Restart 버튼

In [17]:
import pandas as pd
import sqlalchemy

print(f"pandas: {pd.__version__}")
print(f"sqlalchemy: {sqlalchemy.__version__}")

# Compare only the leading (major) component of each version string.
pd_major, sa_major = (
    int(version.split('.', 1)[0])
    for version in (pd.__version__, sqlalchemy.__version__)
)

# Only the pairing pandas >= 2 with SQLAlchemy < 2 triggers the warning.
if pd_major < 2 or sa_major >= 2:
    print("✅ 버전 호환성 OK")
else:
    print("⚠️ 경고: pandas 2.x는 SQLAlchemy 2.x 필요!")
    print("실행: pip install 'sqlalchemy>=2.0'")

pandas: 2.3.3
sqlalchemy: 2.0.45
✅ 버전 호환성 OK
