# AWS Glue - Feature Engineering Job

**功能**: 基于清洗数据生成机器学习特征

**输入**: cleaned_customer_base, cleaned_customer_behavior
**输出**: customer_features (包含所有计算特征)

**特征包括**:
- RFM分析 (Recency, Frequency, Monetary)
- 客户生命周期评分
- 行为活跃度评分
- 产品交叉销售指数
- 客户价值评分及客户分层
- 流失风险评分

## 环境配置

In [12]:
import os
import sys

import spark

# Local toolchain locations. JAVA_HOME / SPARK_HOME are only filled in when the
# caller has not already exported them; the Hadoop variables are always forced.
# NOTE(review): the bare `import spark` above looks like a stray auto-import; the
# name is rebound to the SparkSession in a later cell.
os.environ.setdefault('JAVA_HOME', "C:\\Program Files\\Java\\jdk-11")
os.environ.setdefault('SPARK_HOME', "C:\\Users\\hy120\\spark\\spark-3.5.7-bin-hadoop3")

# Point every HADOOP_* home at the local Hadoop install (edit for your machine).
_hadoop_root = "C:\\Users\\hy120\\hadoop"
for _hadoop_var in (
    "HADOOP_HOME",
    "HADOOP_COMMON_HOME",
    "HADOOP_HDFS_HOME",
    "HADOOP_MAPRED_HOME",
    "HADOOP_YARN_HOME",
):
    os.environ[_hadoop_var] = _hadoop_root
os.environ["HADOOP_CONF_DIR"] = _hadoop_root + "\\etc\\hadoop"

# Make both the Spark driver and the Python workers use this interpreter
# (i.e. the one running the notebook, typically the project's .venv).
for _py_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[_py_var] = sys.executable

print("Environment setup:")
for _shown_var in ("JAVA_HOME", "SPARK_HOME", "PYSPARK_PYTHON"):
    print(f"  {_shown_var}:", os.environ.get(_shown_var))

Environment setup:
  JAVA_HOME: C:\Program Files\Java\jdk-11
  SPARK_HOME: C:\Users\hy120\spark\spark-3.5.7-bin-hadoop3
  PYSPARK_PYTHON: C:\Users\hy120\Downloads\zhihullm\CASE-customer-group\.venv\Scripts\python.exe


## 导入必要的库

In [13]:
import sys
import logging
from datetime import datetime, timedelta
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, when, sum as spark_sum, avg, max as spark_max, min as spark_min,
    row_number, rank, dense_rank, ntile,
    datediff, months_between, year, month,
    round, log, sqrt, abs as spark_abs,
    lit, coalesce, first, last
)

# Root logging at INFO so the per-step progress messages below are emitted;
# each cell logs through this module-level logger.
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)

## Spark会话配置

In [14]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, when, sum as spark_sum, avg, max as spark_max, min as spark_min,
    row_number, rank, dense_rank, ntile,
    datediff, months_between, year, month,
    round, log, sqrt, abs as spark_abs,
    lit, coalesce, first, last, to_date, to_timestamp, trim
)

# Create the SparkSession used by this notebook, or reuse one that is already
# running in the kernel (getOrCreate). When an existing session is reused,
# JVM-level options such as spark.driver.memory are not re-applied.
spark = (
    SparkSession.builder
    .appName("FeatureEngineering")
    .master("local[2]")
    # In local mode the executor runs inside the driver JVM, so the
    # spark.executor.* sizing options have little or no effect.
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    # Spark 3.x name of the Arrow toggle; the old key
    # "spark.sql.execution.arrow.enabled" is deprecated.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.adaptive.enabled", "true")
    # Generous timeouts/retries. The heartbeat interval must stay well below
    # spark.network.timeout.
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "300s")
    .config("spark.rpc.numRetries", "10")
    .config("spark.rpc.retry.wait", "1s")
    .config("spark.shuffle.io.retryWait", "10s")
    .config("spark.shuffle.io.maxRetries", "5")
    # NOTE(review): "spark.executor.maxFailures" does not appear in the Spark
    # configuration docs, and with a plain local[N] master task retries come
    # from the master string (local[N,F]) rather than spark.task.maxFailures.
    # Confirm whether these two settings are needed.
    .config("spark.executor.maxFailures", "5")
    .config("spark.task.maxFailures", "5")
    # Disable Hadoop permission checks / use a permissive umask for locally
    # written files.
    .config("spark.hadoop.dfs.permissions.enabled", "false")
    .config("spark.hadoop.fs.permissions.umask-mode", "000")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print("✓ Spark Session 创建成功")
