In [2]:
!pip install --upgrade pip
!pip install "dask[complete]"
!pip install dask-ml           
!pip install dask-xgboost      
!pip install pyarrow           





Collecting dask[complete]
  Downloading dask-2025.11.0-py3-none-any.whl.metadata (3.8 kB)
Collecting cloudpickle>=3.0.0 (from dask[complete])
  Downloading cloudpickle-3.1.2-py3-none-any.whl.metadata (7.1 kB)
Collecting partd>=1.4.0 (from dask[complete])
  Downloading partd-1.4.2-py3-none-any.whl.metadata (4.6 kB)
Collecting toolz>=0.10.0 (from dask[complete])
  Downloading toolz-1.1.0-py3-none-any.whl.metadata (5.1 kB)
Collecting importlib_metadata>=4.13.0 (from dask[complete])
  Downloading importlib_metadata-8.7.0-py3-none-any.whl.metadata (4.8 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.4.5-cp310-cp310-win_amd64.whl.metadata (3.9 kB)
Collecting zipp>=3.20 (from importlib_metadata>=4.13.0->dask[complete])
  Downloading zipp-3.23.0-py3-none-any.whl.metadata (3.6 kB)
Collecting locket (from partd>=1.4.0->dask[complete])
  Downloading locket-1.0.0-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting bokeh>=3.1.0 (from dask[complete])
  Downloading bokeh-3.8.1-py3-n



Collecting dask-ml
  Downloading dask_ml-2025.1.0-py3-none-any.whl.metadata (6.0 kB)
Collecting dask-glm>=0.2.0 (from dask-ml)
  Downloading dask_glm-0.3.2-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting multipledispatch>=0.4.9 (from dask-ml)
  Downloading multipledispatch-1.0.0-py3-none-any.whl.metadata (3.8 kB)
Collecting sparse>=0.7.0 (from dask-glm>=0.2.0->dask-ml)
  Downloading sparse-0.17.0-py2.py3-none-any.whl.metadata (5.3 kB)
Downloading dask_ml-2025.1.0-py3-none-any.whl (149 kB)
Downloading dask_glm-0.3.2-py2.py3-none-any.whl (13 kB)
Downloading multipledispatch-1.0.0-py3-none-any.whl (12 kB)
Downloading sparse-0.17.0-py2.py3-none-any.whl (259 kB)
Installing collected packages: multipledispatch, sparse, dask-glm, dask-ml

   ---------- ----------------------------- 1/4 [sparse]
   ---------- ----------------------------- 1/4 [sparse]
   ---------- ----------------------------- 1/4 [sparse]
   ---------- ----------------------------- 1/4 [sparse]
   ---------- --------------



Collecting dask-xgboost




  Downloading dask_xgboost-0.2.0-py2.py3-none-any.whl.metadata (3.0 kB)
Collecting xgboost<=0.90 (from dask-xgboost)
  Downloading xgboost-0.90-py2.py3-none-win_amd64.whl.metadata (3.8 kB)
Downloading dask_xgboost-0.2.0-py2.py3-none-any.whl (14 kB)
Downloading xgboost-0.90-py2.py3-none-win_amd64.whl (18.3 MB)
   ---------------------------------------- 0.0/18.3 MB ? eta -:--:--
   ---------------------- ----------------- 10.2/18.3 MB 57.9 MB/s eta 0:00:01
   -------------------------------------- - 17.8/18.3 MB 56.1 MB/s eta 0:00:01
   ---------------------------------------- 18.3/18.3 MB 35.0 MB/s  0:00:00
Installing collected packages: xgboost, dask-xgboost

   ---------------------------------------- 0/2 [xgboost]
   ---------------------------------------- 0/2 [xgboost]
   -------------------- ------------------- 1/2 [dask-xgboost]
   ---------------------------------------- 2/2 [dask-xgboost]

Successfully installed dask-xgboost-0.2.0 xgboost-0.90




In [3]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask_ml.cluster
import dask_ml.preprocessing
from dask_ml.model_selection import train_test_split
import xgboost as xgb
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report
import dask.distributed
import warnings

# 경고 메시지 무시
warnings.filterwarnings('ignore')


In [4]:
# --- 0. Dask 클러스터 시작 ---
try:
    client = dask.distributed.Client()
    print(f"Dask 클러스터(Scheduler)가 시작되었습니다: {client.dashboard_link}")
except Exception as e:
    print(f"Dask 클러스터 시작 실패: {e}. 로컬 스레드로 진행합니다.")

# --- 하이퍼파라미터 ---
# (중요!) 20%에서 메모리가 부족했으므로, 5% (0.05)로 줄여서 테스트합니다.
SAMPLE_FRAC = 0.05 
K_BEERS = 6
K_USERS = 8
THRESHOLD = 0.5   # 'Top Pick' 정의 (평균보다 +0.5점)


--- 1. 데이터 로드, 필터링, 샘플링 (FRAC=0.2) ---


In [7]:
# --- 1. Dask로 로드, 필터링, 샘플링 ---
print(f"\n--- 1. 데이터 로드, 필터링, 샘플링 (FRAC={SAMPLE_FRAC}) ---")
try:
    ddf_master = dd.read_parquet('df_master_preprocessed.parquet')
except Exception as e:
    print(f"Parquet 로드 실패: {e}. 'reviews.csv'에서 Dask로 읽기를 시도합니다.")
    # (Parquet 파일이 없거나 EDA 노트북 피처가 없는 경우, 데모용으로 리뷰만 읽음)
    ddf_master = dd.read_csv(
        'reviews.csv', 
        usecols=['username', 'date', 'score', 'beer_id', 'smell', 'taste', 'feel', 'abv', 'style', 'country_brewery']
    )

# 1-1. 2013, 2017년 필터링 (Dask 문법)
ddf_master['datetime'] = dd.to_datetime(ddf_master['date'], errors='coerce')
ddf_master['year'] = ddf_master['datetime'].dt.year
ddf_filtered = ddf_master[ddf_master['year'].isin([2013, 2017])].copy()

# 1-2. (중요) 샘플링
ddf_sample = ddf_filtered.sample(frac=SAMPLE_FRAC, random_state=42)

# 1-3. (중요) Dask 메모리 최적화: .persist()
# .persist()는 지금까지의 계산(필터링, 샘플링)을 "실행"하고, 
# 그 결과(약 42만 행)를 메모리에 "고정"시켜 다음 groupby/merge가 빨라지게 함
print("Dask 데이터 필터링 및 샘플링 실행 중...")
ddf_sample = ddf_sample.persist()
print(f"샘플링 완료. 최종 학습 데이터 크기: {len(ddf_sample)} 행")

# (EDA 노트북의 style_group, geo_group이 없다고 가정하고 Dask로 재생성 - 데모용)
def group_style(style):
    if 'IPA' in str(style): return 'IPA'
    if 'Stout' in str(style): return 'Stout'
    if 'Ale' in str(style): return 'Ale'
    return 'Other'
ddf_sample['style_group'] = ddf_sample['style'].apply(group_style, meta=('style', 'object'))

def group_country(country):
    if country == 'US': return 'US'
    if country in ['DE', 'GB', 'BE']: return 'Europe'
    return 'Other'
ddf_sample['geo_group'] = ddf_sample['country_brewery'].apply(group_country, meta=('country_brewery', 'object'))


--- 1. 데이터 로드, 필터링, 샘플링 (FRAC=0.2) ---
Dask 데이터 필터링 및 샘플링 실행 중...


KilledWorker: Attempted to run task ('read_parquet-fused-sample-56688d942e510fa97d8c9b546094681d', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:54127. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

In [None]:
# --- 2. Dask로 Beer 클러스터링 (새로 생성) ---
print(f"\n--- 2. Dask Beer 클러스터링 (K={K_BEERS}) ---")
beer_features_df = ddf_sample[['beer_id', 'style_group', 'geo_group', 'abv']].drop_duplicates(subset=['beer_id'])
beer_features_df = beer_features_df.dropna().persist() # (dropna 후 고정)

# Dask-ML의 K-Means는 숫자형만 받으므로 범주형을 코드로 변환
beer_features_df['style_group_code'] = beer_features_df['style_group'].astype('category').cat.codes
beer_features_df['geo_group_code'] = beer_features_df['geo_group'].astype('category').cat.codes

kmeans_beer = dask_ml.cluster.KMeans(n_clusters=K_BEERS, random_state=42)
scaler_beer = dask_ml.preprocessing.StandardScaler()
beer_features_processed = scaler_beer.fit_transform(beer_features_df[['abv', 'style_group_code', 'geo_group_code']])

print("Beer K-Means 학습 중...")
beer_features_df['beer_cluster'] = kmeans_beer.fit_predict(beer_features_processed)
beer_features_df = beer_features_df.persist() # (최종 beer 클러스터 결과 고정)

In [None]:
# --- 2. Dask로 Beer 클러스터링 (새로 생성) ---
print(f"\n--- 2. Dask Beer 클러스터링 (K={K_BEERS}) ---")
beer_features_df = ddf_sample[['beer_id', 'style_group', 'geo_group', 'abv']].drop_duplicates(subset=['beer_id'])
beer_features_df = beer_features_df.dropna().persist() # (dropna 후 고정)

# Dask-ML의 K-Means는 숫자형만 받으므로 범주형을 코드로 변환
beer_features_df['style_group_code'] = beer_features_df['style_group'].astype('category').cat.codes
beer_features_df['geo_group_code'] = beer_features_df['geo_group'].astype('category').cat.codes

kmeans_beer = dask_ml.cluster.KMeans(n_clusters=K_BEERS, random_state=42)
scaler_beer = dask_ml.preprocessing.StandardScaler()
beer_features_processed = scaler_beer.fit_transform(beer_features_df[['abv', 'style_group_code', 'geo_group_code']])

print("Beer K-Means 학습 중...")
beer_features_df['beer_cluster'] = kmeans_beer.fit_predict(beer_features_processed)
beer_features_df = beer_features_df.persist() # (최종 beer 클러스터 결과 고정)

In [None]:
# --- 3. Dask로 User 클러스터링 (새로 생성) ---
print(f"\n--- 3. Dask User 클러스터링 (K={K_USERS}) ---")
# (취향 벡터 생성)
user_style_affinity = ddf_sample.pivot_table(index='username', columns='style_group', values='score', aggfunc='count').fillna(0)
user_style_affinity = user_style_affinity.div(user_style_affinity.sum(axis=1), axis=0).fillna(0)

user_numeric_features = ddf_sample.groupby('username').agg(
    user_avg_score=('score', 'mean'),
    user_avg_abv=('abv', 'mean'),
    user_avg_smell=('smell', 'mean')
).fillna(0)

user_profile_df = dd.concat([user_numeric_features, user_style_affinity], axis=1).fillna(0).persist() # (유저 프로필 고정)

kmeans_user = dask_ml.cluster.KMeans(n_clusters=K_USERS, random_state=42)
scaler_user = dask_ml.preprocessing.StandardScaler()
user_features_processed = scaler_user.fit_transform(user_profile_df)

print("User K-Means 학습 중...")
user_profile_df['user_cluster'] = kmeans_user.fit_predict(user_features_processed)
user_profile_df = user_profile_df.persist() # (최종 user 클러스터 결과 고정)

In [None]:
# --- 4. 최종 학습 데이터셋 생성 (Dask Merge & Target) ---
print("\n--- 4. 최종 학습 데이터셋 생성 ---")
df_model = dd.merge(ddf_sample, beer_features_df[['beer_cluster']], on='beer_id', how='left')
df_model = dd.merge(df_model, user_profile_df[['user_cluster', 'user_mean_score']], on='username', how='left')

df_model['is_top_pick'] = (df_model['score'] > (df_model['user_mean_score'] + THRESHOLD)).astype(int)
df_model = df_model.dropna().persist() # (최종 모델 데이터 고정)
print(f"Dask 모델 데이터 준비 완료 (메모리 고정). {len(df_model)} 행")

In [None]:
# --- (중요!) 메모리 해제 ---
print("\n--- 5. 중간 데이터 메모리 해제 ---")
try:
    # 더 이상 필요 없는 큰 데이터프레임들을 Dask 클러스터 메모리에서 삭제
    del ddf_master
    del ddf_filtered
    del ddf_sample
    del beer_features_df
    del user_profile_df
    
    # Dask 클러스터를 재시작하여 메모리를 완전히 비움
    client.restart()
    print("메모리 해제 및 Dask 클러스터 재시작 완료.")
    
    # (주의) client.restart()는 df_model도 날려버리므로, 
    # .persist()된 데이터를 다시 로드하거나, del만 사용해야 할 수 있습니다.
    # 여기서는 df_model을 다시 persist 합니다. (더 안전한 방법)
    df_model = df_model.persist() 
    
except Exception as e:
    print(f"메모리 해제 중 오류 (무시): {e}")

In [None]:
# --- 5. Train / Test / Validation 분리 (Dask-ML) ---
print("\n--- 5. Dask Train/Test/Validation 분리 ---")
features_to_use = [
    'abv', 'smell', 'taste', 'feel',
    'style_group', 'geo_group',
    'beer_cluster', 'user_cluster'
]
target = 'is_top_pick'

X = df_model[features_to_use]
y = df_model[target]

# Dask-XGBoost는 'category' Dtype을 인식합니다.
for col in ['style_group', 'geo_group', 'beer_cluster', 'user_cluster']:
    X[col] = X[col].astype('category')

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train_sub, X_val, y_train_sub, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

In [None]:
# --- 6. "아이언맨" (Dask-XGBoost Hybrid) 모델 학습 ---
print("\n--- 6. Dask-XGBoost Hybrid 모델 학습 시작 ---")
hybrid_model_xgb = xgb.dask.DaskXGBClassifier(
    objective='binary:logistic',
    eval_metric='auc',
    n_estimators=1000, 
    learning_rate=0.05,
    max_depth=6,
    n_jobs=-1,
    random_state=42,
    enable_categorical=True 
)

print("XGBoost 학습 계획 수립 중...")
# .persist()로 학습/검증 데이터를 메모리에 고정
X_train_sub = X_train_sub.persist()
y_train_sub = y_train_sub.persist()
X_val = X_val.persist()
y_val = y_val.persist()

hybrid_model_xgb.fit(
    X_train_sub, 
    y_train_sub,
    eval_set=[(X_val, y_val)],
    early_stopping_rounds=30,
    verbose=100
)

In [None]:
# --- 6. "아이언맨" (Dask-XGBoost Hybrid) 모델 학습 ---
print("\n--- 6. Dask-XGBoost Hybrid 모델 학습 시작 ---")
hybrid_model_xgb = xgb.dask.DaskXGBClassifier(
    objective='binary:logistic',
    eval_metric='auc',
    n_estimators=1000, 
    learning_rate=0.05,
    max_depth=6,
    n_jobs=-1,
    random_state=42,
    enable_categorical=True 
)

print("XGBoost 학습 계획 수립 중...")
# .persist()로 학습/검증 데이터를 메모리에 고정
X_train_sub = X_train_sub.persist()
y_train_sub = y_train_sub.persist()
X_val = X_val.persist()
y_val = y_val.persist()

hybrid_model_xgb.fit(
    X_train_sub, 
    y_train_sub,
    eval_set=[(X_val, y_val)],
    early_stopping_rounds=30,
    verbose=100
)

In [None]:
# --- 8. Dask 클러스터 종료 ---
client.close()
print("Dask 클러스터가 종료되었습니다.")