In [1]:
from pyspark.context import SparkContext 
from pyspark.sql.session import SparkSession
sc = SparkContext("local")
spark = SparkSession(sc)

In [None]:
sql1 = "select * from active_cust_info_table"
df = spark.sql(sql1)

In [None]:
#查看各个特征变量空值占比
import pyspark.sql.functions as fn
df.agg(*[(1-(fn.count(c)/fn.count("*"))).alias(c+"_missing") for c in df.columns]).show(vertical=True)

In [None]:
#空值处理
df = df.withColumn("Feature1",fn.when(df["fearture1"].isNull()==True,fn.lit('99')).otherwise(df["Feature1"]))

In [None]:
#查看所有特征
for f in df.dtypes:
    print(f)

In [None]:
#特征分类 字符型 数值型
cFlist = []
dFlist = []
for f in df.dtypes:
    if f[1] == "string":
        cFlist.append(f[0])
    else:
        dFlist.append(f[1])
prrint(len(cFlist), len(dFlist))

In [None]:
#字符型特征 分类 唯一值 二值 多值
uniqueFlist = []
binaryFlist = []
moreVFlist = []
for i in range(len(cFlist)):
    l = df.select(df[cFlist[i]]).distinct().count()
    if l==1:
        uniqueFlist.append(cFlist[i])
    elif l==2:
        binaryFlist.append(cFlist[i])
    else:
        moreVFlist.append(cFlist[i])

In [None]:
print(len(uniqueFlist),uniqueFlist)

In [None]:
print(len(binaryFlist),binaryFlist)

In [None]:
print(len(moreVFlist),moreVFlist)

In [None]:
#将数值型变量转化化double，二值型字符变量如果为（1，0）也转化为double
import pyspark.sql.types
df = df.select(cFlist + [fn.col(column).cast("double").alias(column) for column in dFlist])
for f in binaryFlist:
    df = df.withColumn(f,fn.col(f).cast("double"))

In [None]:
#特征分类 字符型 数值型
cFlist = []
dFlist = []
for f in df.dtypes:
    if f[1] == "string":
        cFlist.append(f[0])
    else:
        dFlist.append(f[1])
prrint(len(cFlist), len(dFlist))

In [None]:
#查看数值型 唯一值变量
for i in range(len(dFlist)):
    l = df.select(df[dFlist[i]]).distinct().count()
    if l==1:
        uniqueFlist.append(dFlist[i])
print(len(uniqueFlist),uniqueFlist)

In [None]:
#剔除唯一值变量
for f in uniqueFlist:
    if f in dFlist:
        dFlist.remove(f)
print(len(dFlist),dFlist)

In [4]:
#变量相关性分析
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

In [None]:
df_d = df[dFlist]
assembler = VectorAssembler(inputCols=dFlist,outputCol="features")
df_d = assembler.transform(df_d)
df_d.columns

In [5]:
import pandas as pd
pd.set_option("max_columns",500)
pd.set_option("max_rows",1000)

In [None]:
r1 = Correlation.corr(df_d,"features").head()
r_pd = pd.DataFrame(r1[0].toArray(),index=dFlist,columns=dFlist)
r_pd.head()

In [None]:
r_pd["label"]

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
plt.subplots(figsize = (16,5))
sns.heatmap(r_pd[r_pd.abs()>0.5],annot=True)

In [None]:
#剔除强相关变量
str_corr_feature = []
row = r_pd.shape[0]
for i in range(row):
    for j in range(i+1,row):
        if r_pd.iloc[i,j] >= 0.8 or r_pd.iloc[i,j] <= -0.8:
            str_corr_feature.append(r_pd.columns[j])
            
for n in set(str_corr_feature):
    dFlist.remove(n)
print(dFlist,len(dFlist))

In [6]:
#对多值字符型变量 卡方检验
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler

In [None]:
df_c = df.select(["label"] + moreVFlist)
IndexFlist = []
for f in moveVFlist:
    categoryIndexer = StringIndexer(inputCol = f, outputCol = f+"_Index")
    categoryTransformer = categoryIndexer.fit(df_c)
    df_c = categoryTransformer.transform(df_c)
    IndexFlist.append(f+"_Index")