print(f"  Spark版本: {spark.version}")
# Report the master actually in use (a previously created session may be reused).
print(f"  Master: {spark.sparkContext.master}")

✓ Spark Session 创建成功
  Spark版本: 3.5.7
  Master: local[2]


## 步骤0: 加载数据源

In [15]:
from pathlib import Path

# Locate the project root. Walk up from the current working directory (e.g. when
# the notebook is started from test/spark) until a directory named
# "CASE-customer-group" is found or the filesystem root is reached.
project_root = Path.cwd()
while project_root.name != "CASE-customer-group" and project_root.parent != project_root:
    project_root = project_root.parent
if project_root.name != "CASE-customer-group":
    # Not found: the input paths below will resolve under the filesystem root and
    # the read will most likely fail, so surface the likely cause first.
    logger.warning(f"未找到项目根目录 CASE-customer-group，当前使用: {project_root}")

# Input files live in <project_root>/output and are produced by the data-cleansing
# notebook, which is assumed to have been run already.
cleaned_base_path = str(project_root / "output" / "cleaned_customer_base.csv")
cleaned_behavior_path = str(project_root / "output" / "cleaned_customer_behavior.csv")

print(f"数据目录: {project_root}")
print(f"清洗后客户基本信息: {cleaned_base_path}")
print(f"清洗后客户行为资产: {cleaned_behavior_path}")
print()

try:
    df_customer_base = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(cleaned_base_path)

    df_customer_behavior = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(cleaned_behavior_path)

    # Each count() is a full scan of the CSV; compute once and reuse it for both
    # the log and the console output.
    base_count = df_customer_base.count()
    behavior_count = df_customer_behavior.count()

    logger.info(f"✓ 数据加载完成")
    logger.info(f"  客户基本信息行数: {base_count}")
    logger.info(f"  客户行为资产行数: {behavior_count}")

    print(f"✓ 数据加载完成")
    print(f"  客户基本信息行数: {base_count}")
    print(f"  客户行为资产行数: {behavior_count}")
except Exception as e:
    # logger.exception keeps the traceback in the log; the error is re-raised.
    logger.exception(f"加载数据失败: {str(e)}")
    print(f"⚠ 数据加载失败: {str(e)}")
    print(f"  请确保已运行数据清洗脚本 (test_spark_cleansing.ipynb)")
    raise

数据目录: C:\Users\hy120\Downloads\zhihullm\CASE-customer-group
清洗后客户基本信息: C:\Users\hy120\Downloads\zhihullm\CASE-customer-group\output\cleaned_customer_base.csv
清洗后客户行为资产: C:\Users\hy120\Downloads\zhihullm\CASE-customer-group\output\cleaned_customer_behavior.csv



INFO:__main__:✓ 数据加载完成
INFO:__main__:  客户基本信息行数: 10000
INFO:__main__:  客户行为资产行数: 120000


✓ 数据加载完成
  客户基本信息行数: 10000
  客户行为资产行数: 120000


## 步骤1: 配置参数

In [None]:
# Job parameters (edit as needed). The keys appear to mirror AWS Glue job
# arguments; within this notebook only JOB_NAME, OUTPUT_BUCKET and OUTPUT_PATH
# are read.
args = dict(
    JOB_NAME='feature_engineering',
    INPUT_DATABASE='customer_cleaned_db',
    INPUT_TABLE_BASE='cleaned_customer_base',
    INPUT_TABLE_BEHAVIOR='cleaned_customer_behavior',
    OUTPUT_BUCKET=str(project_root / 'output'),
    OUTPUT_PATH='customer_features',
)

# Destination directory for the feature table: <OUTPUT_BUCKET>/<OUTPUT_PATH>.
output_path = "/".join((args['OUTPUT_BUCKET'], args['OUTPUT_PATH']))

