In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, when, expr, lit, udf
from pyspark.sql.types import StringType
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# MLflow 실험 설정 - Databricks 경로 형식으로 수정
mlflow.set_experiment("/Users/jazzyrain722@naver.com/Customer_Segmentation_Discount_Sensitivity")

# 데이터를 Delta 테이블로 저장한 경로
delta_path = "/delta/customer_rfm_features"

# 전처리된 RFM 데이터 로드
data = spark.read.format("delta").load(delta_path)

# 필요한 컬럼들에 대해 결측치 제거
data_filtered = data.na.drop()

# avg_purchase_amount 컬럼을 계산 (구매 횟수가 0이 아닌 경우에만 계산)
data_with_avg = data_filtered.withColumn(
    "avg_purchase_amount", 
    when(col("frequency") > 0, col("monetary") / col("frequency")).otherwise(0)
)

# 데이터에 할인 관련 컬럼이 없는 경우 생성 (예시 데이터)
if "discount_used" not in data_with_avg.columns:
    # 가정: 평균 구매액이 높은 고객은 할인을 적게 사용했을 가능성
    data_with_avg = data_with_avg.withColumn(
        "discount_used",
        when(col("avg_purchase_amount") > 100, 0).otherwise(1)
    )

if "discount_sensitive" not in data_with_avg.columns:
    # 가정: 빈번하게 구매하면서 할인을 사용한 고객은 할인에 민감
    data_with_avg = data_with_avg.withColumn(
        "discount_sensitive",
        when((col("frequency") > 3) & (col("discount_used") == 1), 1).otherwise(0)
    )

# 모델에 필요한 피처 컬럼 선택 (RFM 및 평균구매액)
features_df = data_with_avg.select(
    "customer_id", "recency", "frequency", "monetary", "avg_purchase_amount",
    "discount_used", "discount_sensitive"
)

# 피처 벡터화
assembler = VectorAssembler(
    inputCols=["recency", "frequency", "monetary", "avg_purchase_amount"],
    outputCol="features"
)
assembled_data = assembler.transform(features_df)

# 스케일링 추가 (K-means는 거리 기반이므로 스케일링이 중요)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)

# 영어 폰트 설정
plt.rcParams['font.family'] = 'DejaVu Sans'

# K-means 최적 k 찾기 (Elbow Method)
with mlflow.start_run(run_name="find_optimal_k"):
    cost = []
    ks = list(range(2, 10))
    
    for k in ks:
        kmeans = KMeans(featuresCol='scaled_features', k=k, seed=42)
        model = kmeans.fit(scaled_data)
        cost.append(model.summary.trainingCost)
        
        # 로깅
        mlflow.log_metric(f"WSSSE_k{k}", model.summary.trainingCost)
    
    # 그래프 출력
    plt.figure(figsize=(10, 6))
    plt.plot(ks, cost, 'bx-')
    plt.xlabel('k')
    plt.ylabel('WSSSE')
    plt.title('Elbow Method For Optimal k')
    plt.savefig("elbow_method.png")
    mlflow.log_artifact("elbow_method.png")
    
    # 최적 k 자동 계산 (급격한 감소가 완만해지는 지점)
    diff = np.diff(cost)
    diff2 = np.diff(diff)
    optimal_k = ks[np.argmax(diff2) + 1]
    
    print(f"Optimal k value is {optimal_k}")
    mlflow.log_param("optimal_k", optimal_k)

