In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan
from pyspark.ml.feature import Imputer, StandardScaler
from scipy.io import arff
import pandas as pd

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("数据预处理") \
    .getOrCreate()

# 加载 ARFF 文件
def load_arff(path):
    data, meta = arff.loadarff(path)
    df = pd.DataFrame(data)
    # 将 byte 字符串转换为普通字符串，适用于类别标签
    for col in df.select_dtypes([object]):
        df[col] = df[col].apply(lambda x: x.decode('utf-8'))
    return df

# 假设你的文件路径如下，修改为你实际的文件路径
path_test = 'ElectricDevices_TEST.arff'
path_train = 'ElectricDevices_TRAIN.arff'

df_train = load_arff(path_train)
df_test = load_arff(path_test)
# 检查是否有任何列全部都是空值或 NaN
all_null_columns = df_train.isnull().all() | df_train.isna().all()
if all_null_columns.any():
    # 删除所有值都是空值或 NaN 的列
    df_train = df_train.dropna(axis=1, how='all')
    df_test = df_test.dropna(axis=1, how='all')
    print("已删除所有值都是空值或 NaN 的列。")
else:
    print("没有所有值都是空值或 NaN 的列。")


df_train_spark = spark.createDataFrame(df_train)
df_test_spark = spark.createDataFrame(df_test)

# 检查转换后的数据
print("转换后的训练集样本数据:")
df_train_spark.show(5)

print("转换后的测试集样本数据:")
df_test_spark.show(5)



没有所有值都是空值或 NaN 的列。
转换后的训练集样本数据:
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------

In [44]:
from pyspark.sql.functions import mean

# 填充训练数据集的空缺值
mean_values_train = df_train_spark.agg(*(mean(c).alias(c) for c in df_train_spark.columns)).collect()[0].asDict()
df_train_spark_filled = df_train_spark.fillna(mean_values_train)

# 填充测试数据集的空缺值
mean_values_test = df_test_spark.agg(*(mean(c).alias(c) for c in df_test_spark.columns)).collect()[0].asDict()
df_test_spark_filled = df_test_spark.fillna(mean_values_test)


In [45]:
from pyspark.sql.functions import count, when

# 检查训练数据集中是否有缺失值
missing_values_train = df_train_spark_filled.select([count(when(df_train_spark_filled[c].isNull(), c)).alias(c) for c in df_train_spark_filled.columns])
missing_values_train.show()

# 检查测试数据集中是否有缺失值
missing_values_test = df_test_spark_filled.select([count(when(df_test_spark_filled[c].isNull(), c)).alias(c) for c in df_test_spark_filled.columns])
missing_values_test.show()


+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+
|att1|att2|att3|att4|att5|att6|att7|att8|att9|att10|att11|att12|att13|att14|att15|att16|att17|att18|att19|att20|att21|att22|att23|att24|att25|att26|att27|att28|att29|att30|att31|att32|att33|att34|att35|att36|att37|att38|att39|att40|att41|att42|att43|att44|att45|att46|att47|att48|att49|att50|att51|att52|att53|att54|att55|att56|att57|att58|att59|att60|att61|att62|att63|att64|att65|att66|att67|att68|att69|att70|att71|att72|

In [97]:
df_train_spark_filled .summary().show()


+-------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------

In [120]:
from pyspark.sql.functions import col, when, avg, lit

def remove_outliers_and_fill(df, feature_columns):
    # 设置边界值
    boundaries = [-float("inf"), -0.45, -0.12, float("inf")]

    # 一次性计算所有特征列的平均值
    avg_cols = df.select([avg(c).alias(c) for c in feature_columns]).first().asDict()

    # 更新 DataFrame，替换不在指定范围的值为平均值
    for column in feature_columns:
        df = df.withColumn(column, when(
            (col(column) > boundaries[1]) & (col(column) < boundaries[2]),
            col(column)
        ).otherwise(lit(avg_cols[column])))

    return df

# 定义要处理的特征列，假设 df_test_spark_filled 和 df_train_spark_filled 已经定义了特征列名列表
feature_columns = [c for c in df_test_spark_filled.columns if c != 'target']

# 应用这个函数处理测试和训练数据
df_test_processed = remove_outliers_and_fill(df_test_spark_filled, feature_columns)
df_train_processed = remove_outliers_and_fill(df_train_spark_filled, feature_columns)


In [121]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
# 创建测试数据的assembler，并指定不同的输出列名
from pyspark.ml.feature import VectorAssembler

# 创建 VectorAssembler 实例，将特征列组合成一个名为“features”的向量列
assembler = VectorAssembler(inputCols=df_train_processed.columns[:-1], outputCol="features")

# 将训练数据集和测试数据集分别转换为包含特征向量的 DataFrame
df_train_assembled = assembler.transform(df_train_processed)
df_test_assembled = assembler.transform(df_test_processed)


