In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, max, min, when, month, year, expr, lit
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline

# 初始化 SparkSession
print("--- 1. 初始化 SparkSession (使用实验报告配置) ---")
spark = SparkSession.builder \
    .appName("ComplexSalesAnalysis") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

--- 1. 初始化 SparkSession (使用实验报告配置) ---


In [3]:
# --- 2. 加载数据集和基础检查 ---
print("\n--- 2. 加载数据集 ---")
sales_df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)
print(f"数据集行数: {sales_df.count()}") # 预期输出: 113036

# --- 3. 运行简单 Spark 验证程序 ---
print("\n--- 3. 运行简单 Spark 验证程序 ---")
# 筛选 'Accessories' 类别，并按日期聚合总收入
simple_result = sales_df.filter(sales_df["Product_Category"] == "Accessories") \
             .groupBy("Date") \
             .sum("Revenue") 
print("筛选 Accessories 并按日期聚合总收入:")
simple_result.show(5)


--- 2. 加载数据集 ---
数据集行数: 113036

--- 3. 运行简单 Spark 验证程序 ---
筛选 Accessories 并按日期聚合总收入:
+----------+------------+
|      Date|sum(Revenue)|
+----------+------------+
| 2016/5/12|       21505|
| 2015/7/29|        5994|
|  2013/8/2|       17700|
|2013/11/10|       20464|
|2015/11/24|       28005|
+----------+------------+
only showing top 5 rows


In [4]:
from pyspark.sql.functions import sum, count, lit, col, expr, avg, when
from pyspark.sql.window import Window
# 确保 enriched_df 和 sales_df 在此单元格运行前已经被定义

# --- 4. Spark SQL 查询 (查询产品类别统计) ---
print("\n--- 4. Spark SQL 查询 (查询产品类别统计) ---")
# 注册临时表用于 SQL 查询
sales_df.createOrReplaceTempView("sales")

# 查询每个产品类别的总收入和平均利润
print("每个产品类别的总收入和平均利润:")
category_stats = spark.sql("""
SELECT
    Product_Category,
    COUNT(*) as Order_Count,
    SUM(Revenue) as Total_Revenue,
    AVG(Profit) as Avg_Profit,
    SUM(Profit) as Total_Profit,
    ROUND((SUM(Profit) / SUM(Revenue)) * 100, 2) as Profit_Margin_Percent
FROM sales
GROUP BY Product_Category
ORDER BY Total_Revenue DESC
""")
category_stats.show()


# --- 5. DataFrame API 复杂转换 ---
print("\n--- 5. DataFrame API 复杂转换 ---")

# (1)添加派生列：利润率、单位利润、客户分段、收入类别
enriched_df = sales_df.withColumn(
    "Profit_Margin",
    expr("ROUND((Profit / Revenue) * 100, 2)") 
).withColumn(
    "Unit_Profit",
    expr("Unit_Price - Unit_Cost") 
).withColumn(
    "Customer_Segment",
    # 使用报告中的年龄分段逻辑
    when(col("Customer_Age") < 25, "Youth")
    .when((col("Customer_Age") >= 25) & (col("Customer_Age") < 48), "Young_Adult")
    .when((col("Customer_Age") >= 40) & (col("Customer_Age") < 60), "Middle_Aged")
    .otherwise("Senior")
).withColumn(
    "Revenue_Category", # 用于后续 ML 任务的目标变量
    when(col("Revenue") < 500, "Low")
    .when((col("Revenue") >= 500) & (col("Revenue") < 1000), "Medium")
    .otherwise("High")
)
print("添加派生列后的前5行数据:")
enriched_df.select("Product", "Customer_Age", "Customer_Segment", "Revenue", "Revenue_Category", "Profit_Margin").show(5)

# (2)计算每个产品的移动平均收入
# 注意：报告中 rowsBetween(-2, 8) 的窗口不常用，这里使用更合理的 rowsBetween(-2, 0) (前2行和当前行)
window_spec = Window.partitionBy("Product").orderBy("Date").rowsBetween(-2, 0) 
moving_avg_df = enriched_df.withColumn(
    "Moving_Avg_Revenue",
    avg(col("Revenue")).over(window_spec)
)
print("计算移动平均收入后的前5行数据:")
moving_avg_df.select("Date", "Product", "Revenue", "Moving_Avg_Revenue").show(5)