# 최적 k로 K-means 클러스터링 수행
with mlflow.start_run(run_name="customer_segmentation"):
    kmeans = KMeans(featuresCol='scaled_features', k=optimal_k, seed=42)
    model = kmeans.fit(scaled_data)
    
    # 클러스터 예측 결과 추가
    clustered_data = model.transform(scaled_data)
    
    # 클러스터 중심점 확인
    centers = model.clusterCenters()
    center_df = pd.DataFrame(centers, columns=["recency", "frequency", "monetary", "avg_purchase_amount"])
    print("Cluster Centers:")
    print(center_df)
    
    # 클러스터 특성 분석 및 세그먼트 이름 부여
    # 클러스터 특성 분석을 위한 집계
    cluster_analysis = clustered_data.groupBy("prediction").agg(
        {"recency": "avg", "frequency": "avg", "monetary": "avg", "avg_purchase_amount": "avg"}
    ).orderBy("prediction")
    
    # Spark DataFrame을 Pandas로 변환하여 세그먼트 정의를 더 쉽게 함
    cluster_pd = cluster_analysis.toPandas()
    
    # 세그먼트 이름을 정의하는 함수 (영어로 변경)
    def define_segment(row):
        # 이 로직은 데이터의 특성에 맞게 조정해야 함
        recency = row["avg(recency)"]
        frequency = row["avg(frequency)"]
        monetary = row["avg(monetary)"]
        
        if frequency > cluster_pd["avg(frequency)"].mean() and monetary > cluster_pd["avg(monetary)"].mean():
            if recency < cluster_pd["avg(recency)"].mean():
                return "Loyal Customer"  # 충성 고객
            else:
                return "Dormant Customer"  # 휴면 고객
        elif frequency <= 1:
            return "One-time Buyer"  # 일회성 구매 고객
        elif monetary > cluster_pd["avg(monetary)"].mean() and frequency <= cluster_pd["avg(frequency)"].mean():
            return "High-value Occasional"  # 고가치 간헐적 고객
        elif recency < cluster_pd["avg(recency)"].mean() and frequency <= cluster_pd["avg(frequency)"].mean():
            return "Potential Growth"  # 잠재 성장 고객
        else:
            return "Regular Customer"  # 일반 고객
    
    # 세그먼트 이름 적용
    cluster_pd["segment"] = cluster_pd.apply(define_segment, axis=1)
    print("Segment Definitions:")
    print(cluster_pd)
    
    # 클러스터 시각화 (2D로 축소하여 시각화)
    from pyspark.ml.feature import PCA
    
    # PCA로 2차원 축소
    pca = PCA(k=2, inputCol="scaled_features", outputCol="pca_features")
    pca_model = pca.fit(clustered_data)
    pca_result = pca_model.transform(clustered_data)
    
    # PCA 결과를 시각화용 Pandas로 변환
    pca_pdf = pca_result.select("prediction", "pca_features").toPandas()
    pca_pdf["pca1"] = pca_pdf["pca_features"].apply(lambda x: float(x[0]))
    pca_pdf["pca2"] = pca_pdf["pca_features"].apply(lambda x: float(x[1]))
    
    # 클러스터별 색상 지정
    plt.figure(figsize=(12, 8))
    for i in range(optimal_k):
        subset = pca_pdf[pca_pdf["prediction"] == i]
        segment_name = cluster_pd.loc[i, 'segment'] if i < len(cluster_pd) else f"Cluster {i}"
        plt.scatter(subset["pca1"], subset["pca2"], label=f"Cluster {i}: {segment_name}")
    
    plt.title("Customer Segments PCA Visualization")
    plt.xlabel("Principal Component 1")
    plt.ylabel("Principal Component 2")
    plt.legend()
    plt.savefig("cluster_visualization.png")
    mlflow.log_artifact("cluster_visualization.png")

    # 세그먼트 매핑 정보를 Spark에 적용
    segment_mapping = {row.prediction: row.segment for i, row in cluster_pd.iterrows()}
    
    # UDF를 사용하여 세그먼트 이름 추가
    def map_segment(cluster_id):
        return segment_mapping.get(cluster_id, "Unknown")
    
    segment_udf = udf(map_segment, StringType())
    segmented_data = clustered_data.withColumn("segment", segment_udf(col("prediction")))
    
    # 로깅
    mlflow.spark.log_model(model, "kmeans_model")
    
    # 세그먼트별 통계 계산
    segment_stats = segmented_data.groupBy("segment").agg(
        {"recency": "avg", "frequency": "avg", "monetary": "avg", 
         "avg_purchase_amount": "avg", "discount_used": "avg", "discount_sensitive": "avg"}
    )
    
    print("Segment Statistics:")
    segment_stats.show()

