In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, isnull, countDistinct, min, max, mean, stddev, percentile_approx

# Create (or reuse) the Spark session used by this EDA cell.
# NOTE(review): getOrCreate() hands back an already-running session if one exists in
# this kernel; launch-time settings such as spark.driver.memory cannot be changed on a
# running JVM — confirm the intended driver memory is actually in effect.
spark = (
    SparkSession.builder
    .appName("Clickstream Data EDA")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Load the bronze-layer clickstream table (Parquet).
BRONZE_CLICKSTREAM_PATH = "datamart/bronze/bronze_clickstream"
bronze_clickstream_df = spark.read.parquet(BRONZE_CLICKSTREAM_PATH)

# 1-3. Table shape, schema, and a handful of raw rows.
print("=== Clickstream数据基本信息 ===")
row_total = bronze_clickstream_df.count()
col_total = len(bronze_clickstream_df.columns)
print(f"总行数: {row_total}")
print(f"总列数: {col_total}")

print("\n=== 数据模式 ===")
bronze_clickstream_df.printSchema()

print("\n=== 样本数据 ===")
bronze_clickstream_df.show(5, truncate=False)

# 4. Per-column non-null and null counts.
# count(col(c)) counts non-NULL values of c. count(when(col(c).isNull(), c)) counts rows
# where c is NULL: the `when` yields the non-NULL literal string c only for those rows.
print("\n=== 列的非空值和空值数量 ===")
non_null_counts = bronze_clickstream_df.select([
    count(col(c)).alias(c)
    for c in bronze_clickstream_df.columns
])
null_counts = bronze_clickstream_df.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in bronze_clickstream_df.columns
])
print("非空值数量:")
non_null_counts.show(truncate=False)
print("空值数量:")
null_counts.show(truncate=False)

# 5. Summary statistics for every feature column (names starting with "fe_").
print("\n=== 特征列统计分析 ===")
feature_cols = [col_name for col_name in bronze_clickstream_df.columns if col_name.startswith("fe_")]


def _fmt_stat(value, spec=""):
    """Format one statistic for display, tolerating Spark NULLs.

    Spark returns NULL (Python None) for the mean/stddev of an all-NULL column and for
    the sample stddev of a single non-NULL value; a numeric format spec such as ".2f"
    would raise TypeError on None, so None is rendered as "None" instead.

    Args:
        value: The statistic as returned in a collected Row (number or None).
        spec: A format() spec applied to non-None values.

    Returns:
        The formatted string.
    """
    return "None" if value is None else format(value, spec)


# All statistics for all features are computed in ONE aggregation job rather than one
# Spark job per feature. Aliases are prefixed with the feature name to stay unique.
_stat_names = ("min", "max", "mean", "stddev", "median")
_stat_exprs = []
for feature_col in feature_cols:
    _stat_exprs.extend([
        min(feature_col).alias(f"{feature_col}__min"),
        max(feature_col).alias(f"{feature_col}__max"),
        mean(feature_col).alias(f"{feature_col}__mean"),
        stddev(feature_col).alias(f"{feature_col}__stddev"),
        percentile_approx(feature_col, 0.5).alias(f"{feature_col}__median"),
    ])
feature_stats_row = bronze_clickstream_df.select(*_stat_exprs).collect()[0] if _stat_exprs else None

for feature_col in feature_cols:
    stats = {name: feature_stats_row[f"{feature_col}__{name}"] for name in _stat_names}
    print(
        f"{feature_col}: min={stats['min']}, max={stats['max']}, "
        f"mean={_fmt_stat(stats['mean'], '.2f')}, stddev={_fmt_stat(stats['stddev'], '.2f')}, "
        f"median={stats['median']}"
    )

# 6. Ten most frequent values for a few example feature columns.
print("\n=== 特征值分布 ===")
# Only the first three feature columns are shown, as examples.
sample_feature_cols = feature_cols[:3]

for feature_col in sample_feature_cols:
    print(f"\n{feature_col}分布:")
    value_counts = bronze_clickstream_df.groupBy(feature_col).count()
    value_counts.orderBy(col("count").desc()).show(10)

# 7. Customer_ID coverage and the distribution of records per customer.
print("\n=== Customer_ID分析 ===")
# distinct() keeps a NULL Customer_ID as its own value (countDistinct would drop it).
customer_count = bronze_clickstream_df.select("Customer_ID").distinct().count()
print(f"唯一客户数: {customer_count}")