In [None]:
assembler = VectorAssembler(inputCols = IndexFlist, outputCol = "features")
df_c = assembler.transform(df_c)
df_c.select("features").show()

In [None]:
from pyspark.ml.stat import ChiSquareTest
ChiSqResult = ChiSquareTest.test(df_c,"features","label")

In [None]:
ChiSqResult.select("pValues").show(truncate=False)

In [None]:
modelFeature=dFlist + moreVFlist
print(len(modelFeature),modelFeature)

In [None]:
#y由于抽样 回导致 数值型变量 一些频数较少的值在训练集中缺失 导致预测时保存 建议处理训练集，使包括多值字符型的所有值
df_m = df.select(modelFeature)
train_df,test_df = df_m.randomSplit([0.7,0.3])
train_df.cache()
test_df.cache()

In [None]:
train_df.groupby("label").agg({"id":"count"}).show()

In [8]:
#建模
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier,RandomForestClassifier

In [None]:
stages = []
MFeatures = []
tempIndex = []
for f in moreVFlist:
    stages.append(StringIndexer(inputCol=f,outputCol=f+"_Index"))
    tempIndex.append(f+"_Index")
for i in range(len(tempIndex)):
    stages.append(OneHotEncoder(droplast=False,inputCol=tempIndex[i],outputCol=moreVFlist[i] + "_vector"))
    MFeatures.append(moreVFlist[i] + "_vector")
stages.append(VectorAssembler(inputCols=dFlist+MFeatures,outputCol="features"))
stages.append(RandomForestClassifier(labelCol="label", featuresCol="features",numTrees=200,maxBins=20,subsamplingRate=0.8,seed=2021))
pipeline = Pipeline(stages = stages)
pipeline.getStages()

In [None]:
pipelineModel = pipeline.fit(train_df)

In [None]:
predicted=pipelineModel.transform(test_df)

In [9]:
#评估
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="label",metricName = "areaUnderROC")
auc = evaluator.evaluate(predicted)
print(auc)

In [None]:
#取输入模型vector的features的各个分量的名称 dFlist中的第一个值是label
MoreVFeature_vector = []
for f in MFeature:
    l = len(predicted.select(f).head(1)[0][f].toArray().tolist())
    for i in range(l):
        MoreVFeature_vector.append(f+str(i+1))
Features_name =dFlist[1:] + MoreVFeature_vector        

In [None]:
#特征变量重要性
fimp = pipelineModel.stages[-1].featureImportances
Fimp = pd.DataFrame(list(zip(Features_name,fimp.toArray().tolist())),columns = ["Feature","Importances"])

In [None]:
Fimp.sort_values("Importances", ascending=False)

In [None]:
TP = predicted.filter(predicted["prediction"]==1).filter(predicted["label"]==1).count()
FN = predicted.filter(predicted["prediction"]==0).filter(predicted["label"]==1).count()
TN = predicted.filter(predicted["prediction"]==0).filter(predicted["label"]==0).count()
FP = predicted.filter(predicted["prediction"]==1).filter(predicted["label"]==0).count()

In [16]:
print("TP = %d  FN = %d\nFP = %d  TN = %d\n"%(TP,FN,FP,TN))

TP = 1  FN = 2
FP = 1  TN = 3



In [17]:
precision = TP/(TP+FP)
recall = TP/(TP+FN)
F1 = 2*precision*recall/(precision+recall)
print("命中率：",precision)
print("召回率：",recall)
print("F1:",F1)

命中率： 0.5
召回率： 0.3333333333333333
F1: 0.4


In [None]:
def extract(row):
    return (row.label,) + tuple(row.probability.toArray().tolist())

rd=predicted.select("label","probability").rdd.map(extract)

In [None]:
r = rd.toDF()
r = r.select(fn.col("_1").alias("label"), fn.round("_2",2).alias("np"),fn.round("_3",2).alias("pp"))

In [None]:
r.groupby("pp").agg({"np":"count","lable":"sum"}).show(1000,truncate=False)

In [None]:
#save model
pipelineModel.save("./ActiveCustLostModel")