# 할인 민감도 예측 모델 구축 (오류 수정)
with mlflow.start_run(run_name="discount_sensitivity_prediction"):
    # 훈련/테스트 데이터 분할
    train_data, test_data = segmented_data.randomSplit([0.8, 0.2], seed=42)
    
    # 라벨 값 확인 및 필요시 변환 - 디버깅용
    print("라벨 분포 확인:")
    train_data.groupBy("discount_sensitive").count().orderBy("discount_sensitive").show()
    
    # LogisticRegression 사용 (이진 분류에 더 적합할 수 있음)
    from pyspark.ml.classification import LogisticRegression
    
    lr = LogisticRegression(
        labelCol="discount_sensitive",
        featuresCol="scaled_features",
        predictionCol="rf_prediction",
        probabilityCol="rf_probability",
        rawPredictionCol="rf_rawPrediction",
        maxIter=20,
        regParam=0.05
    )
    
    # 파라미터 그리드 설정 및 교차 검증
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.01, 0.05, 0.1]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
        .build()
    
    # 평가자 수정 - MulticlassClassificationEvaluator 사용
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    
    evaluator = MulticlassClassificationEvaluator(
        labelCol="discount_sensitive", 
        predictionCol="rf_prediction",
        metricName="accuracy"
    )
    
    cv = CrossValidator(
        estimator=lr,
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=3
    )
    
    # 모델 훈련
    cv_model = cv.fit(train_data)
    best_model = cv_model.bestModel
    
    # 테스트 데이터로 평가
    predictions = best_model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    print(f"Model Accuracy: {accuracy}")
    
    # ROC AUC도 계산 (필요시)
    binary_evaluator = BinaryClassificationEvaluator(
        labelCol="discount_sensitive",
        rawPredictionCol="rf_probability",  # probability 컬럼 사용
        metricName="areaUnderROC"
    )
    try:
        auc = binary_evaluator.evaluate(predictions)
        print(f"Area under ROC: {auc}")
    except Exception as e:
        print(f"ROC AUC 계산 오류: {e}")
        auc = 0.0
    
    # 로깅
    mlflow.spark.log_model(best_model, "lr_model")
    mlflow.log_metric("accuracy", accuracy)
    if auc > 0:
        mlflow.log_metric("auc", auc)
    
    # 피처 중요도 확인 (로지스틱 회귀에서는 coefficients 사용)
    coefficients = best_model.coefficients
    feature_names = ["recency", "frequency", "monetary", "avg_purchase_amount"]
    
    # 계수의 절대값을 기준으로 중요도 계산
    coef_abs = np.abs(coefficients.toArray())
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Coefficient': coefficients.toArray(),
        'Importance': coef_abs
    })
    importance_df = importance_df.sort_values('Importance', ascending=False)
    
    plt.figure(figsize=(10, 6))
    # 계수의 절대값 기준 중요도 시각화
    plt.barh(importance_df['Feature'], importance_df['Importance'])
    plt.xlabel('Coefficient Magnitude (Absolute Value)')
    plt.title('Feature Importance for Discount Sensitivity (Logistic Regression)')
    plt.tight_layout()
    plt.savefig("feature_importance.png")
    mlflow.log_artifact("feature_importance.png")
    
    # 계수 자체도 시각화 (부호 포함)
    plt.figure(figsize=(10, 6))
    bars = plt.barh(importance_df['Feature'], importance_df['Coefficient'])
    # 음수와 양수 계수에 다른 색상 적용
    for i, bar in enumerate(bars):
        if importance_df['Coefficient'].iloc[i] < 0:
            bar.set_color('red')
        else:
            bar.set_color('green')
    plt.axvline(x=0, color='black', linestyle='-', alpha=0.3)
    plt.xlabel('Coefficient Value')
    plt.title('Feature Coefficients (Logistic Regression)')
    plt.tight_layout()
    plt.savefig("feature_coefficients.png")
    mlflow.log_artifact("feature_coefficients.png")
    
    # 디버깅 - 확률 벡터의 형태 확인
    predictions.select("rf_probability").limit(5).show(truncate=False)
    
    # ROC 커브 시각화 - 수정된 접근 방식
    # 확률 벡터의 구조를 확인하고 적절히 처리
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType
    
    # 확률 벡터에서 클래스 1의 확률 추출하는 UDF
    def get_class1_probability(prob_vector):
        # 벡터 길이가 2인 경우 (일반적인 이진 분류)
        if len(prob_vector) == 2:
            return float(prob_vector[1])
        # 벡터 길이가 1인 경우 (일부 모델에서 발생)
        elif len(prob_vector) == 1:
            return float(prob_vector[0])
        else:
            return 0.5  # 기본값
    
    get_prob_udf = udf(get_class1_probability, DoubleType())
    
    # 확률 추출하여 새 컬럼 생성
    predictions_with_prob = predictions.withColumn("class1_probability", get_prob_udf("rf_probability"))
    
    # Pandas로 변환
    pred_df = predictions_with_prob.select("class1_probability", "discount_sensitive").toPandas()
    y_true = pred_df["discount_sensitive"].values
    y_score = pred_df["class1_probability"].values
    
    # ROC 커브 계산
    fpr, tpr, _ = roc_curve(y_true, y_score)
    roc_auc = sklearn_auc(fpr, tpr)
    
    # ROC 그래프 그리기
    plt.figure(figsize=(10, 8))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.3f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.savefig("roc_curve.png")
    mlflow.log_artifact("roc_curve.png")
    
    # 혼동 행렬 시각화 - 변환된 예측과 라벨 사용
    pred_df_cm = predictions.select("rf_prediction", "discount_sensitive").toPandas()
    y_true_cm = pred_df_cm["discount_sensitive"].values
    y_pred_cm = pred_df_cm["rf_prediction"].values
    
    # 혼동 행렬 계산
    cm = confusion_matrix(y_true_cm, y_pred_cm)
    
    # 혼동 행렬 그래프 그리기
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, ['Not Sensitive (0)', 'Sensitive (1)'])
    plt.yticks(tick_marks, ['Not Sensitive (0)', 'Sensitive (1)'])
    
    # 값 표시
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     ha="center", va="center",
                     color="white" if cm[i, j] > thresh else "black")
    
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig("confusion_matrix.png")
    mlflow.log_artifact("confusion_matrix.png")
    
    # 정확도, 정밀도, 재현율 계산
    accuracy = accuracy_score(y_true_cm, y_pred_cm)
    precision = precision_score(y_true_cm, y_pred_cm, zero_division=0)
    recall = recall_score(y_true_cm, y_pred_cm, zero_division=0)
    f1 = f1_score(y_true_cm, y_pred_cm, zero_division=0)
    
    # 메트릭 로깅
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)
    mlflow.log_metric("f1", f1)
    
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    
    # 세그먼트별 할인 민감도 및 피쳐 영향 분석
    # 앞서 생성한 class1_probability 컬럼 사용
    segment_impact = predictions_with_prob.groupBy("segment").agg(
        {"class1_probability": "avg", "rf_prediction": "avg", "discount_sensitive": "avg"}
    )
    print("Segment Discount Sensitivity:")
    segment_impact.show()
    
    # 세그먼트별 예측 정확도 시각화
    segment_accuracy = predictions_with_prob.withColumn(
        "correct_prediction", 
        when(col("rf_prediction") == col("discount_sensitive"), 1).otherwise(0)
    ).groupBy("segment").agg(
        {"correct_prediction": "avg", "discount_sensitive": "avg", "prediction": "avg"}
    ).withColumnRenamed("avg(correct_prediction)", "accuracy")
    
    # Pandas로 변환
    segment_acc_pd = segment_accuracy.toPandas()
    
    # 세그먼트별 시각화 그래프
    plt.figure(figsize=(12, 6))
    
    # 세그먼트별 정확도
    plt.subplot(1, 2, 1)
    plt.bar(segment_acc_pd["segment"], segment_acc_pd["accuracy"], color='skyblue')
    plt.xlabel('Customer Segment')
    plt.ylabel('Prediction Accuracy')
    plt.title('Discount Sensitivity Prediction Accuracy by Segment')
    plt.xticks(rotation=45, ha='right')
    
    # 세그먼트별 할인 민감도 비율
    plt.subplot(1, 2, 2)
    plt.bar(segment_acc_pd["segment"], segment_acc_pd["avg(discount_sensitive)"], color='orange')
    plt.xlabel('Customer Segment')
    plt.ylabel('Discount Sensitivity Ratio')
    plt.title('Actual Discount Sensitivity by Segment')
    plt.xticks(rotation=45, ha='right')
    
    plt.tight_layout()
    plt.savefig("segment_accuracy.png")
    mlflow.log_artifact("segment_accuracy.png")
    
    # 모델 성능 종합 대시보드 생성
    plt.figure(figsize=(16, 10))
    
    # 1. 혼동 행렬
    plt.subplot(2, 3, 1)
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    tick_marks = np.arange(2)
    plt.xticks(tick_marks, ['Not Sensitive', 'Sensitive'])
    plt.yticks(tick_marks, ['Not Sensitive', 'Sensitive'])
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     ha="center", va="center",
                     color="white" if cm[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    # 2. ROC 커브
    plt.subplot(2, 3, 2)
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.3f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc="lower right")
    
    # 3. 피처 중요도
    plt.subplot(2, 3, 3)
    bars = plt.barh(importance_df['Feature'], importance_df['Coefficient'])
    # 음수와 양수 계수에 다른 색상 적용
    for i, bar in enumerate(bars):
        if importance_df['Coefficient'].iloc[i] < 0:
            bar.set_color('red')
        else:
            bar.set_color('green')
    plt.axvline(x=0, color='black', linestyle='-', alpha=0.3)
    plt.xlabel('Coefficient')
    plt.title('Feature Coefficients')
    
    # 4. 세그먼트별 정확도
    plt.subplot(2, 3, 4)
    plt.bar(segment_acc_pd["segment"], segment_acc_pd["accuracy"], color='skyblue')
    plt.xlabel('Customer Segment')
    plt.ylabel('Accuracy')
    plt.title('Prediction Accuracy by Segment')
    plt.xticks(rotation=45, ha='right')
    
    # 5. 세그먼트별 할인 민감도
    plt.subplot(2, 3, 5)
    plt.bar(segment_acc_pd["segment"], segment_acc_pd["avg(discount_sensitive)"], color='orange')
    plt.xlabel('Customer Segment')
    plt.ylabel('Sensitivity Ratio')
    plt.title('Discount Sensitivity by Segment')
    plt.xticks(rotation=45, ha='right')
    
    # 6. 성능 메트릭
    plt.subplot(2, 3, 6)
    metrics = [accuracy, precision, recall, f1, roc_auc]
    metric_names = ['Accuracy', 'Precision', 'Recall', 'F1', 'AUC']
    plt.bar(metric_names, metrics, color='green')
    plt.ylim([0, 1.1])
    plt.ylabel('Score')
    plt.title('Model Performance Metrics')
    
    plt.tight_layout()
    plt.savefig("model_performance_dashboard.png")
    mlflow.log_artifact("model_performance_dashboard.png")
    print("Model performance visualization completed.")

