In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler, MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col,when,regexp_replace,rand
from pyspark.sql.types import IntegerType
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time
import datetime
import pandas as pd

# Create a SparkSession
spark = SparkSession.builder \
    .appName("GradientBoostingClassifierExample") \
    .getOrCreate()

df = spark.read.option("header", "true").option("index", False).csv("D:/project/homecredit/final/v3/final_all_onehotencoding_v3.csv")

df = df.drop('_c0','SK_ID_CURR')

all_columns = df.columns


columns_to_convert = all_columns

for column in columns_to_convert:
    df = df.withColumn(column, col(column).cast(IntegerType()))

df= df.fillna(0)
df= df.drop('_c0','SK_ID_CURR')

# Define the split ratios
train_ratio = 0.9
validation_ratio = 0.05
test_ratio = 0.05

# Seed for reproducibility
seed = 42

# Split the DataFrame into train, validation, and test sets
splits = df.randomSplit([train_ratio, validation_ratio, test_ratio], seed=seed)

# Get the individual DataFrames
train_df = splits[0]
validation_df = splits[1]
test_df = splits[2]


feature_columns =  df.columns
feature_columns.remove("TARGET")

# Assemble the features into a single "features" column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_df = assembler.transform(train_df)
validation_df = assembler.transform(validation_df)

# Define GBTClassifier parameters
params = {
    'maxDepth': 8,
    'minInstancesPerNode': 2,
    'minInfoGain': 0.0,
    'subsamplingRate': 0.8715623,
    'maxBins': 32
}

# Create a GBTClassifier
gbt = GBTClassifier(**params, seed=37, labelCol="TARGET")  # Specify the label column

# Fit the GBTClassifier model to the training data
model = gbt.fit(train_df)

# Make predictions on the validation set
predictions = model.transform(validation_df)

# Evaluate the model's performance on the validation set
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
auc_valid = evaluator.evaluate(predictions)
print(f'Valid AUC: {auc_valid:.4f}')

# If you want to perform iterations and plot the AUC curve, you can use PySpark's CrossValidator with a ParamGridBuilder:
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10]) \
    .build()

crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

cvModel = crossval.fit(train_df)

# Get the AUC for each iteration
iteration_aucs = cvModel.avgMetrics
for iteration, auc_iteration in enumerate(iteration_aucs, start=1):
    if iteration % 10 == 0:  # Print every 10 iterations
        print(f'Iteration {iteration}, valid_AUC = {auc_iteration:.4f}')

# Stop the SparkSession when you're done
spark.stop()


