In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('imbalanced_binary_classification').getOrCreate()
new_df = spark.read.csv('application_train.csv', header=True, inferSchema=True)
new_df.printSchema()

In [None]:
import pandas as pd
pd.DataFrame(new_df.take(10), columns= new_df.columns)

In [None]:
# 删除SK_ID_CURR列
drop_col = ['SK_ID_CURR']
new_df = new_df.select([column for column in new_df.columns if column not in drop_col])
new_df = new_df.withColumnRenamed('TARGET', 'label')
new_df.groupby('label').count().toPandas()

In [None]:
# 目标变量的分布情况
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline                    # 在Notebook中内联显示matplotlib绘图，而不是弹出一个新的窗口显示绘图结果。
df_pd = new_df.toPandas()
print(len(df_pd))
plt.figure(figsize=(12,10))
sns.countplot(x='label', data=df_pd, order=df_pd['label'].value_counts().index)

## 数据处理

In [None]:
# 查看有多少个categorical features和numerical features：

cat_cols = [item[0] for item in new_df.dtypes if item[1].startswith('string')] 
print(str(len(cat_cols)) + '  categorical features')

num_cols = [item[0] for item in new_df.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:]
print(str(len(num_cols)) + '  numerical features')

In [None]:
# 查找有关缺失值的更多信息

def info_missing_table(df_pd):
    """输入 pandas 数据框并返回缺失值和百分比的列"""
    mis_val = df_pd.isnull().sum()         #计算数据框中每列中空值的总数
    mis_val_percent = 100 * df_pd.isnull().sum() / len(df_pd) #计算每列中空值的百分比
    mis_val_table = pd.concat([mis_val, mis_val_percent], axis=1)  #连接两个表
    mis_val_table_ren_columns = mis_val_table.rename(
    columns = {0 : 'Missing Values', 1 : '% of Total Values'})
    
    # 从表格中筛选出具有缺失值的列（百分比不为0）      
    mis_val_table_ren_columns = mis_val_table_ren_columns[
    mis_val_table_ren_columns.iloc[:,1] != 0].sort_values('% of Total Values', ascending=False).round(1)   
    
    print ("Your selected dataframe has " + str(df_pd.shape[1]) + " columns.\n"    # 原表格有121列  
    "There are " + str(mis_val_table_ren_columns.shape[0]) +              
    " columns that have missing values.") # 含缺失值的有67列
    return mis_val_table_ren_columns

In [None]:
missings = info_missing_table(df_pd)
missings

In [None]:
# 处理 Spark DataFrame，查找更多关于数据集中缺失值的情况

def count_missings(spark_df):
    null_counts = []        
    for col in spark_df.dtypes:    
        cname = col[0]     
        ctype = col[1]      
        nulls = spark_df.where( spark_df[cname].isNull()).count()  # 统计当前列中的空值数量
        result = tuple([cname, nulls])  #new tuple, (列名, 空值数量)
        null_counts.append(result)      # 将包含列名和空值数量的元组 `result` 添加到 `null_counts` 列表中
    null_counts=[(x,y) for (x,y) in null_counts if y!=0]  # 只返回有缺失值的列的信息
    return null_counts

In [None]:
miss_counts = count_missings(new_df)
miss_counts

In [None]:
# 根据缺失值的列，将`new_df` 中的列分为 categorical type 和 numerical type

list_cols_miss=[x[0] for x in miss_counts]       # 从之前计算的包含缺失值列名和缺失值数量的列表 `miss_counts` 中提取出列名
df_miss= new_df.select(*list_cols_miss)         # 创建一个新的 DataFrame `df_miss`，该 DataFrame 只包含存在缺失值的列。

#categorical columns
catcolums_miss=[item[0] for item in df_miss.dtypes if item[1].startswith('string')] 
print("cateogrical columns_miss:", catcolums_miss)

### numerical columns
numcolumns_miss = [item[0] for item in df_miss.dtypes if item[1].startswith('int') | item[1].startswith('double')] 
print("numerical columns_miss:", numcolumns_miss)

In [None]:
# 将每个 categotical 列中的缺失值填充为该列中的众数

from pyspark.sql.functions import rank,sum,col
df_Nomiss=new_df.na.drop()
for x in catcolums_miss:
    mode=df_Nomiss.groupBy(x).count().sort(col("count").desc()).collect()[0][0]   # 计算每列的众数
    print(x, mode)  # 列名和众数 
    new_df = new_df.na.fill({x:mode})  # 填充缺失值

In [None]:
# 将每个 numerical 列中的缺失值填充为该列中的平均数