# 최종 결과 저장 (영어로 수정)
final_output = segmented_data.withColumn(
    "discount_recommendation", 
    when(col("segment") == "Loyal Customer", "Low Discount (Retention Strategy)")
    .when(col("segment") == "Potential Growth", "Medium Discount (Growth Promotion)")
    .when(col("segment") == "One-time Buyer", "High Discount (Repurchase Incentive)")
    .when(col("segment") == "Dormant Customer", "High Discount + Personalization (Reactivation)")
    .otherwise("Medium Discount (General)")
)

# Delta 테이블로 저장
final_output.write.format("delta").mode("overwrite").save("/delta/customer_segments_with_recommendations")

# 한국어로 모델 결과 요약 출력
print("\n" + "="*80)
print("고객 세그먼트 및 할인 민감도 분석 결과 요약")
print("="*80)

# 모델 성능 요약
print("\n[모델 성능 요약]")
print(f"• 정확도(Accuracy): {accuracy:.4f} - 모델이 고객의 할인 민감도를 예측한 정확도입니다.")
print(f"• 정밀도(Precision): {precision:.4f} - 모델이 '할인에 민감하다'고 예측한 고객 중 실제로 할인에 민감한 비율입니다.")
print(f"• 재현율(Recall): {recall:.4f} - 실제 할인에 민감한 고객 중 모델이 정확히 식별한 비율입니다.")
print(f"• F1 점수: {f1:.4f} - 정밀도와 재현율의 조화평균으로, 모델의 전반적인 성능을 나타냅니다.")
if auc > 0:
    print(f"• AUC: {auc:.4f} - 모델의 분류 성능을 나타내는 지표로, 1에 가까울수록 좋은 성능입니다.")

