In [118]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [119]:
spark = SparkSession\
     .builder\
     .appName("LrModel")\
     .master("local[*]") \
     .enableHiveSupport()\
     .getOrCreate()
data=spark.sql("select * from  ml.adult")
#OneHotEncoder不能处理空字符串。所以我们需要将数据集中的空字符串提前处理一下
df=data.na.replace('','NA')
cols = df.columns #和pandas一样看列名
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- salary: string (nullable = true)



In [120]:
#找到所有的string类型的变量
#dtypes用来看数据变量类型
cat_features = [item[0] for item in df.dtypes if item[1]=='string']
# 需要删除 salary列，否则标签泄露
cat_features.remove('salary')
#找到所有数字变量
num_features = [item[0] for item in df.dtypes if item[1]!='string']

In [121]:
stages = []
for col in cat_features:
    # 字符串转成索引
    string_index = StringIndexer(inputCol = col, outputCol = col + 'Index')
    # 转换为OneHot编码
    encoder = OneHotEncoder(inputCol=string_index.getOutputCol(), outputCol=col + "_one_hot")
    # 将每个字段的转换方式 放到stages中
    stages += [string_index, encoder]

In [122]:
# 将salary转换为索引
label_string_index = StringIndexer(inputCol = 'salary', outputCol = 'label')
# 添加到stages中
stages += [label_string_index]

In [123]:
# 类别变量 + 数值变量
assembler_cols = [c + "_one_hot" for c in cat_features] + num_features
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]


# 使用pipeline完成数据处理
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df = pipeline_model.transform(df)
selected_cols = ["label", "features"] + cols
df = df.select(selected_cols)

In [124]:
# df2.show(20)
pd.DataFrame(df.take(2), columns = df.columns)

Unnamed: 0,label,features,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,salary
0,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",49,Private,101320,Assoc-acdm,12.0,Married-civ-spouse,,Wife,White,Female,0,1902,40,United-States,>=50k
1,1.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...",44,Private,236746,Masters,14.0,Divorced,Exec-managerial,Not-in-family,White,Male,10520,0,45,United-States,>=50k


In [125]:
train, test = df.randomSplit([0.7, 0.3], seed=2021)
print(train.count())
print(test.count())

22777
9784


In [126]:
from pyspark.ml.classification import LogisticRegression
# 创建模型
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label',maxIter=10)
lr_model = lr.fit(train)

In [127]:
#结果预测

predictions = lr_model.transform(test)

In [128]:
predictions.printSchema()


root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: string (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: string (nullable = true)
 |-- capital_loss: string (nullable = true)
 |-- hours_per_week: string (nullable = true)
 |-- native_country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [129]:
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)
selected.show(4)

DataFrame[label: double, prediction: double, probability: vector, age: string, occupation: string]

+-----+----------+--------------------+---+-----------------+
|label|prediction|         probability|age|       occupation|
+-----+----------+--------------------+---+-----------------+
|  0.0|       1.0|[0.00946534730918...| 36|            Sales|
|  0.0|       1.0|[0.08593020314566...| 36|     Craft-repair|
|  0.0|       0.0|[0.97491344948452...| 36|     Adm-clerical|
|  0.0|       0.0|[0.99997257230198...| 36| Transport-moving|
+-----+----------+--------------------+---+-----------------+
only showing top 4 rows



In [130]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# 模型评估，通过原始数据 rawPrediction计算AUC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('AUC：', evaluator.evaluate(predictions))

AUC： 0.8894730490646457


In [117]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# 创建网络参数，用于交叉验证
param_grid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())
# 五折交叉验证，设置模型，网格参数，验证方法，折数
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
# 交叉验证运行
cv_model = cv.fit(train)
# 对于测试数据，使用五折交叉验证
predictions = cv_model.transform(test)
print('AUC：', evaluator.evaluate(predictions))