from pyspark.sql.functions import mean, round

for i in numcolumns_miss:
    meanvalue = new_df.select(round(mean(i))).collect()[0][0] 
    print(i, meanvalue) 
    new_df=new_df.na.fill({i:meanvalue}) 

## 处理不平衡类别：
### 在这种情况下，我们在数据集中添加一个名为“权重”的新列，并用每个类别（1,0）的比例（0.91,0.09）填充它

In [None]:
# 增加权重并填充比例

from pyspark.sql.functions import when

ratio = 0.91
def weight_balance(labels):
    return when(labels == 1, ratio).otherwise(1*(1-ratio))

new_df = new_df.withColumn('weights', weight_balance(col('label')))

In [None]:
# 查看数据集中是否仍存在缺失值

miss_counts2 = count_missings(new_df)
miss_counts2

In [None]:
pd.DataFrame(new_df.take(10), columns= new_df.columns)

## 特征工程

In [None]:
# 使用 spark 中 MLlib 的 OneHotEncoderEstimator 将每个 categorical 特征转换为 one-hot 编码向量
# 接下来，使用 VectorAssembler 将生成的 one-hot 向量和其余 numerical 特征组合成一个向量列。我们将流程的每一步附加到阶段数组中

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

stages = []
for categoricalCol in cat_cols:
    # 对 categorical 特征进行字符串索引
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    
    # 将索引后的特征进行 one-hot 编码
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

# 组合所有的特征列
assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols            # 包含所有要组合的特征列的列表
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")    # 将所有特征组合成一个向量
stages += [assembler]

In [None]:
# 使用 pipeline 将 new_df 转换成一个包含所有预处理特征的 DataFrame

from pyspark.ml import Pipeline
cols = new_df.columns
pipeline = Pipeline(stages = stages) 
pipelineModel = pipeline.fit(new_df)         # 拟合模型
new_df = pipelineModel.transform(new_df)

In [None]:
selectedCols = ['features']+cols
new_df = new_df.select(selectedCols)
pd.DataFrame(new_df.take(5), columns=new_df.columns)

In [None]:
# 分割训练集和测试集

train, test = new_df.randomSplit([0.80, 0.20], seed = 42)
print(train.count())
print(test.count())

## 训练模型

In [None]:
# 使用逻辑回归进行训练
from pyspark.ml.classification import LogisticRegression

LR = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=15)   # 初始化回归模型指定特征列，目标列以及最大迭代次数（15）
LR_model = LR.fit(train) # 拟合模型

In [None]:
# 绘制ROC曲线

trainingSummary = LR_model.summary        # 获取模型训练后的摘要信息，其中包含模型性能的各种指标，如 ROC 曲线、精度、召回率等。

roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])           # 绘制曲线
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set ROC: ' + str(trainingSummary.areaUnderROC))     # 获取 ROC 曲线下面积（AUC）

In [None]:
# 检查模型在测试集上的性能：
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions_LR = LR_model.transform(test)      # 将训练好的逻辑回归模型 LR_model 应用于测试集
evaluator = BinaryClassificationEvaluator()    # 创建一个 BinaryClassificationEvaluator 实例，用于评估二分类模型的性能。
print("Test_SET Area Under ROC: " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"})))   # 打印 AUC 值

In [None]:
# GBT

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=15)                # 初始化 GBT 分类器并设置最大迭代次数
GBT_Model = gbt.fit(train)                     # 拟合模型
predictions = GBT_Model.transform(test)        # 将训练好的逻辑回归模型 GBT_Model 应用于测试集

In [None]:
evaluator = BinaryClassificationEvaluator()
print("Test_SET Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [None]:
# 使用网格搜索实现超参数调整，然后运行交叉验证以更好地提高 GBT 的性能
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 创建参数网格
paramGrid = (ParamGridBuilder()                     # 初始化参数网格构建器
             .addGrid(gbt.maxDepth, [2, 4, 6])      # 设置最大树深度为 2、4、6，控制模型的复杂度
             .addGrid(gbt.maxBins, [20, 30])        # 设置树分裂时每个特征的最大划分数为 20 和 30。这个参数影响连续特征的分箱处理。
             .addGrid(gbt.maxIter, [10, 15])        # 设置最大迭代次数为 10 和 15，表示模型将构建的决策树数目。
             .build())

# 初始化交叉验证器
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# 执行交叉验证
cvModel = cv.fit(train)             # 验证并保存最佳模型
predictions = cvModel.transform(test)     # 生成预测
evaluator.evaluate(predictions)     # 评估性能