print("\n每个客户的记录数分布:")
# One row per Customer_ID with its row count, summarized by a global aggregation.
# No orderBy: row order cannot affect min/max/mean/percentile, so a sort would only
# add a shuffle.
records_per_customer = bronze_clickstream_df.groupBy("Customer_ID").count()
records_per_customer.select(
    percentile_approx("count", 0.5).alias("median_records_per_customer"),
    min("count").alias("min_records"),
    max("count").alias("max_records"),
    mean("count").alias("avg_records"),
).show()

# 8. snapshot_date completeness and row counts per date.
print("\n=== snapshot_date分析 ===")
# Total rows and NULL dates in a single pass, instead of three separate count() jobs.
date_summary = bronze_clickstream_df.agg(
    count("*").alias("total"),
    count(when(col("snapshot_date").isNull(), 1)).alias("nulls"),
).collect()[0]
total_rows = date_summary["total"]
date_null_count = date_summary["nulls"]
# An empty table would otherwise raise ZeroDivisionError here.
null_pct = date_null_count / total_rows * 100 if total_rows else 0.0
print(f"snapshot_date为NULL的记录数: {date_null_count} ({null_pct:.2f}%)")

# Only show the per-date breakdown when at least one date is present.
if date_null_count < total_rows:
    print("\nsnapshot_date分布:")
    (
        bronze_clickstream_df.groupBy("snapshot_date")
        .count()
        .orderBy("snapshot_date")
        .show(20)
    )

# 9. Pearson correlation between a handful of feature columns.
print("\n=== 特征相关性分析 ===")
print("选择几个样本特征进行相关性分析...")

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Keep the printed matrix small: only the first five feature columns are correlated.
sample_features = feature_cols[:5]

if len(sample_features) < 2:
    # A correlation matrix needs at least two columns.
    print(f"特征列少于2个，跳过相关性分析: {sample_features}")
else:
    # The fe_* columns are nullable. VectorAssembler's default handleInvalid="error" aborts
    # on any NULL. "skip" drops such rows, so the correlation uses complete cases only.
    assembler = VectorAssembler(
        inputCols=sample_features, outputCol="features", handleInvalid="skip"
    )
    vector_df = assembler.transform(bronze_clickstream_df)

    # Correlation.corr returns a one-row DataFrame holding a DenseMatrix.
    corr_matrix = Correlation.corr(vector_df, "features").collect()[0][0]
    corr_matrix_array = corr_matrix.toArray()

    print("相关性矩阵:")
    print(f"特征: {sample_features}")
    for feature_name, row in zip(sample_features, corr_matrix_array):
        # float() so values print as plain numbers rather than numpy scalar reprs.
        print(f"{feature_name}: {[round(float(x), 2) for x in row]}")

=== Clickstream数据基本信息 ===
总行数: 215376
总列数: 24

=== 数据模式 ===
root
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- bronze_ingest_timestamp: timestamp (nullable = true)
 |-- bronze_source_file: string (nullabl

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, mean, stddev, min, max, count, sum, expr, 
    when, lit, current_timestamp, datediff, month, year, 
    quarter, row_number, abs as spark_abs  # 使用PySpark的abs函数
)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import StandardScaler, MinMaxScaler
from pyspark.ml.feature import VectorAssembler