IllegalArgumentException: label does not exist. Available: TARGET, CNT_CHILDREN, AMT_INCOME_TOTAL, AMT_CREDIT, AMT_ANNUITY, AMT_GOODS_PRICE, REGION_POPULATION_RELATIVE, AGE, JOB_TENURE, DAYS_REGISTRATION, DAYS_ID_PUBLISH, OWN_CAR_AGE, CNT_FAM_MEMBERS, REGION_RATING_CLIENT, REGION_RATING_CLIENT_W_CITY, HOUR_APPR_PROCESS_START, EXT_SOURCE_1, EXT_SOURCE_2, EXT_SOURCE_3, DEF_30_CNT_SOCIAL_CIRCLE, DEF_60_CNT_SOCIAL_CIRCLE, DAYS_LAST_PHONE_CHANGE, AMT_REQ_CREDIT_BUREAU_MON, AMT_REQ_CREDIT_BUREAU_QRT, AMT_REQ_CREDIT_BUREAU_YEAR, DAYS_EMPLOYED_365243, CNT_INSTALMENT_median, Contract_Change_rateCNT_INSTALMENT_mean, DAYS_CREDIT_ENDDATE_max, AMT_PAYMENT_MAXANNUITY_max, Contract_Change_rateCNT_INSTALMENT, DAYS_CREDIT_UPDATE_max, countSK_ID_BUREAUCREDIT_ACTIVE_Active_count, 3_DAYS_ENTRY_DIFF_MEAN_mean, AMT_PAYMENT_MAX_median, AMT_PAYMENT_MAXCREDIT_max, AMT_PAYMENT_MAXCREDIT_median, DAYS_CREDIT_ENDDATE_min, CNT_INSTALMENT_max, AMT_CREDIT_SUM_DEBT_mean, Contract_Change_count, AMT_CREDIT_SUM_OVERDUE_max, AMT_CREDIT_SUM_max, PAYMENT_MAXMEDIAN_median, Contract_Change_Count_mean, AMT_PAYMENT_MEDIAN_median, DAYS_ENTRY_DIFF_MAX_max, 3_DAYS_ENTRY_DIFF_MAX_mean, AMT_PAYMENT_MAX_max, 6_DAYS_ENTRY_DIFF_MAX_mean, MAX_OVERDUECREDIT_mean, CNT_INSTALMENTTotal_Months_rate, PREV_DAYS_TERMINATION_min, CNT_INSTALMENTTotal_Months_rate_mean, 9_DELAY_mean, PREV_NAME_YIELD_GROUP_XNA_rate, MONTHS_BALANCE_finish, PREV_GOODS_PRICECREDIT_min, OUTSTANDING_sum, DAYS_CREDIT_recent, AMT_PAYMENT_ALL_mean, countSK_ID_BUREAUCREDIT_TYPE_Microloan_count, DELAY_mean, DAYS_CREDIT_ENDDATE_median, MONTHS_BALANCE_start, AMT_CREDIT_SUM_OVERDUE_mean, DAYS_CREDIT_UPDATE_median, DAYS_ENTRY_DIFF_MEAN_mean, PREV_DAYS_LAST_DUE_min, 9_DAYS_ENTRY_DIFF_MAX_mean, DAYS_CREDIT_mean, DEBTCREDIT_mean, DAYS_ENTRY_DIFF_MAX_median, DEBTCREDIT_max, AMT_ANNUITY_CASH_LOANS_MEAN, AMT_APPLICATION_CASH_LOANS_MEAN, AMT_CREDIT_CASH_LOANS_MEAN, AMT_GOODS_PRICE_CASH_LOANS_MEAN, AMT_ANNUITY_CONSUMER_LOANS_MEAN, AMT_APPLICATION_CONSUMER_LOANS_MEAN, AMT_CREDIT_CONSUMER_LOANS_MEAN, AMT_GOODS_PRICE_CONSUMER_LOANS_MEAN, AMT_ANNUITY_REVOLVING_LOANS_MEAN, AMT_APPLICATION_REVOLVING_LOANS_MEAN, AMT_CREDIT_REVOLVING_LOANS_MEAN, AMT_GOODS_PRICE_REVOLVING_LOANS_MEAN, creditcard_high, creditcard_normal, creditcard_low, instalments_overdue_DAY_mean, instalments_overdue_AMT_mean, instalments_PERIOD, BUREAU_DAYS_CREDIT, BUREAU_DAYS_CREDIT_ENDDATE, BUREAU_DAYS_ENDDATE_FACT, CDCS_ratio, bureau_annuity_mean, bereau_balance_status, NAME_CONTRACT_TYPE_Revolvingloans, CODE_GENDER_M, FLAG_OWN_CAR_Y, FLAG_OWN_REALTY_Y, NAME_TYPE_SUITE_Family, NAME_TYPE_SUITE_Groupofpeople, NAME_TYPE_SUITE_Other_A, NAME_TYPE_SUITE_Other_B, NAME_TYPE_SUITE_Spousepartner, NAME_TYPE_SUITE_Unaccompanied, NAME_INCOME_TYPE_Commercialassociate, NAME_INCOME_TYPE_Maternityleave, NAME_INCOME_TYPE_Pensioner, NAME_INCOME_TYPE_Stateservant, NAME_INCOME_TYPE_Student, NAME_INCOME_TYPE_Unemployed, NAME_INCOME_TYPE_Working, NAME_EDUCATION_TYPE_Highereducation, NAME_EDUCATION_TYPE_Incompletehigher, NAME_EDUCATION_TYPE_Lowersecondary, NAME_EDUCATION_TYPE_Secondarysecondaryspecial, NAME_FAMILY_STATUS_Married, NAME_FAMILY_STATUS_Separated, NAME_FAMILY_STATUS_Singlenotmarried, NAME_FAMILY_STATUS_Unknown, NAME_FAMILY_STATUS_Widow, NAME_HOUSING_TYPE_Houseapartment, NAME_HOUSING_TYPE_Municipalapartment, NAME_HOUSING_TYPE_Officeapartment, NAME_HOUSING_TYPE_Rentedapartment, NAME_HOUSING_TYPE_Withparents, OCCUPATION_TYPE_Cleaningstaff, OCCUPATION_TYPE_Cookingstaff, OCCUPATION_TYPE_Corestaff, OCCUPATION_TYPE_Drivers, OCCUPATION_TYPE_HRstaff, OCCUPATION_TYPE_Highskilltechstaff, OCCUPATION_TYPE_ITstaff, OCCUPATION_TYPE_Laborers, OCCUPATION_TYPE_LowskillLaborers, OCCUPATION_TYPE_Managers, OCCUPATION_TYPE_Medicinestaff, OCCUPATION_TYPE_Privateservicestaff, OCCUPATION_TYPE_Realtyagents, OCCUPATION_TYPE_Salesstaff, OCCUPATION_TYPE_Secretaries, OCCUPATION_TYPE_Securitystaff, OCCUPATION_TYPE_Waitersbarmenstaff, WEEKDAY_APPR_PROCESS_START_MONDAY, WEEKDAY_APPR_PROCESS_START_SATURDAY, WEEKDAY_APPR_PROCESS_START_SUNDAY, WEEKDAY_APPR_PROCESS_START_THURSDAY, WEEKDAY_APPR_PROCESS_START_TUESDAY, WEEKDAY_APPR_PROCESS_START_WEDNESDAY, ORGANIZATION_TYPE_Agriculture, ORGANIZATION_TYPE_Bank, ORGANIZATION_TYPE_BusinessEntityType1, ORGANIZATION_TYPE_BusinessEntityType2, ORGANIZATION_TYPE_BusinessEntityType3, ORGANIZATION_TYPE_Cleaning, ORGANIZATION_TYPE_Construction, ORGANIZATION_TYPE_Culture, ORGANIZATION_TYPE_Electricity, ORGANIZATION_TYPE_Emergency, ORGANIZATION_TYPE_Government, ORGANIZATION_TYPE_Hotel, ORGANIZATION_TYPE_Housing, ORGANIZATION_TYPE_Industrytype1, ORGANIZATION_TYPE_Industrytype10, ORGANIZATION_TYPE_Industrytype11, ORGANIZATION_TYPE_Industrytype12, ORGANIZATION_TYPE_Industrytype13, ORGANIZATION_TYPE_Industrytype2, ORGANIZATION_TYPE_Industrytype3, ORGANIZATION_TYPE_Industrytype4, ORGANIZATION_TYPE_Industrytype5, ORGANIZATION_TYPE_Industrytype6, ORGANIZATION_TYPE_Industrytype7, ORGANIZATION_TYPE_Industrytype8, ORGANIZATION_TYPE_Industrytype9, ORGANIZATION_TYPE_Insurance, ORGANIZATION_TYPE_Kindergarten, ORGANIZATION_TYPE_LegalServices, ORGANIZATION_TYPE_Medicine, ORGANIZATION_TYPE_Military, ORGANIZATION_TYPE_Mobile, ORGANIZATION_TYPE_Other, ORGANIZATION_TYPE_Police, ORGANIZATION_TYPE_Postal, ORGANIZATION_TYPE_Realtor, ORGANIZATION_TYPE_Religion, ORGANIZATION_TYPE_Restaurant, ORGANIZATION_TYPE_School, ORGANIZATION_TYPE_Security, ORGANIZATION_TYPE_SecurityMinistries, ORGANIZATION_TYPE_Selfemployed, ORGANIZATION_TYPE_Services, ORGANIZATION_TYPE_Telecom, ORGANIZATION_TYPE_Tradetype1, ORGANIZATION_TYPE_Tradetype2, ORGANIZATION_TYPE_Tradetype3, ORGANIZATION_TYPE_Tradetype4, ORGANIZATION_TYPE_Tradetype5, ORGANIZATION_TYPE_Tradetype6, ORGANIZATION_TYPE_Tradetype7, ORGANIZATION_TYPE_Transporttype1, ORGANIZATION_TYPE_Transporttype2, ORGANIZATION_TYPE_Transporttype3, ORGANIZATION_TYPE_Transporttype4, ORGANIZATION_TYPE_University, ORGANIZATION_TYPE_XNA, features, rawPrediction, probability, prediction

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler, MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col,when,regexp_replace,rand
from pyspark.sql.types import IntegerType
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time
import datetime
import pandas as pd

