In [4]:
import pandas as pd
from sqlalchemy import create_engine
import os
import matplotlib.pyplot as plt
import seaborn as sns

# Docker Compose 환경 변수에서 DB 접속 정보 읽기
# Airflow 컨테이너 내부이므로, settings.py 대신 os.environ 직접 사용 가능
db_user = os.environ.get("POSTGRES_USER", "crypto")
db_password = os.environ.get("POSTGRES_PASSWORD", "crypto")
db_host = "postgres" # docker-compose.yml의 서비스 이름 사용
db_name = os.environ.get("POSTGRES_DB", "cryptoflow")

# SQLAlchemy를 사용하여 PostgreSQL 연결 엔진 생성
engine = create_engine(f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:5432/{db_name}")

In [5]:
# 1. 데이터 로딩 (from Data Lake)
print("--- 1. 데이터 로딩 시작 ---")

# 데이터 레이크에서 KRW-BTC Ticker 데이터만 조회
# 데이터가 너무 많을 수 있으므로, 최근 1주일(예시) 데이터만 가져오도록 LIMIT 설정 가능
# ORDER BY ingested_at DESC LIMIT (144 * 7) --> 10분 간격 * 24시간 * 7일
sql_query_lake = """
SELECT 
    id, 
    payload, 
    endpoint, 
    ingested_at 
FROM 
    upbit_api_raw_data_landing 
WHERE 
    endpoint = '/v1/ticker' 
    AND payload ->> 'market' = 'KRW-BTC' -- KRW-BTC 데이터만 필터링
ORDER BY 
    ingested_at DESC -- 최신 데이터부터 가져오기 (필요시 LIMIT 추가)
-- LIMIT 1008 -- 예: 최근 1주일 데이터만 가져오려면 주석 해제 (144 * 7 = 1008)
"""

# JSONB를 포함한 데이터를 Pandas DataFrame으로 읽어오기
# json_normalize를 사용하여 중첩된 JSON 구조를 펼칠 수도 있지만,
# 여기서는 필요한 컬럼만 직접 추출하는 방식으로 진행
df_raw_btc = pd.read_sql(sql_query_lake, engine, index_col='id')

print(f"데이터 레이크 로드 완료: {len(df_raw_btc)} 행")

# 데이터 확인
print(df_raw_btc.head())
print(df_raw_btc.info())

--- 1. 데이터 로딩 시작 ---
데이터 레이크 로드 완료: 50 행
                                                 payload    endpoint  \
id                                                                     
10739  {'change': 'FALL', 'market': 'KRW-BTC', 'low_p...  /v1/ticker   
10524  {'change': 'FALL', 'market': 'KRW-BTC', 'low_p...  /v1/ticker   
10309  {'change': 'FALL', 'market': 'KRW-BTC', 'low_p...  /v1/ticker   
10094  {'change': 'FALL', 'market': 'KRW-BTC', 'low_p...  /v1/ticker   
9879   {'change': 'FALL', 'market': 'KRW-BTC', 'low_p...  /v1/ticker   

                           ingested_at  
id                                      
10739 2025-10-14 11:21:09.852717+00:00  
10524 2025-10-14 11:21:04.023313+00:00  
10309 2025-10-14 11:18:20.265180+00:00  
10094 2025-10-14 11:18:09.463469+00:00  
9879  2025-10-13 17:20:02.150627+00:00  
<class 'pandas.core.frame.DataFrame'>
Index: 50 entries, 10739 to 204
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype              
---  ------

DETAIL:  The database was created using collation version 2.36, but the operating system provides version 2.41.
HINT:  Rebuild all objects in this database that use the default collation and run ALTER DATABASE cryptoflow REFRESH COLLATION VERSION, or build PostgreSQL with the right library version.