logger.info(f"开始执行 {args['JOB_NAME']} 任务")
print("配置参数:")
for label, value in (("JOB_NAME", args['JOB_NAME']), ("OUTPUT_PATH", output_path)):
    print(f"  {label}: {value}")

## 步骤2: 基础特征构建

In [16]:
logger.info("步骤2: 构建基础特征...")

# 2.1 Demographic / profile columns carried into the feature table
# (branch_name is not part of the feature table, so it is not selected).
df_features = df_customer_base.select(
    col("customer_id"),
    col("name"),
    col("age"),
    col("gender"),
    col("occupation_type"),
    col("monthly_income"),
    col("marriage_status"),
    col("city_level"),
    col("lifecycle_stage"),
    col("open_account_date"),
)

# 2.2 Customer tenure relative to a fixed reference date.
# The reference date is meant to be the month-end of the latest stat_month in the
# behaviour data. It is hard-coded here and must be updated when new data arrives.
# A literal date column is enough; no join is needed.
reference_date_col = to_date(lit("2025-06-30"))

df_features = df_features \
    .withColumn("days_as_customer",
                datediff(reference_date_col, col("open_account_date"))) \
    .withColumn("months_as_customer",
                months_between(reference_date_col, col("open_account_date")))

# 2.3 Rule-based demographic scores. In each when-chain the first matching
# branch wins.
df_features = df_features \
    .withColumn("income_score",
                # A null monthly_income also falls into the lowest band (25).
                when(col("monthly_income") >= 50000, 100)
                .when(col("monthly_income") >= 30000, 75)
                .when(col("monthly_income") >= 15000, 50)
                .otherwise(25)) \
    .withColumn("age_group",
                # Comparisons against null are never true, so without this
                # branch a missing age would be labelled "60+".
                when(col("age").isNull(), "未知")
                .when(col("age") < 30, "18-30")
                .when(col("age") < 40, "30-40")
                .when(col("age") < 50, "40-50")
                .when(col("age") < 60, "50-60")
                .otherwise("60+")) \
    .withColumn("lifecycle_score",
                # Unknown / missing lifecycle stages score 0.
                when(col("lifecycle_stage") == "价值客户", 100)
                .when(col("lifecycle_stage") == "忠诚客户", 85)
                .when(col("lifecycle_stage") == "成熟客户", 70)
                .when(col("lifecycle_stage") == "成长客户", 55)
                .when(col("lifecycle_stage") == "新客户", 30)
                .otherwise(0))

logger.info("✓ 基础特征构建完成")
print(f"✓ 基础特征列数: {len(df_features.columns)}")

INFO:__main__:步骤2: 构建基础特征...
INFO:__main__:✓ 基础特征构建完成


✓ 基础特征列数: 15


## 步骤3: RFM 分析特征

In [17]:
logger.info("步骤3: 生成RFM特征...")

# Keep only each customer's most recent monthly snapshot (highest stat_month)
# together with the behaviour/asset columns used by later steps.
latest_snapshot_window = Window.partitionBy("customer_id").orderBy(col("stat_month").desc())
df_behavior_latest = df_customer_behavior \
    .withColumn("rn", row_number().over(latest_snapshot_window)) \
    .filter(col("rn") == 1) \
    .select(
        col("customer_id"),
        col("stat_month").alias("last_contact_month"),
        col("last_app_login_time"),
        col("last_contact_time"),
        col("total_assets"),
        col("credit_card_monthly_expense"),
        col("investment_monthly_count"),
        col("app_login_count"),
        col("contact_result"),
        col("deposit_balance"),
        col("financial_balance"),
        col("fund_balance"),
        col("insurance_balance"),
        col("deposit_flag"),
        col("financial_flag"),
        col("fund_flag"),
        col("insurance_flag")
    )

# Recency (R): days from the last contact to the fixed reference date 2025-06-30.
# Casting a null last_contact_time already yields null, so no explicit null
# branch is needed. Customers with no recorded contact get the sentinel 999.
df_behavior_latest = df_behavior_latest \
    .withColumn("last_contact_date", col("last_contact_time").cast("date")) \
    .withColumn("recency_days",
                when(col("last_contact_date").isNotNull(),
                     datediff(lit(datetime(2025, 6, 30)), col("last_contact_date")))
                .otherwise(999))  # never contacted -> 999 days