# 创建一个Spark会话
spark = SparkSession.builder.appName("MySparkSession").getOrCreate()




df = spark.read.option("header", "true").option("index", False).csv("D:/project/homecredit/final/v3/final_all_onehotencoding_v3.csv")


df = df.drop('_c0','SK_ID_CURR')
df.show()


+------+------------+----------------+----------+-----------+---------------+--------------------------+---+----------+-----------------+---------------+-----------+---------------+--------------------+---------------------------+-----------------------+------------------+------------------+------------------+------------------------+------------------------+----------------------+-------------------------+-------------------------+--------------------------+--------------------+---------------------+---------------------------------------+-----------------------+--------------------------+----------------------------------+----------------------+-------------------------------------------+---------------------------+----------------------+-------------------------+----------------------------+-----------------------+------------------+------------------------+---------------------+--------------------------+------------------+------------------------+--------------------------+-------

In [2]:
all_columns = df.columns


columns_to_convert = all_columns

for column in columns_to_convert:
    df = df.withColumn(column, col(column).cast(IntegerType()))

df= df.fillna(0)
df= df.drop('_c0','SK_ID_CURR')

In [6]:
# num_iterations = 3  # 你可以根据需要设置不同的迭代次数
# auc_values = []

# # 循环迭代
# for i in range(num_iterations):
#     # 使用不同的随机种子重新划分数据集
seed = 37
train_ratio = 0.9
valid_ratio = 0.05
test_ratio = 0.05