# 피처 중요도 요약
print("\n[피처 중요도 분석]")
for index, row in importance_df.head(4).iterrows():
    feature = row['Feature']
    coef = row['Coefficient']
    importance = row['Importance']
    effect = "증가" if coef > 0 else "감소"
    
    # 피처 이름 한글화
    if feature == "recency":
        feature_kr = "최근성(최근 구매 일자)"
    elif feature == "frequency":
        feature_kr = "구매 빈도"
    elif feature == "monetary":
        feature_kr = "총 구매 금액"
    elif feature == "avg_purchase_amount":
        feature_kr = "평균 구매 금액"
    else:
        feature_kr = feature
        
    print(f"• {feature_kr}: 계수 = {coef:.4f}, 중요도 = {importance:.4f}")
    print(f"  → 이 값이 증가하면 할인 민감도가 {effect}합니다.")

# 세그먼트별 할인 민감도 요약
print("\n[고객 세그먼트별 할인 민감도]")
segment_impact_pd = segment_impact.toPandas()
for index, row in segment_impact_pd.iterrows():
    segment = row['segment']
    sensitivity = row['avg(discount_sensitive)']
    prediction = row['avg(rf_prediction)']
    
    # 세그먼트 이름 한글화
    if segment == "Loyal Customer":
        segment_kr = "충성 고객"
    elif segment == "Potential Growth":
        segment_kr = "잠재 성장 고객"
    elif segment == "One-time Buyer":
        segment_kr = "일회성 구매 고객"
    elif segment == "Dormant Customer":
        segment_kr = "휴면 고객"
    elif segment == "High-value Occasional":
        segment_kr = "고가치 간헐적 고객"
    elif segment == "Regular Customer":
        segment_kr = "일반 고객"
    else:
        segment_kr = segment
        
    sensitivity_level = "높음" if sensitivity > 0.5 else "낮음"
    print(f"• {segment_kr}: 할인 민감도 = {sensitivity:.4f} ({sensitivity_level})")
    print(f"  → 모델 예측 민감도 = {prediction:.4f}")

