In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import os

# Spark configuration for a single-machine run in local mode (`local[*]`).
# NOTE(review): in local mode the executors run inside the driver JVM, so the
# spark.executor.* entries below are probably ignored — confirm before tuning them.
_local_resource_settings = [
    ("spark.executor.memory", "8g"),
    ("spark.driver.memory", "8g"),
    ("spark.executor.cores", "4"),
    ("spark.driver.cores", "4"),
]
conf = (
    SparkConf()
    .setAppName("CHN_LOCAL_RandomForest")
    .setMaster("local[*]")
    .setAll(_local_resource_settings)
)

# Build the SparkSession from that configuration (getOrCreate reuses an
# already-running session if one exists in this process).
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark 配置成功")

# Input/output locations. TRAIN_PATH and TEST_PATH carry no URI scheme, so
# presumably they resolve against Spark's default filesystem — confirm.
TRAIN_PATH = "/chn/train.csv"
TEST_PATH = "/chn/test.csv"
MODEL_SAVE_DIR = "hdfs:///chn/model"

# Fraction of training rows kept for fitting (sampled without replacement)
# and the seed that makes that sample reproducible.
TRAIN_SAMPLE_FRACTION = 0.2
TRAIN_SAMPLE_SEED = 42


def _parse_labeled_line(line):
    """Parse one CSV line into ``Row(label=float, features=DenseVector)``.

    The last comma-separated field is the label; every preceding field is a
    feature. All fields must be numeric, so a header row raises ValueError.
    """
    *feature_fields, label_field = line.split(",")
    return Row(
        label=float(label_field),
        features=Vectors.dense([float(x) for x in feature_fields]),
    )


def _lines_to_labeled_df(lines_rdd):
    """Convert an RDD of CSV text lines into a DataFrame with label/features columns.

    Whitespace-only lines are skipped instead of failing inside float().
    """
    return (
        lines_rdd
        .filter(lambda line: line.strip())
        .map(_parse_labeled_line)
        .toDF()
    )


# The assembler receives a single vector column, so it effectively copies
# `features` into `features_vec`, the column name the model is configured to read.
vector_assembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")

# Load the training data.
print("-----加载训练数据-----")
train_rdd = spark.sparkContext.textFile(TRAIN_PATH)
train_data = _lines_to_labeled_df(train_rdd)

# Randomly sample the training data (reproducible via the fixed seed).
sampled_train_data = train_data.sample(
    withReplacement=False, fraction=TRAIN_SAMPLE_FRACTION, seed=TRAIN_SAMPLE_SEED
)

train_data = vector_assembler.transform(sampled_train_data).select("features_vec", "label")
print("-----训练数据加载成功-----")

# Load the test data (not sampled: the whole file is used for evaluation).
print("-----加载测试数据-----")
test_rdd = spark.sparkContext.textFile(TEST_PATH)
test_data = _lines_to_labeled_df(test_rdd)
test_data = vector_assembler.transform(test_data).select("features_vec", "label")
print("-----测试数据加载成功-----")

# Evaluator: accuracy of the `prediction` column against `label`.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Random forest with fixed base hyper-parameters (no tuning is performed).
rf = RandomForestClassifier(featuresCol="features_vec", labelCol="label", numTrees=20, maxDepth=10)

try:
    print("-----开始训练随机森林模型-----")
    rf_model = rf.fit(train_data)

    # Evaluate the fitted model on the test data.
    predictions = rf_model.transform(test_data)
    accuracy = evaluator.evaluate(predictions)
    print(f"随机森林模型准确率: {accuracy:.4f}")

    # Save the model. MODEL_SAVE_DIR is a URI, so join with "/" explicitly
    # rather than os.path.join (whose separator depends on the local OS).
    # overwrite() lets the notebook be re-run: a plain save() raises when the
    # target path already exists. NOTE: this replaces any previously saved model.
    best_model_save_path = MODEL_SAVE_DIR.rstrip("/") + "/RandomForest"
    rf_model.write().overwrite().save(best_model_save_path)
    print(f"随机森林模型已保存至：{best_model_save_path}")
finally:
    # Release the Spark session even if training, evaluation or saving fails.
    spark.stop()

Spark 配置成功
-----加载训练数据-----


                                                                                

-----训练数据加载成功-----
-----加载测试数据-----


                                                                                

-----测试数据加载成功-----
-----开始训练随机森林模型-----


                                                                                

随机森林模型准确率: 0.7387


                                                                                

随机森林模型已保存至：hdfs:///chn/model/RandomForest