df_with_rand = df.withColumn("rand", rand(seed))
df_with_rand = df_with_rand.orderBy("rand")

train, valid, test = df_with_rand.randomSplit([train_ratio, valid_ratio, test_ratio], seed=seed)

train = train.drop("rand")
valid = valid.drop("rand")
test = test.drop("rand")

feature_columns = df.columns
feature_columns.remove("TARGET")

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train = assembler.transform(train)
test = assembler.transform(test)
valid = assembler.transform(valid)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(train)
train_scaled = scaler_model.transform(train)
test_scaled = scaler_model.transform(test)
valid_scaled = scaler_model.transform(valid)


datasets = [train_scaled, test_scaled, valid_scaled]

for dataset in datasets:
    total_count = dataset.count()
    label_0_count = dataset.filter(dataset["TARGET"] == 0).count()
    label_1_count = dataset.filter(dataset["TARGET"] == 1).count()

    weight_label_0 = 1.0
    weight_label_1 = total_count / (2 * label_1_count)

    # 为每个数据集添加 classWeights 列
    dataset = dataset.withColumn("classWeights", when(dataset["TARGET"] == 0, weight_label_0).otherwise(weight_label_1))



gbt = GBTClassifier(labelCol="TARGET", featuresCol="scaled_features",maxIter=100)
param_grid = ParamGridBuilder() \
     .addGrid(gbt.maxDepth, [5, 10,15,20,25,30,35,50]) \
     .build()