# 비즈니스 인사이트 및 결론
print("\n[비즈니스 인사이트 및 결론]")
print("• 이 분석은 고객 데이터를 RFM(Recency, Frequency, Monetary) 기준으로 세분화하고,")
print("  각 세그먼트별 할인 민감도를 예측하는 머신러닝 모델을 구축했습니다.")
print("• 이를 통해 마케팅팀은 고객 세그먼트별로 최적화된 할인 전략을 수립할 수 있습니다.")
print("• 예를 들어, 할인에 민감한 세그먼트에는 더 공격적인 할인 프로모션을,")
print("  할인에 덜 민감한 충성 고객에게는 할인보다 다른 혜택을 제공하는 전략이 효과적일 수 있습니다.")
print("• 이 모델은 데이터 기반의 개인화된 마케팅 전략 수립에 도움을 줄 수 있으며,")
print("  궁극적으로 고객 가치 극대화와 마케팅 비용 최적화에 기여할 수 있습니다.")

print("\n[모델 한계 및 향후 개선 방향]")
print("• 현재 모델은 제한된 피처를 사용하고 있으며, 할인 민감도에 대한 실제 행동 데이터가 부족합니다.")
print("• 향후 A/B 테스트를 통한 실제 할인 반응 데이터를 수집하여 모델을 개선할 수 있습니다.")
print("• 또한 계절성, 제품 카테고리별 선호도, 인구통계학적 정보 등 추가 피처를 모델에 포함하면")
print("  예측 정확도를 더욱 향상시킬 수 있을 것입니다.")

print("\n고객 세그먼트 및 할인 민감도 분석이 완료되었습니다.")
print("결과는 '/delta/customer_segments_with_recommendations'에 저장되었습니다.")

In [0]:
# 디버깅을 위한 임시 코드 - 실행 전에 segment_impact DataFrame 확인
print("segment_impact 컬럼 확인:")
segment_impact.printSchema()
segment_impact.show(5, truncate=False)

