In [0]:
# 1. 라이브러리 임포트
# -----------------------------------------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import pandas as pd
import numpy as np


In [0]:
# 2. 데이터 로드 및 시계열 특성 생성
# -----------------------------------------------------------------------------
print("실버 테이블에서 데이터 로드 중...")
df_silver = spark.table("postgres_team5_catalog.silver.silver_ped_acid_stats")

# 시계열 윈도우: 지역별, 보행자상태별로 연도 순 정렬
time_window = Window.partitionBy("gugun_nm", "ped_stat").orderBy("searchyear")

#  '전체' 보행상태 데이터 제외 (개별 보행상태별 분석을 위해)
print("'전체' 보행상태 데이터 제외 중...")
df_silver_filtered = df_silver.filter(col("ped_stat") != "전체")

# 다음 연도 타겟 변수 생성 (예측할 값)
df_gold = (
    df_silver
    .withColumn("next_year_risk_score", lead("risk_score", 1).over(time_window))
    .withColumn("next_year_dth_rate", lead("dth_rate", 1).over(time_window))
    .filter(col("next_year_risk_score").isNotNull() & 
            col("next_year_dth_rate").isNotNull())
)

실버 테이블에서 데이터 로드 중...
'전체' 보행상태 데이터 제외 중...


In [0]:
# 3. 머신러닝용 데이터 준비
# -----------------------------------------------------------------------------
print("머신러닝용 데이터 준비 중...")
# 실제 존재하는 컬럼만 선택
ml_data = df_gold.select(
    "searchyear", "gugun_nm", "ped_stat", 
    "occrrnc_cnt", "dth_cnt", "injpsn_cnt", "dth_rate", "injpsn_rate", 
    "prev_year_occrrnc_cnt", "occrrnc_cnt_change_rate", "risk_score",
    "next_year_risk_score", "next_year_dth_rate"
).toPandas()
# 컬럼들 가져오고 '전체'를 여기서 삭제. 

머신러닝용 데이터 준비 중...




In [0]:
# 4. 피처 엔지니어링
# -----------------------------------------------------------------------------
# 사고 심각도 및 연도 특성 추가
# ml_data['accident_severity'] = ml_data['dth_cnt'] / ml_data['occrrnc_cnt'].where(ml_data['occrrnc_cnt'] > 0, 1)
# ml_data['searchyear'] = pd.to_numeric(ml_data['searchyear'])
# ml_data['year_squared'] = ml_data['searchyear'] ** 2  # 제거해서 성능 비교

# 타겟 변수 로그 변환 (분포 정규화)
ml_data['target_risk_score_log'] = np.log1p(ml_data['next_year_risk_score'])
ml_data['target_dth_rate_log'] = np.log1p(pd.to_numeric(ml_data['next_year_dth_rate']))


In [0]:
# 5. 데이터 준비 및 전처리
# -----------------------------------------------------------------------------
print("데이터 준비 및 전처리 중...")

# 피처 정의
categorical_features = ['gugun_nm', 'ped_stat']
numerical_features = ['searchyear', 'occrrnc_cnt', 'dth_cnt', 'injpsn_cnt',
                     'dth_rate', 'injpsn_rate', 'prev_year_occrrnc_cnt', 
                     'occrrnc_cnt_change_rate', 'risk_score', 
                     ]  # year_squared 제거'accident_severity'

X = ml_data[categorical_features + numerical_features]
y_risk_score = ml_data['target_risk_score_log'] # 타겟변수 선택 
y_dth_rate = ml_data['target_dth_rate_log']

# 데이터 분할
X_train, X_test, y_risk_train, y_risk_test, y_dth_train, y_dth_test = train_test_split(
    X, y_risk_score, y_dth_rate, test_size=0.2, random_state=42
)

# 전처리 파이프라인
preprocessor = ColumnTransformer([
    ('num', StandardScaler(), numerical_features),
    ('cat', OneHotEncoder(handle_unknown='ignore', sparse=False), categorical_features)
])
# # 원본: [10, 100, 1000]
# 표준화 후: [-1.0, 0, 1.0]
# '서울' → [1, 0, 0]
# '부산' → [0, 1, 0]
# '대구' → [0, 0, 1]