# Frequency (F): app_login_count of the latest month is used as a proxy.
df_behavior_latest = df_behavior_latest \
    .withColumn("frequency_score",
                when(col("app_login_count") >= 10, 100)
                .when(col("app_login_count") >= 5, 75)
                .when(col("app_login_count") >= 2, 50)
                .otherwise(25))

# Monetary (M): quartile band of total_assets. The approximate quartiles are
# collected to the driver once and inlined as literals in the expression below.
asset_percentiles = df_behavior_latest.selectExpr(
    "percentile_approx(total_assets, 0.25) as p25",
    "percentile_approx(total_assets, 0.50) as p50",
    "percentile_approx(total_assets, 0.75) as p75"
).collect()[0]

df_behavior_latest = df_behavior_latest \
    .withColumn("monetary_score",
                when(col("total_assets") >= asset_percentiles['p75'], 100)
                .when(col("total_assets") >= asset_percentiles['p50'], 75)
                .when(col("total_assets") >= asset_percentiles['p25'], 50)
                .otherwise(25))

# Combined RFM score: 40% F + 40% M + 20% R, where R maps 0 days -> 100 and
# 999 days -> 0.
# Recency is clamped to [0, 999] for scoring only. Without the clamp:
#   - a contact older than 999 days makes the R term negative, i.e. worse than
#     "never contacted";
#   - a contact dated after the reference date pushes the R term above 100.
recency_for_score = when(col("recency_days") > 999, 999) \
    .when(col("recency_days") < 0, 0) \
    .otherwise(col("recency_days"))

df_behavior_latest = df_behavior_latest \
    .withColumn("rfm_score",
                round((col("frequency_score") * 0.4 +
                       col("monetary_score") * 0.4 +
                       (100 - recency_for_score / 999 * 100) * 0.2), 2))

logger.info("✓ RFM特征生成完成")
print("✓ RFM特征生成完成")

INFO:__main__:步骤3: 生成RFM特征...
INFO:__main__:✓ RFM特征生成完成


✓ RFM特征生成完成


## 步骤4: 行为活跃度和资产特征

In [18]:
logger.info("步骤4: 生成行为活跃度和资产特征...")

# 4.1 Activity features
# NOTE(review): engagement_score is not normalised to 0-100 like the other scores
# (e.g. 10 logins and no investments gives ~3.33), so it carries little weight in
# customer_value_score. Confirm the intended scale.
df_behavior_latest = df_behavior_latest \
    .withColumn("engagement_score",
                round((col("app_login_count") * 10 +
                       col("investment_monthly_count") * 20) / 30, 2)) \
    .withColumn("is_active_app",
                when(col("app_login_count") >= 3, 1).otherwise(0)) \
    .withColumn("is_active_investor",
                when(col("investment_monthly_count") >= 1, 1).otherwise(0)) \
    .withColumn("is_active_consumer",
                when(col("credit_card_monthly_expense") > 0, 1).otherwise(0)) \
    .withColumn("activity_type",
                # Priority order: app+investment > investment only > app only >
                # card spending only > low activity.
                when((col("is_active_app") == 1) & (col("is_active_investor") == 1), "多元活跃")
                .when(col("is_active_investor") == 1, "投资活跃")
                .when(col("is_active_app") == 1, "应用活跃")
                .when(col("is_active_consumer") == 1, "消费活跃")
                .otherwise("低活跃"))

# 4.2 Asset features
# asset_concentration = sum of squared product balances / total_assets^2.
# This is a Herfindahl-style index when the four balances add up to total_assets.
# It is null when total_assets is 0 (or null). The guard is explicit so it does
# not depend on division-by-zero returning null, which raises instead when
# spark.sql.ansi.enabled is on.
df_behavior_latest = df_behavior_latest \
    .withColumn("asset_concentration",
                when(col("total_assets") != 0,
                     round(((col("deposit_balance") ** 2 +
                             col("financial_balance") ** 2 +
                             col("fund_balance") ** 2 +
                             col("insurance_balance") ** 2) /
                            (col("total_assets") ** 2)), 4))) \
    .withColumn("investment_product_diversity",
                # Number of product flags set (0-4 when flags are 0/1).
                col("deposit_flag") + col("financial_flag") +
                col("fund_flag") + col("insurance_flag")) \
    .withColumn("diversification_score",
                when(col("investment_product_diversity") == 4, 100)
                .when(col("investment_product_diversity") == 3, 75)
                .when(col("investment_product_diversity") == 2, 50)
                .when(col("investment_product_diversity") == 1, 25)
                .otherwise(0))