# 디버깅용 파일로 저장
try:
    import sys
    
    # 파일로 결과 저장 (인코딩 지정)
    with open("/dbfs/FileStore/analysis_results.txt", "w", encoding="utf-8") as f:
        f.write("="*80 + "\n")
        f.write("고객 세그먼트 및 할인 민감도 분석 결과 요약\n")
        f.write("="*80 + "\n")
        
        # 모델 성능 요약
        f.write("\n[모델 성능 요약]\n")
        f.write(f"• 정확도(Accuracy): {accuracy:.4f} - 모델이 고객의 할인 민감도를 예측한 정확도입니다.\n")
        f.write(f"• 정밀도(Precision): {precision:.4f} - 모델이 '할인에 민감하다'고 예측한 고객 중 실제로 할인에 민감한 비율입니다.\n")
        f.write(f"• 재현율(Recall): {recall:.4f} - 실제 할인에 민감한 고객 중 모델이 정확히 식별한 비율입니다.\n")
        f.write(f"• F1 점수: {f1:.4f} - 정밀도와 재현율의 조화평균으로, 모델의 전반적인 성능을 나타냅니다.\n")
        
        # 세그먼트별 할인 민감도 요약 - 오류 방지용 try/except 구문 사용
        f.write("\n[고객 세그먼트별 할인 민감도]\n")
        try:
            # 안전하게 DataFrame 변환
            segment_impact_pd = segment_impact.toPandas()
            print(f"segment_impact_pd 변환 성공: {len(segment_impact_pd)} 행")
            
            # 컬럼명 확인 및 출력
            print(f"segment_impact_pd 컬럼: {segment_impact_pd.columns.tolist()}")
            
            for index, row in segment_impact_pd.iterrows():
                # 컬럼 이름이 다를 수 있으므로 안전하게 접근
                segment = row.get('segment', 'Unknown')
                
                # 컬럼명 확인 및 예외 처리
                sensitivity_col = 'avg(discount_sensitive)' if 'avg(discount_sensitive)' in row else 'avg(discount_sensitive)'
                prediction_col = 'avg(rf_prediction)' if 'avg(rf_prediction)' in row else 'avg(rf_prediction)'
                
                sensitivity = row.get(sensitivity_col, 0.0)
                prediction = row.get(prediction_col, 0.0)
                
                # 세그먼트 이름 한글화
                segment_kr = segment
                if segment == "Loyal Customer": segment_kr = "충성 고객"
                elif segment == "Potential Growth": segment_kr = "잠재 성장 고객"
                elif segment == "One-time Buyer": segment_kr = "일회성 구매 고객"
                elif segment == "Dormant Customer": segment_kr = "휴면 고객"
                elif segment == "High-value Occasional": segment_kr = "고가치 간헐적 고객"
                elif segment == "Regular Customer": segment_kr = "일반 고객"
                
                sensitivity_level = "높음" if sensitivity > 0.5 else "낮음"
                f.write(f"• {segment_kr}: 할인 민감도 = {sensitivity:.4f} ({sensitivity_level})\n")
                f.write(f"  → 모델 예측 민감도 = {prediction:.4f}\n")
        except Exception as e:
            f.write(f"세그먼트 데이터 처리 중 오류 발생: {str(e)}\n")
        
        # 비즈니스 인사이트 및 결론
        f.write("\n[비즈니스 인사이트 및 결론]\n")
        f.write("• 이 분석은 고객 데이터를 RFM(Recency, Frequency, Monetary) 기준으로 세분화하고,\n")
        f.write("  각 세그먼트별 할인 민감도를 예측하는 머신러닝 모델을 구축했습니다.\n")
        f.write("• 이를 통해 마케팅팀은 고객 세그먼트별로 최적화된 할인 전략을 수립할 수 있습니다.\n")
        f.write("• 예를 들어, 할인에 민감한 세그먼트에는 더 공격적인 할인 프로모션을,\n")
        f.write("  할인에 덜 민감한 충성 고객에게는 할인보다 다른 혜택을 제공하는 전략이 효과적일 수 있습니다.\n")
        
    print("분석 결과가 '/dbfs/FileStore/analysis_results.txt'에 저장되었습니다.")
    
    # Databricks 디스플레이 사용
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    
    # 결과를 DataFrame으로 변환하여 표시
    with open("/dbfs/FileStore/analysis_results.txt", "r", encoding="utf-8") as f:
        result_text = f.read()
    
    result_df = spark.createDataFrame([(result_text,)], ["analysis_results"])
    display(result_df)
    
except Exception as e:
    print(f"결과 저장 중 오류 발생: {str(e)}")

# 출력 버퍼 강제 비우기
import sys
sys.stdout.flush()

print("고객 세그먼트 및 할인 민감도 분석이 완료되었습니다.")
print("결과는 '/delta/customer_segments_with_recommendations'에 저장되었습니다.")