데이터 준비 및 전처리 중...


In [0]:
# 5-1. 위험점수 예측 모델
# -----------------------------------------------------------------------------
print("\n위험점수 예측 모델 학습 중...")
risk_score_model = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100, max_depth=15, random_state=42)) # 랜덤포레스트 모델로.
])
risk_score_model.fit(X_train, y_risk_train)
# n_estimators=100: 트리 개수
# max_depth=15: 트리 깊이 제한


# 위험점수 모델 평가
risk_pred_log = risk_score_model.predict(X_test)
risk_pred_orig = np.expm1(risk_pred_log)
risk_test_orig = np.expm1(y_risk_test)
risk_score_rmse = np.sqrt(mean_squared_error(risk_test_orig, risk_pred_orig))
risk_score_r2 = r2_score(risk_test_orig, risk_pred_orig)

print(f"위험점수 예측 모델 성능 - RMSE: {risk_score_rmse:.2f}, R²: {risk_score_r2:.4f}")



위험점수 예측 모델 학습 중...


Uploading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

위험점수 예측 모델 성능 - RMSE: 153.86, R²: 0.9944


In [0]:
# 5-2. 사망률 예측 모델
# -----------------------------------------------------------------------------
print("\n사망률 예측 모델 학습 중...")
dth_rate_model = Pipeline([
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100, max_depth=15, random_state=42))
])
dth_rate_model.fit(X_train, y_dth_train)

# 사망률 모델 평가
dth_pred_log = dth_rate_model.predict(X_test)
dth_pred_orig = np.expm1(dth_pred_log)
dth_test_orig = np.expm1(y_dth_test)
dth_rate_rmse = np.sqrt(mean_squared_error(dth_test_orig, dth_pred_orig))
dth_rate_r2 = r2_score(dth_test_orig, dth_pred_orig)

print(f"사망률 예측 모델 성능 - RMSE: {dth_rate_rmse:.4f}, R²: {dth_rate_r2:.4f}")



사망률 예측 모델 학습 중...


Uploading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

사망률 예측 모델 성능 - RMSE: 0.3476, R²: 0.7015


In [0]:
# 6. 골드 테이블 생성
# -----------------------------------------------------------------------------

# 6-1. 전체 데이터 예측
# -----------------------------------------------------------------------------
print("\n전체 데이터 예측 중...")
# 위험점수와 사망률 예측
risk_score_pred_log = risk_score_model.predict(X)
dth_rate_pred_log = dth_rate_model.predict(X)

# 예측 결과를 원래 스케일로 변환하여 추가
ml_data['risk_score_pred'] = np.round(np.expm1(risk_score_pred_log)).astype(int)  # 정수로 변환
ml_data['dth_rate_pred'] = np.round(np.expm1(dth_rate_pred_log), 2)


# 6-2. 최종 골드 테이블 구성
# -----------------------------------------------------------------------------
print("최종 골드 테이블 구성 중...")
#가치가 있는 컬럼만 선택
gold_columns = [
    'searchyear', 'gugun_nm', 'ped_stat',
    'occrrnc_cnt', 'dth_cnt', 'injpsn_cnt', 'dth_rate', 'risk_score',
    'risk_score_pred', 'dth_rate_pred'
]

# 최종 골드 테이블 데이터 프레임 생성
final_gold_data = ml_data[gold_columns]

# '전체' 보행상태 데이터가 혹시 남아있다면 제거
final_gold_data = final_gold_data[final_gold_data['ped_stat'] != '전체']
print(f"'전체' 데이터 제거 후 레코드 수: {final_gold_data.shape[0]:,}")



전체 데이터 예측 중...
최종 골드 테이블 구성 중...
'전체' 데이터 제거 후 레코드 수: 5,070


In [0]:
# 7. 골드 테이블 저장
# -----------------------------------------------------------------------------
print("골드 테이블 저장 중...")
gold_table = spark.createDataFrame(final_gold_data)

# 테이블 저장 (스키마 덮어쓰기 옵션 포함)
gold_table.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    "1dt_team5_databricks_traffic.gold_managed.gold_ped_acid_pred"
)

골드 테이블 저장 중...