logger.info("✓ 行为活跃度和资产特征生成完成")
print("✓ 行为活跃度和资产特征生成完成")

INFO:__main__:步骤4: 生成行为活跃度和资产特征...
INFO:__main__:✓ 行为活跃度和资产特征生成完成


✓ 行为活跃度和资产特征生成完成


## 步骤5: 客户价值评分和分层

In [19]:
logger.info("步骤5: 计算客户价值评分和分层...")

# Customer value = 40% RFM + 30% engagement + 30% product diversification,
# rounded to 2 decimals.
value_score_expr = round((col("rfm_score") * 0.4 +
                          col("engagement_score") * 0.3 +
                          col("diversification_score") * 0.3), 2)

# Tiers are checked from the highest threshold down; the first match wins and
# anything below the last threshold is "低价值".
tier_thresholds = [(80, "VIP高价值"), (60, "核心客户"), (40, "重点培育")]
tier_expr = None
for threshold, tier_label in tier_thresholds:
    condition = col("customer_value_score") >= threshold
    if tier_expr is None:
        tier_expr = when(condition, tier_label)
    else:
        tier_expr = tier_expr.when(condition, tier_label)
tier_expr = tier_expr.otherwise("低价值")

df_behavior_latest = df_behavior_latest \
    .withColumn("customer_value_score", value_score_expr) \
    .withColumn("customer_tier", tier_expr)

logger.info("✓ 客户价值评分和分层完成")
print("✓ 客户价值评分和分层完成")

INFO:__main__:步骤5: 计算客户价值评分和分层...
INFO:__main__:✓ 客户价值评分和分层完成


✓ 客户价值评分和分层完成


## 步骤6: 交叉销售和风险评分

In [20]:
logger.info("步骤6: 生成交叉销售指数和风险评分...")

engagement = col("engagement_score")

# 6.1 Cross-sell opportunity scores:
#   financial - depositors without a financial product: 1.2 x engagement
#   fund      - financial-product holders without funds: 0.9 x engagement
#   insurance - customers without insurance: rfm_score x engagement / 100
#   card      - 50 + half of engagement if there is card spending, else flat 30
cross_sell_exprs = [
    ("financial_upgrade_score",
     when((col("deposit_flag") == 1) & (col("financial_flag") == 0),
          round(engagement * 1.2, 2)).otherwise(0)),
    ("fund_upgrade_score",
     when((col("financial_flag") == 1) & (col("fund_flag") == 0),
          round(engagement * 0.9, 2)).otherwise(0)),
    ("insurance_upgrade_score",
     when(col("insurance_flag") == 0,
          round(col("rfm_score") * engagement / 100, 2)).otherwise(0)),
    ("credit_card_upgrade_score",
     when(col("credit_card_monthly_expense") > 0,
          50 + round(engagement * 0.5, 2)).otherwise(30)),
]
for column_name, score_expr in cross_sell_exprs:
    df_behavior_latest = df_behavior_latest.withColumn(column_name, score_expr)

# 6.2 Churn risk: rules are evaluated top-down and the first match wins; the
# fallback is 20. Never-contacted customers (recency_days = 999) score 80.
# NOTE(review): the refusal rule is only reached when recency_days <= 30, so a
# refused customer contacted 31-90 days ago scores 40 (< 50). Confirm this
# ordering is intended.
churn_rules = [
    (col("recency_days") > 180, 80),        # no contact for 6+ months
    (col("recency_days") > 90, 60),         # no contact for 3+ months
    (col("recency_days") > 30, 40),         # no contact for 1+ month
    (col("contact_result") == "拒绝", 50),  # last contact refused
]
churn_expr = when(*churn_rules[0])
for rule_condition, rule_score in churn_rules[1:]:
    churn_expr = churn_expr.when(rule_condition, rule_score)
churn_expr = churn_expr.otherwise(20)