# Create (or reuse) the Spark session for the silver-layer build.
# NOTE(review): if the EDA cell already ran in this kernel, getOrCreate() returns that
# session, so the appName and launch-time settings (e.g. spark.driver.memory) below
# may not take effect — confirm.
spark = (
    SparkSession.builder
    .appName("Silver Layer - Clickstream Data")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Load the bronze-layer clickstream table (Parquet).
BRONZE_CLICKSTREAM_PATH = "datamart/bronze/bronze_clickstream"
bronze_clickstream_df = spark.read.parquet(BRONZE_CLICKSTREAM_PATH)

print("开始处理Clickstream数据...")

# 1. Column groups of the bronze clickstream table.
feature_cols = list(map("fe_{}".format, range(1, 21)))  # fe_1 ... fe_20
id_cols = ["Customer_ID", "snapshot_date"]
metadata_cols = ["bronze_ingest_timestamp", "bronze_source_file"]

# 2. Detailed view: original grain plus z-score-normalized features and outlier flags.
print("\n处理详细视图...")

# 2.1 Mean and sample standard deviation (Spark's stddev) of every feature, computed in
#     ONE aggregation job instead of one collect() per feature.
_stats_row = bronze_clickstream_df.select(*[
    agg_col
    for col_name in feature_cols
    for agg_col in (
        mean(col(col_name)).alias(f"{col_name}__mean"),
        stddev(col(col_name)).alias(f"{col_name}__stddev"),
    )
]).collect()[0]
feature_stats = {
    col_name: {
        "mean": _stats_row[f"{col_name}__mean"],
        "stddev": _stats_row[f"{col_name}__stddev"],
    }
    for col_name in feature_cols
}

# Build every normalized feature and outlier flag, then add them in a single select.
# This avoids 40 chained withColumn calls, which bloat the logical plan.
# Column order is fe_1_normalized, fe_1_is_outlier, fe_2_normalized, ...
_derived_cols = []
for col_name in feature_cols:
    mean_val = feature_stats[col_name]["mean"]
    stddev_val = feature_stats[col_name]["stddev"]

    if mean_val is None or not stddev_val:
        # All-NULL or constant column (stddev NULL or 0): the z-score is undefined.
        # Emit an explicit NULL instead of dividing by zero; division by zero yields NULL
        # in non-ANSI mode but raises when spark.sql.ansi.enabled is true.
        normalized = lit(None).cast("double")
    else:
        normalized = (col(col_name) - lit(mean_val)) / lit(stddev_val)

    _derived_cols.append(normalized.alias(f"{col_name}_normalized"))
    # Outlier: more than 3 standard deviations from the mean (NULL when the z-score is NULL).
    _derived_cols.append((spark_abs(normalized) > 3).alias(f"{col_name}_is_outlier"))

detailed_df = bronze_clickstream_df.select("*", *_derived_cols)

# 2.2 Time-derived features.
# Rows are numbered per customer in snapshot_date order.
window_spec = Window.partitionBy("Customer_ID").orderBy("snapshot_date")

detailed_df = (
    detailed_df
    .withColumn("row_num_by_customer", row_number().over(window_spec))
    .withColumn("month", month("snapshot_date"))
    .withColumn("quarter", quarter("snapshot_date"))
    .withColumn("year", year("snapshot_date"))
)

# Earliest snapshot per customer, attached back onto every row with a left join.
first_snapshot = (
    detailed_df
    .groupBy("Customer_ID")
    .agg(min("snapshot_date").alias("first_snapshot_date"))
)
detailed_df = detailed_df.join(first_snapshot, on="Customer_ID", how="left")

# Number of calendar-month boundaries between the first snapshot and this one.
months_apart = (
    (year("snapshot_date") - year("first_snapshot_date")) * 12
    + (month("snapshot_date") - month("first_snapshot_date"))
)
detailed_df = detailed_df.withColumn("months_since_first", months_apart)

# 2.3 Processing metadata: when this silver row was produced.
detailed_df = detailed_df.withColumn("silver_process_timestamp", current_timestamp())

# 2.4 Final silver_clickstream_detailed projection, in a fixed column order.
_normalized_cols = [f"{name}_normalized" for name in feature_cols]
_outlier_flag_cols = [f"{name}_is_outlier" for name in feature_cols]
_time_cols = ["month", "quarter", "year", "row_num_by_customer", "months_since_first"]

silver_clickstream_detailed = detailed_df.select(
    *id_cols,             # identifiers
    *feature_cols,        # raw features
    *_normalized_cols,    # z-scores
    *_outlier_flag_cols,  # |z| > 3 flags
    *_time_cols,          # calendar / sequence features
    *metadata_cols,       # bronze lineage columns
    "silver_process_timestamp",
)

# 3. Aggregated view: one row per Customer_ID with per-feature summary statistics.
print("\n处理聚合视图...")


def _feature_aggs(name):
    """Return the per-customer aggregations for one feature column.

    The aggregations, in order, are mean, max, min, sample stddev and the number of
    rows flagged as outliers.

    Args:
        name: Feature column name; `<name>_is_outlier` must exist in detailed_df.

    Returns:
        A list of aliased aggregate Columns.
    """
    return [
        mean(col(name)).alias(f"{name}_mean"),
        max(col(name)).alias(f"{name}_max"),
        min(col(name)).alias(f"{name}_min"),
        stddev(col(name)).alias(f"{name}_stddev"),
        # A NULL flag does not satisfy `when`, so it falls through to otherwise(0).
        sum(when(col(f"{name}_is_outlier"), 1).otherwise(0)).alias(f"{name}_outlier_count"),
    ]


agg_exprs = [agg_col for name in feature_cols for agg_col in _feature_aggs(name)]
# Customer-level aggregates that do not depend on a single feature.
agg_exprs += [
    max("snapshot_date").alias("latest_snapshot_date"),
    count("*").alias("record_count"),
    max("months_since_first").alias("max_months"),
    min("first_snapshot_date").alias("first_snapshot_date"),
]

silver_clickstream_aggregated = (
    detailed_df
    .groupBy("Customer_ID")
    .agg(*agg_exprs)
    .withColumn("silver_process_timestamp", current_timestamp())
)

# 4. Persist both silver tables.
print("\n保存Silver层Clickstream数据...")

# Row counts are read back from the written Parquet output. This reports what was
# actually persisted, and avoids re-executing the whole pipeline (stats, window, join)
# a second time just to count rows.
silver_detailed_path = "datamart/silver/silver_clickstream_detailed"
silver_aggregated_path = "datamart/silver/silver_clickstream_aggregated"

silver_clickstream_detailed.write.mode("overwrite").parquet(silver_detailed_path)
detailed_written = spark.read.parquet(silver_detailed_path).count()
print(f"详细视图已保存，记录数: {detailed_written}")

# The detailed view only adds columns and left-joins one row per Customer_ID, so it
# must keep exactly one row per bronze row.
assert detailed_written == bronze_clickstream_df.count(), (
    "silver_clickstream_detailed row count differs from bronze_clickstream"
)

silver_clickstream_aggregated.write.mode("overwrite").parquet(silver_aggregated_path)
aggregated_written = spark.read.parquet(silver_aggregated_path).count()
print(f"聚合视图已保存，记录数: {aggregated_written}")

# 5. Sanity check: print the schema and a 5-row preview of each silver view.
print("\n验证处理结果...")

# (schema header, sample header, DataFrame, preview columns) for each view.
_preview_specs = [
    (
        "\n详细视图Schema:",
        "\n详细视图样本:",
        silver_clickstream_detailed,
        [
            "Customer_ID", "snapshot_date",
            "fe_1", "fe_1_normalized", "fe_1_is_outlier",
            "months_since_first",
        ],
    ),
    (
        "\n聚合视图Schema:",
        "\n聚合视图样本:",
        silver_clickstream_aggregated,
        [
            "Customer_ID", "fe_1_mean", "fe_1_max", "fe_1_min", "fe_1_stddev", "fe_1_outlier_count",
            "record_count", "latest_snapshot_date",
        ],
    ),
]

for schema_header, sample_header, view_df, preview_cols in _preview_specs:
    print(schema_header)
    view_df.printSchema()
    print(sample_header)
    view_df.select(*preview_cols).show(5)

print("\nClickstream处理完成!")

开始处理Clickstream数据...

处理详细视图...

处理聚合视图...

保存Silver层Clickstream数据...
详细视图已保存，记录数: 215376
聚合视图已保存，记录数: 8974

验证处理结果...

详细视图Schema:
root
 |-- Customer_ID: string (nullable = true)
 |-- snapshot_date: date (nullable = true)
 |-- fe_1: integer (nullable = true)
 |-- fe_2: integer (nullable = true)
 |-- fe_3: integer (nullable = true)
 |-- fe_4: integer (nullable = true)
 |-- fe_5: integer (nullable = true)
 |-- fe_6: integer (nullable = true)
 |-- fe_7: integer (nullable = true)
 |-- fe_8: integer (nullable = true)
 |-- fe_9: integer (nullable = true)
 |-- fe_10: integer (nullable = true)
 |-- fe_11: integer (nullable = true)
 |-- fe_12: integer (nullable = true)
 |-- fe_13: integer (nullable = true)
 |-- fe_14: integer (nullable = true)
 |-- fe_15: integer (nullable = true)
 |-- fe_16: integer (nullable = true)
 |-- fe_17: integer (nullable = true)
 |-- fe_18: integer (nullable = true)
 |-- fe_19: integer (nullable = true)
 |-- fe_20: integer (nullable = true)
 |-- fe_1_normalized: doub