pipeline = Pipeline(stages=[gbt])


cross_validator = CrossValidator(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=BinaryClassificationEvaluator())
cv_model = cross_validator.fit(train_scaled)
best_model = cv_model.bestModel


print("Best Model Parameters:")
for param, value in best_model.stages[-1].extractParamMap().items():
    print(f"{param.name}: {value}")

evaluator = BinaryClassificationEvaluator(labelCol="TARGET", rawPredictionCol="prediction", metricName="areaUnderROC")

predictions = best_model.transform(test_scaled)

auctest = evaluator.evaluate(predictions)

print(f'test_auc Score:{auctest}')

predictions1 = best_model.transform(valid_scaled)

aucvalid = evaluator.evaluate(predictions1)
print(f'valid_auc Score:{aucvalid}')

# # # 打印每次迭代的AUC值
# # for i, auc in enumerate(auc_values):
# #     print(f"Iteration {i + 1}:  AUC = {auc:.4f}")


In [5]:
print(train_scaled.columns)


['TARGET', 'CNT_CHILDREN', 'AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'REGION_POPULATION_RELATIVE', 'AGE', 'JOB_TENURE', 'DAYS_REGISTRATION', 'DAYS_ID_PUBLISH', 'OWN_CAR_AGE', 'CNT_FAM_MEMBERS', 'REGION_RATING_CLIENT', 'REGION_RATING_CLIENT_W_CITY', 'HOUR_APPR_PROCESS_START', 'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'DEF_30_CNT_SOCIAL_CIRCLE', 'DEF_60_CNT_SOCIAL_CIRCLE', 'DAYS_LAST_PHONE_CHANGE', 'AMT_REQ_CREDIT_BUREAU_MON', 'AMT_REQ_CREDIT_BUREAU_QRT', 'AMT_REQ_CREDIT_BUREAU_YEAR', 'DAYS_EMPLOYED_365243', 'CNT_INSTALMENT_median', 'Contract_Change_rateCNT_INSTALMENT_mean', 'DAYS_CREDIT_ENDDATE_max', 'AMT_PAYMENT_MAXANNUITY_max', 'Contract_Change_rateCNT_INSTALMENT', 'DAYS_CREDIT_UPDATE_max', 'countSK_ID_BUREAUCREDIT_ACTIVE_Active_count', '3_DAYS_ENTRY_DIFF_MEAN_mean', 'AMT_PAYMENT_MAX_median', 'AMT_PAYMENT_MAXCREDIT_max', 'AMT_PAYMENT_MAXCREDIT_median', 'DAYS_CREDIT_ENDDATE_min', 'CNT_INSTALMENT_max', 'AMT_CREDIT_SUM_DEBT_mean', 'Contract_Change_count', 

In [None]:
# 將數據按 scaled_features 列分組，計算每組的總數
value_counts = train.groupBy("scaled_features").count()

# 使用 count 方法計算總共有多少種結果
num_distinct_results = value_counts.count()

# 顯示總共有多少種結果
print(f"總共有 {num_distinct_results} 種結果")


In [None]:
train_countrow = train.count()

# 顯示總共有多少種結果
print(f"總共有 {train_countrow} 種結果")

In [None]:
# 使用随机种子以确保可重复性
seed = 42

# 划分数据集
train_ratio = 0.9
valid_ratio = 0.05
test_ratio = 0.05

# 使用随机列生成随机数列，并与原始 DataFrame 进行连接
df_all_with_rand = df_all.withColumn("rand", rand(seed))
df_all_with_rand = df_all_with_rand.orderBy("rand")

# 划分数据集
train, valid, test = df_all_with_rand.randomSplit([train_ratio, valid_ratio, test_ratio], seed=seed)

# 移除生成的随机列
train = train.drop("rand")
valid = valid.drop("rand")
test = test.drop("rand")


# 打印每个数据集的行数
print("训练集行数:", train.count())
print("验证集行数:", valid.count())
print("测试集行数:", test.count())


In [None]:
# 创建VectorAssembler来合并特征列为一个Vector列
assembler = VectorAssembler(inputCols=train.columns, outputCol="features")
train = assembler.transform(train)
test = assembler.transform(test)
valid = assembler.transform(valid)


# 创建 MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# 估计缩放器的参数
scaler_model = scaler.fit(train)

# 使用缩放器将数据进行转换
train_scaled = scaler_model.transform(train)
test_scaled = scaler_model.transform(test)
valid_scaled = scaler_model.transform(valid)

In [None]:
gbt = GBTClassifier(labelCol="TARGET", featuresCol="scaled_features")

In [None]:
print("start時間:", datetime.datetime.now())
print('------------------') 
# 计时开始
start_time = time.time()

# 训练模型
model = gbt.fit(train_scaled)

# 停止计时
end_time = time.time()
execution_time = end_time - start_time
print("\n程序执行花费的时间：", round(execution_time, 2), "秒")
print("done時間:", datetime.datetime.now())


In [None]:
predictions_valid = model.transform(valid_scaled)
predictions_valid.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# 创建二元分类评估器
evaluator = BinaryClassificationEvaluator(labelCol="TARGET", rawPredictionCol="prediction", metricName="areaUnderROC")

# 计算AUC
auc = evaluator.evaluate(predictions_valid)
print(f'valid_AUC = {auc:.4f}')


In [None]:
# 去除包含 NaN 或缺失值的行
predictions_valid = predictions_valid.dropna(subset=["prediction", "TARGET"])

# 提取预测结果和实际标签
y_pred = predictions_valid.select("prediction").rdd.map(lambda row: row[0])
y_true = predictions_valid.select("TARGET").rdd.map(lambda row: row[0])

# 计算混淆矩阵
from sklearn.metrics import confusion_matrix
import numpy as np

y_true = y_true.collect()
y_pred = y_pred.collect()

confusion_matrix_result = confusion_matrix(y_true, y_pred)

print("Confusion Matrix:")
print(np.array(confusion_matrix_result))



In [None]:
# Make predictions on test data using the Transformer.transform() method.
predictions = model.transform(valid_scaled)
evaluator = BinaryClassificationEvaluator(labelCol="TARGET", rawPredictionCol="prediction", metricName="areaUnderROC")


# AUC Evaluate best model
evaluator.evaluate(predictions)
print('Test Area Under Roc',evaluator.evaluate(predictions))


In [None]:
train.printSchema()

In [None]:
import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(model.summary.roc.select('FPR').collect(),
         model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

In [None]:
spark.stop()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructField , StructType ,DoubleType,StringType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import expr,col,column
from mmlspark.lightgbm import LightGBMRegressor


print("start時間:", datetime.datetime.now())
print('------------------') 

# 创建随机森林回归模型
#rforest = RandomForestRegressor(numTrees=50, seed=0, minInstancesPerNode=20, featuresCol="scaled_features", labelCol="label")

LightGBM = LightGBMRegressor(
                          predictionCol="prediction",
                        featuresCol="scaled_features",
                          labelCol="label",
                         )
# 创建一个Pipeline来执行特征向量化和模型训练
pipeline = Pipeline(stages=[rforest])

# 计时开始
start_time = time.time()

# 训练模型
model = pipeline.fit(X_train_scaled)

# 停止计时
end_time = time.time()
execution_time = end_time - start_time
print("\n程序执行花费的时间：", round(execution_time, 2), "秒")
print("done時間:", datetime.datetime.now())