df_behavior_latest = df_behavior_latest \
    .withColumn("churn_risk_score", churn_expr) \
    .withColumn("is_at_risk", when(col("churn_risk_score") >= 60, 1).otherwise(0))

logger.info("✓ 交叉销售和风险评分完成")
print("✓ 交叉销售和风险评分完成")

INFO:__main__:步骤6: 生成交叉销售指数和风险评分...
INFO:__main__:✓ 交叉销售和风险评分完成


✓ 交叉销售和风险评分完成


## 步骤7: 合并所有特征

In [21]:
logger.info("步骤7: 合并所有特征...")

# Output schema grouped by feature family; the flattened order is the final
# column order of the feature table.
feature_groups = [
    # basic profile
    ["customer_id", "name", "age", "age_group", "gender", "occupation_type",
     "monthly_income", "marriage_status", "city_level", "lifecycle_stage"],
    # tenure
    ["days_as_customer", "months_as_customer"],
    # assets
    ["total_assets", "deposit_balance", "financial_balance", "fund_balance",
     "insurance_balance", "asset_concentration", "investment_product_diversity"],
    # behaviour
    ["app_login_count", "credit_card_monthly_expense", "investment_monthly_count",
     "activity_type", "is_active_app", "is_active_investor", "is_active_consumer"],
    # RFM
    ["recency_days", "frequency_score", "monetary_score", "rfm_score"],
    # engagement
    ["engagement_score"],
    # multi-dimensional scores
    ["income_score", "lifecycle_score", "diversification_score"],
    # customer value
    ["customer_value_score", "customer_tier"],
    # cross-sell opportunities
    ["financial_upgrade_score", "fund_upgrade_score",
     "insurance_upgrade_score", "credit_card_upgrade_score"],
    # risk
    ["churn_risk_score", "is_at_risk"],
    # last-activity timestamps
    ["last_contact_date", "last_app_login_time"],
]
final_columns = [col(name_) for group in feature_groups for name_ in group]

# Left join keeps every customer from the base table; customers without a
# behaviour snapshot get nulls in the behaviour-derived columns.
df_final_features = (
    df_features
    .join(df_behavior_latest, on="customer_id", how="left")
    .select(*final_columns)
)

logger.info("✓ 所有特征合并完成")
print("✓ 所有特征合并完成")
print(f"  最终特征列数: {len(df_final_features.columns)}")

INFO:__main__:步骤7: 合并所有特征...
INFO:__main__:✓ 所有特征合并完成


✓ 所有特征合并完成
  最终特征列数: 44


## 步骤8: 特征统计和验证

In [22]:
logger.info("步骤8: 特征统计...")

print("\n" + "="*80)
print("特征统计和验证")
print("="*80)

print(f"\n最终特征表行数: {df_final_features.count()}")
print(f"特征列数: {len(df_final_features.columns)}")


def _report_distribution(df, column, title, failure_label):
    """Print and log the row count per distinct value of *column*, largest first.

    Best-effort: a failure is logged as a warning and reported on the console
    instead of aborting the notebook.

    Args:
        df: Spark DataFrame to aggregate.
        column: Column to group by.
        title: Section heading, printed and logged.
        failure_label: Prefix of the warning emitted when the aggregation fails.
    """
    logger.info(f"{title}:")
    print(f"\n{title}:")
    try:
        rows = df.groupBy(column).count().orderBy("count", ascending=False).collect()
    except Exception as e:
        logger.warning(f"{failure_label}统计失败（跳过）: {str(e)}")
        print("  (统计失败)")
    else:
        for row in rows:
            print(f"  {row[column]}: {row['count']} 人")
            logger.info(f"  {row[column]}: {row['count']} 人")


# Distribution of customer tiers and activity types
_report_distribution(df_final_features, "customer_tier", "客户分层分布", "客户分层")
_report_distribution(df_final_features, "activity_type", "活跃类型分布", "活动类型")

# At-risk customers. A failure is reported on the console like the sections
# above, instead of only in the log.
print("\n风险客户统计:")
try:
    at_risk_count = df_final_features.filter(col("is_at_risk") == 1).count()
except Exception as e:
    logger.warning(f"风险统计失败（跳过）: {str(e)}")
    print("  (统计失败)")