# (3)数据透视:按产品类别和客户性别统计收入
print("按产品类别和客户性别统计收入:")
pivot_df = enriched_df.groupBy("Product_Category").pivot("Customer_Gender").agg(
    sum("Revenue").alias("Total_Revenue"),
    count(lit(1)).alias("Order_Count") # 已修正：使用 count(lit(1)) 避免 AnalysisException
)
pivot_df.show()


--- 4. Spark SQL 查询 (查询产品类别统计) ---
每个产品类别的总收入和平均利润:
+----------------+-----------+-------------+------------------+------------+---------------------+
|Product_Category|Order_Count|Total_Revenue|        Avg_Profit|Total_Profit|Profit_Margin_Percent|
+----------------+-----------+-------------+------------------+------------+---------------------+
|           Bikes|      25982|     61782134| 789.7496728504349|    20519276|                33.21|
|     Accessories|      70120|     15117992|126.38871933827724|     8862377|                58.62|
|        Clothing|      16934|      8370882|167.67727648517774|     2839447|                33.92|
+----------------+-----------+-------------+------------------+------------+---------------------+


--- 5. DataFrame API 复杂转换 ---
添加派生列后的前5行数据:
+-------------------+------------+----------------+-------+----------------+-------------+
|            Product|Customer_Age|Customer_Segment|Revenue|Revenue_Category|Profit_Margin|
+-------------------+-----

In [6]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
import pandas as pd

# --- 6. MLlib 任务：随机森林预测收入类别 ---
print("\n--- 6. MLlib 任务：随机森林预测收入类别 ---")

# 特征选择：移除 Revenue 的构成要素以避免数据泄露
feature_columns = [
    "Year", "Month", "Customer_Age", "Age_Group", "Customer_Gender", "Country", 
    "State", "Product_Category", "Sub_Category", "Product", "Profit", "Cost"
]
target_column = "Revenue_Category"

ml_df = enriched_df.select(*feature_columns, target_column)

# 1. 类别特征编码 (StringIndexer)
categorical_cols = [f.name for f in ml_df.schema.fields if f.dataType == 'string' and f.name != target_column]
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="skip") for c in categorical_cols]
target_indexer = StringIndexer(inputCol=target_column, outputCol="label_index", handleInvalid="skip")
indexers.append(target_indexer)

pipeline_indexer = Pipeline(stages=indexers)
ml_data = pipeline_indexer.fit(ml_df).transform(ml_df)
ml_data = ml_data.withColumn("label_index", col("label_index").cast(DoubleType()))

# 2. 特征向量化 (VectorAssembler)
indexed_cols = [c + "_index" for c in categorical_cols]
numerical_cols = ["Customer_Age", "Profit", "Cost"]
all_features = indexed_cols + numerical_cols

assembler = VectorAssembler(inputCols=all_features, outputCol="features_raw")
ml_data = assembler.transform(ml_data)

# 3. 特征标准化 (StandardScaler)
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)
scaler_model = scaler.fit(ml_data)
ml_data = scaler_model.transform(ml_data)

# 4. 拆分训练集和测试集
(train_data, test_data) = ml_data.randomSplit([0.7, 0.3], seed=42)

# 5. 训练随机森林分类器
print("开始训练随机森林分类模型...")
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label_index",
    numTrees=100,
    maxDepth=10,
    seed=42
)
rf_model = rf.fit(train_data)

# 6. 模型评估
predictions = rf_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label_index", 
    predictionCol="prediction", 
    metricName="accuracy"
)

accuracy = evaluator.evaluate(predictions)
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"\n--- 模型性能评估 ---")
print(f"准确率 (Accuracy): {accuracy:.4f} (报告预期: 0.7952)")
print(f"F1 分数: {f1_score:.4f} (报告预期: 0.7337)")

# 打印特征重要性
feature_importances = pd.Series(rf_model.featureImportances.toArray(), index=all_features).sort_values(ascending=False)
print("\n特征重要性（前10名）:")
print(feature_importances.head(10))

# 停止 SparkSession
spark.stop()


--- 6. MLlib 任务：随机森林预测收入类别 ---
开始训练随机森林分类模型...

--- 模型性能评估 ---
准确率 (Accuracy): 0.9902 (报告预期: 0.7952)
F1 分数: 0.9903 (报告预期: 0.7337)

特征重要性（前10名）:
Cost            0.680953
Profit          0.317245
Customer_Age    0.001802
dtype: float64