# 创建训练数据的MinMaxScaler对象
scaler_train = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# 对训练数据拟合归一化模型
scaler_model_train = scaler_train.fit(df_train_assembled)
# 对训练数据进行归一化处理
df_train_normalized = scaler_model_train.transform(df_train_assembled)
df_test_normalized = scaler_model_train.transform(df_test_assembled)

df_train_final = df_train_normalized.drop("features").withColumnRenamed("scaled_features", "features")
df_test_final = df_test_normalized.drop("features").withColumnRenamed("scaled_features", "features")
# 确定特征列和目标列
X_train = df_train_final.select("features")
y_train = df_train_final.select("target")

X_test = df_test_final.select("features")
y_test = df_test_final.select("target")



In [122]:
df_train = df_train_final.select(
    col("features"),
    col("target").alias("label")  # 重命名 target 列为 label 以符合 Spark ML 的习惯
)
df_train.printSchema()
# 检查 DataFrame 的前几行，确保数据处理符合预期
df_train.show()


root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.79960092448320...|  1.0|
|[0.82720640194582...|  1.0|
|[0.71644085670458...|  1.0|
|[0.84398286183301...|  1.0|
|[0.85067233584709...|  1.0|
|[0.79125143046228...|  1.0|
|[0.70407647147309...|  1.0|
|[0.86122854943323...|  1.0|
|[0.78908637286007...|  1.0|
|[0.87209653034381...|  1.0|
|[0.80417249107883...|  1.0|
|[0.78367168214238...|  1.0|
|[0.77335370585131...|  1.0|
|[0.85056639196150...|  1.0|
|[0.78705245644422...|  1.0|
|[0.84739650481600...|  1.0|
|[0.77490226343016...|  1.0|
|[0.78623592441937...|  1.0|
|[0.79812301637584...|  1.0|
|[0.72801590844168...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [127]:
df_test = df_test_final.select(
    col("features"),
    col("target").alias("label")  # 重命名 target 列为 label 以符合 Spark ML 的习惯
)
df_test.printSchema()
# 检查 DataFrame 的前几行，确保数据处理符合预期
df_test.show()


root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.76896698010471...|  6.0|
|[0.63262326369448...|  6.0|
|[0.64814146559084...|  6.0|
|[0.72620337047706...|  6.0|
|[0.84488429451029...|  6.0|
|[0.85396726952497...|  6.0|
|[0.45016102773210...|  6.0|
|[0.64358220958981...|  6.0|
|[0.90611613081165...|  6.0|
|[0.73568234639205...|  6.0|
|[0.60742411329531...|  6.0|
|[0.75435284887019...|  6.0|
|[0.88551801966317...|  6.0|
|[0.45019228936519...|  6.0|
|[0.75277084649861...|  6.0|
|[0.97474775637139...|  6.0|
|[0.59027454536231...|  6.0|
|[1.10677802164055...|  6.0|
|[0.84087777207831...|  6.0|
|[0.44808059788737...|  6.0|
+--------------------+-----+
only showing top 20 rows



In [146]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 示例数据加载和模型定义的代码省略

# 创建随机森林模型
rf = RandomForestClassifier(featuresCol='features', labelCol='label')

# 创建超参数网格
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [23,25,28 ]) \
    .addGrid(rf.maxDepth, [11,12,13]) \
    .addGrid(rf.featureSubsetStrategy, ["5"]) \
    .build()


# 设置多分类评估器
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# 设置交叉验证
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)  # 使用3折交叉验证

# 训练模型
cvModel = crossval.fit(df_train)

# 查看最佳模型的参数
bestModel = cvModel.bestModel
print("Best Param (numTrees): ", bestModel._java_obj.getNumTrees())
print("Best Param (maxDepth): ", bestModel._java_obj.getMaxDepth())
print("Best Param (impurity): ", bestModel._java_obj.getImpurity())

# 查看特征的重要性
print("Feature Importances:\n" + str(bestModel.featureImportances))


Best Param (numTrees):  25
Best Param (maxDepth):  13
Best Param (impurity):  gini
Feature Importances:
(96,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95],[0.007021294958666777,0.004716004879397302,0.0037813351282740453,0.0026145323832795836,0.0034719797355424855,0.0037225823284295356,0.005120335327129577,0.003320580848825234,0.00449876919726233,0.004756364240303749,0.0039933182894707955,0.00445308727459533,0.005602848181327705,0.0034874637247473157,0.0044340815495583765,0.004531003903689309,0.004682721016959163,0.004917594088748012,0.0045723907703785335,0.005632553293879389,0.004617621518544108,0.005833560997834278,0.009394770468334614,0.004787525018602096,0.004312469288193502,0.0071041404248564765,0.008269296393854705,0.009523592494111192,0.00883298157563

In [147]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



# 使用模型进行预测
predictions = bestModel.transform(df_test)

# 评估模型
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))


Test Accuracy = 0.495137