else:
    print(f"  处于风险的客户: {at_risk_count} 人")
    logger.info(f"  处于风险的客户: {at_risk_count} 人")

print("\n" + "="*80)

INFO:__main__:步骤8: 特征统计...



特征统计和验证


INFO:__main__:客户分层分布:



最终特征表行数: 10000
特征列数: 44

客户分层分布:


INFO:__main__:  低价值: 5429 人
INFO:__main__:  重点培育: 4277 人
INFO:__main__:  核心客户: 294 人
INFO:__main__:活跃类型分布:


  低价值: 5429 人
  重点培育: 4277 人
  核心客户: 294 人

活跃类型分布:


INFO:__main__:  应用活跃: 5087 人
INFO:__main__:  多元活跃: 3222 人
INFO:__main__:  消费活跃: 865 人
INFO:__main__:  投资活跃: 486 人
INFO:__main__:  低活跃: 340 人


  应用活跃: 5087 人
  多元活跃: 3222 人
  消费活跃: 865 人
  投资活跃: 486 人
  低活跃: 340 人

风险客户统计:


INFO:__main__:  处于风险的客户: 1906 人


  处于风险的客户: 1906 人



## 步骤9: 输出特征表

In [None]:
logger.info("步骤9: 输出特征表...")

# Pre-create the target directory (harmless: in "overwrite" mode Spark replaces
# its contents anyway).
output_dir = Path(output_path)
output_dir.mkdir(parents=True, exist_ok=True)

try:
    # coalesce(1) routes all rows through a single task so the directory holds a
    # single part-*.csv file. Fine for a table of this size (~10k rows in the
    # recorded run).
    df_final_features.coalesce(1) \
        .write.mode("overwrite") \
        .option("header", "true") \
        .csv(output_path)
except Exception as e:
    # logger.exception records the traceback; the error is re-raised.
    logger.exception(f"输出特征表失败: {str(e)}")
    print(f"⚠ 输出特征表失败: {str(e)}")
    raise

# Reporting stays outside the try block so that a failure here (e.g. in count(),
# which recomputes the pipeline) is not mislabelled as a write failure.
logger.info(f"特征表已输出到: {output_path}")
print("\n✓ 特征表已输出")
print(f"  路径: {output_path}")
print(f"  行数: {df_final_features.count()}")
print(f"  列数: {len(df_final_features.columns)}")

## 步骤10: 预览结果

In [None]:
print("\n【特征工程结果预览 - 核心特征前5行】\n")

# Core columns to eyeball for the first five customers.
preview_columns = [
    "customer_id",
    "name",
    "age",
    "monthly_income",
    "customer_tier",
    "customer_value_score",
    "rfm_score",
    "engagement_score",
    "activity_type",
    "churn_risk_score",
]
df_final_features.select(*preview_columns).show(5, truncate=False)

# Count tables for the two categorical segment columns, largest group first.
for section_title, segment_column in (("客户分层分布", "customer_tier"),
                                      ("活跃类型分布", "activity_type")):
    print(f"\n【{section_title}】\n")
    df_final_features.groupBy(segment_column).count() \
        .orderBy("count", ascending=False) \
        .show(truncate=False)

## 完成任务

In [None]:
job_name = args['JOB_NAME']
logger.info(f"{job_name} 任务完成！")

separator = "=" * 80
print("\n" + separator)
print(f"✓ {job_name} 任务完成！")
print(separator)

# Summary lines. Each value is computed lazily, right before its line is printed.
print("\n汇总:")
for summary_label, compute_value in (
    ("输入客户数", df_customer_base.count),
    ("输出特征数", df_final_features.count),
    ("生成特征列数", lambda: len(df_final_features.columns)),
    ("输出路径", lambda: output_path),
):
    print(f"  {summary_label}: {compute_value()}")
print("\n")

## 清理资源

In [None]:
# Stop the SparkSession (and its underlying SparkContext). `spark` must not be
# used by any later cell after this point.
spark.stop()
logger.info("Spark Session 已停止")
print("✓ Spark Session 已停止")

## 完成任务

In [None]:
# Final completion log entry. NOTE(review): the summary cell already logs the
# same message.
logger.info("%s 任务完成！", args['JOB_NAME'])