# Gold Layer: 환자 임상 요약 생성

> **Purpose:** Silver Layer의 정제된 데이터를 결합/집계하여 AI 추론용 최종 테이블을 생성합니다.
> - 환자별 바이탈 통계 집계 (avg, max, count)
> - 진료 기록(diagnosis, medication) 결합
> - AI 추론 입력 데이터셋 생성
>
> **카탈로그:** `P2T2.gold.patient_clinical_summary`

## 0. 카탈로그 설정

In [None]:
spark.sql("USE CATALOG P2T2")
print("✅ Catalog: P2T2")

## 1. 라이브러리

In [None]:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

## 2. 환자별 바이탈 통계 집계

In [None]:

def aggregate_patient_vitals():
    """
    silver.cleaned_vital_signs를 환자별로 집계합니다.
    
    실제 컬럼: patient_id, timestamp, heart_rate, systolic_bp,
              diastolic_bp, spo2, temperature, respiratory_rate, risk_score
    """
    df_vitals = spark.table("P2T2.silver.cleaned_vital_signs")
    print(f"Silver vital_signs: {df_vitals.count():,} rows")
    
    # 환자별 집계
    df_agg = df_vitals.groupBy("patient_id").agg(
        F.round(F.avg("heart_rate"), 1).alias("avg_heart_rate"),
        F.round(F.avg("systolic_bp"), 1).alias("avg_systolic_bp"),
        F.round(F.avg("diastolic_bp"), 1).alias("avg_diastolic_bp"),
        F.round(F.avg("spo2"), 1).alias("avg_spo2"),
        F.round(F.avg("temperature"), 2).alias("avg_temperature"),
        F.round(F.avg("respiratory_rate"), 1).alias("avg_respiratory_rate"),
        F.round(F.max("risk_score"), 2).alias("max_risk_score"),
        F.round(F.avg("risk_score"), 2).alias("avg_risk_score"),
        F.count("*").alias("vital_count"),
    )
    
    return df_agg

v_agg = aggregate_patient_vitals()
v_agg.show()

## 3. 진료 기록 결합

In [None]:

def join_medical_history(df_vitals_agg):
    """
    바이탈 집계와 silver.cleaned_medical_history를 결합합니다.
    
    실제 컬럼: patient_id, record_date, hospital, department,
              diagnosis, medication, notes
    """
    h = spark.table("P2T2.silver.cleaned_medical_history")
    
    h_agg = h.groupBy("patient_id").agg(
        F.count("*").alias("history_count"),
        F.concat_ws(", ", F.collect_list("diagnosis")).alias("diagnoses"),
        F.concat_ws(", ", F.collect_list("medication")).alias("medications"),
    )
    
    df_joined = df_vitals_agg.join(h_agg, "patient_id", "left")
    return df_joined

gold = join_medical_history(v_agg)
print(f"Gold join 결과: {gold.count()} rows")
gold.show(truncate=False)

## 4. Gold Layer 테이블 생성

In [None]:

# 생성 시각 추가
gold = gold.withColumn("aggregated_at", F.current_timestamp())

# 기존 테이블 삭제 후 저장 (스키마 충돌 방지)
spark.sql("DROP TABLE IF EXISTS P2T2.gold.patient_clinical_summary")

gold.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("P2T2.gold.patient_clinical_summary")

count = spark.table("P2T2.gold.patient_clinical_summary").count()
print(f"✅ Gold Layer 생성 완료: {count:,} patients")
print(f"   → P2T2.gold.patient_clinical_summary")

# 컬럼 확인
print(f"   Columns: {spark.table('P2T2.gold.patient_clinical_summary').columns}")
spark.table("P2T2.gold.patient_clinical_summary").show(truncate=False)

## 5. AI 추론 입력 데이터 추출

In [None]:

def get_patients_for_ai_inference(limit=100):
    """
    AI 추론(OpenAI API)에 전달할 환자 데이터를 추출합니다.
    위험도가 높은 환자 우선 처리
    """
    df = spark.sql(f"""
        SELECT 
            patient_id,
            avg_heart_rate,
            avg_systolic_bp,
            avg_diastolic_bp,
            avg_spo2,
            avg_temperature,
            avg_respiratory_rate,
            max_risk_score,
            diagnoses,
            medications
        FROM P2T2.gold.patient_clinical_summary
        ORDER BY max_risk_score DESC
        LIMIT {limit}
    """)
    
    print(f"✅ AI 추론 대기 환자: {df.count()}명 (위험도순)")
    df.show(truncate=False)
    return df

df_ai_input = get_patients_for_ai_inference(100)