In [3]:
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row, functions, SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel,BinaryLogisticRegressionSummary

In [25]:
"""
逻辑斯蒂回归分类器
"""

'\n逻辑斯蒂回归分类器\n'

In [3]:
def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label'] = str(x[4])
    return rel

In [4]:
spark = SparkSession.builder \
        .master("local") \
        .appName("example") \
        .config("spark.debug.maxToStringFields", "100") \
        .config("spark.sql.shuffle.partitions", "400") \
        .config("spark.default.parallelism", "600") \
        .config("spark.sql.auto.repartition", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .enableHiveSupport() \
        .getOrCreate()

In [10]:
data = spark.sparkContext.textFile("file:///home/hadoop/data/sparkMlib/iris.txt") \
.map(lambda line: line.split(',')) \
.map(lambda p: Row(**f(p))) \
.toDF()

data.show()

+-----------------+-----------+
|         features|      label|
+-----------------+-----------+
|[5.1,3.5,1.4,0.2]|Iris-setosa|
|[4.9,3.0,1.4,0.2]|Iris-setosa|
|[4.7,3.2,1.3,0.2]|Iris-setosa|
|[4.6,3.1,1.5,0.2]|Iris-setosa|
|[5.0,3.6,1.4,0.2]|Iris-setosa|
|[5.4,3.9,1.7,0.4]|Iris-setosa|
|[4.6,3.4,1.4,0.3]|Iris-setosa|
|[5.0,3.4,1.5,0.2]|Iris-setosa|
|[4.4,2.9,1.4,0.2]|Iris-setosa|
|[4.9,3.1,1.5,0.1]|Iris-setosa|
|[5.4,3.7,1.5,0.2]|Iris-setosa|
|[4.8,3.4,1.6,0.2]|Iris-setosa|
|[4.8,3.0,1.4,0.1]|Iris-setosa|
|[4.3,3.0,1.1,0.1]|Iris-setosa|
|[5.8,4.0,1.2,0.2]|Iris-setosa|
|[5.7,4.4,1.5,0.4]|Iris-setosa|
|[5.4,3.9,1.3,0.4]|Iris-setosa|
|[5.1,3.5,1.4,0.3]|Iris-setosa|
|[5.7,3.8,1.7,0.3]|Iris-setosa|
|[5.1,3.8,1.5,0.3]|Iris-setosa|
+-----------------+-----------+
only showing top 20 rows



In [12]:
# 获取标签列和特征列，进行索引并重命名
labelIndexer = StringIndexer() \
.setInputCol("label") \
.setOutputCol("indexedLabel") \
.fit(data)

featureIndexer = VectorIndexer() \
.setInputCol("features") \
.setOutputCol("indexedFeatures") \
.fit(data)

In [13]:
# 设置LogisticRegression算法参数
lr = LogisticRegression() \
.setLabelCol("indexedLabel") \
.setFeaturesCol("indexedFeatures") \
# 循环100次
.setMaxIter(100) \
# 规范化项为0.3
.setRegParam(0.3) \
.setElasticNetParam(0.8) \

print("LogisticRegression parameters:\n" + lr.explainParams())

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: indexedFeatures)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: indexedLabel)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on inte

In [14]:
# 设置一个IndexToString的转换器，把预测类别重新转为字符型。
labelConverter = IndexToString() \
.setInputCol("prediction") \
.setOutputCol("predictedLabel") \
.setLabels(labelIndexer.labels)

lrPipeline = Pipeline() \
.setStages([labelIndexer, featureIndexer, lr, labelConverter])

In [19]:
# 把数据随机分为训练集和预测集，其中训练集占比70%
"""
Pipeline本质上是一个评估器，当Pipeline调用fit()的时候就产生了一个PipelineModel，它是一个转换器。
然后，这个PipelineModel就可以调用transform()来进行预测，生成一个新的DataFrame，即利用训练得到的模型对测试集进行验证。
"""
trainingData, testData = data.randomSplit([0.7, 0.3])
lrPipelineModel = lrPipeline.fit(trainingData)
lrPredictions = lrPipelineModel.transform(testData)

In [21]:
"""
输出预测的结果，其中，select选择要输出的列，collect获取所有行的数据，用foreach把每行打印出来
"""
preRel = lrPredictions.select( \
    "predictedLabel", \
    "label", \
    "features", \
    "probability"). \
    collect()

for item in preRel: \
    print(str(item['label'])+','+ \
    str(item['features'])+'-->prob='+ \
    str(item['probability'])+',predictedLabel '+ \
    str(item['predictedLabel']))

Iris-setosa,[4.4,3.0,1.3,0.2]-->prob=[0.2990995275653928,0.5244681696764993,0.1764323027581078],predictedLabelIris-setosa
Iris-setosa,[4.6,3.1,1.5,0.2]-->prob=[0.3073584871178525,0.5113374296030625,0.18130408327908495],predictedLabelIris-setosa
Iris-setosa,[4.6,3.6,1.0,0.2]-->prob=[0.28675440674555536,0.5440954086320626,0.16915018462238196],predictedLabelIris-setosa
Iris-setosa,[4.9,3.1,1.5,0.1]-->prob=[0.306000662530688,0.5200441493310121,0.17395518813829988],predictedLabelIris-setosa
Iris-setosa,[5.0,3.0,1.6,0.2]-->prob=[0.3114923752684263,0.504765050787903,0.18374257394367055],predictedLabelIris-setosa
Iris-setosa,[5.0,3.4,1.6,0.4]-->prob=[0.3136785574127912,0.48709732127968247,0.19922412130752631],predictedLabelIris-setosa
Iris-setosa,[5.0,3.6,1.4,0.2]-->prob=[0.30322706302175584,0.5179058908716277,0.1788670461066163],predictedLabelIris-setosa
Iris-setosa,[5.1,3.3,1.7,0.5]-->prob=[0.3185030434209542,0.47159425375061087,0.20990270282843493],predictedLabelIris-setosa
Iris-setosa,[5.1

In [22]:
evaluator = MulticlassClassificationEvaluator() \
.setLabelCol("indexedLabel") \
.setPredictionCol("prediction")

lrAccuracy = evaluator.evaluate(lrPredictions)

# 模型预测的准确率
lrAccuracy


0.842822966507177

In [23]:
lrModel = lrPipelineModel.stages[2]

print ("Coefficients: \n " + str(lrModel.coefficientMatrix)+ \
    "\nIntercept: "+str(lrModel.interceptVector)+ \
    "\n numClasses: "+str(lrModel.numClasses)+ \
    "\n numFeatures: "+str(lrModel.numFeatures))


Coefficients: 
 3 X 4 CSRMatrix
(1,2) -0.263
(1,3) -0.2131
(2,3) 0.3695
Intercept: [-0.11478290240322053,0.8313057486567499,-0.7165228462535294]
 numClasses: 3
 numFeatures: 4


In [24]:
"""
决策树分类器
"""

'\n决策树分类器\n'

In [6]:
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row, SparkSession
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

In [7]:
spark = SparkSession.builder \
        .master("local") \
        .appName("example") \
        .config("spark.debug.maxToStringFields", "100") \
        .config("spark.sql.shuffle.partitions", "400") \
        .config("spark.default.parallelism", "600") \
        .config("spark.sql.auto.repartition", "true") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .enableHiveSupport() \
        .getOrCreate()

In [8]:
"""
读取文本，第一个map将每行数据用'，'隔开，每行被分为5个部分，前四个为特征，最后一个为分类
特征存入Vector中，创建一个lris模式的RDD，然后转为dataframe
"""
def f(x):
    rel = {}
    rel['features'] = Vectors \
    .dense(float(x[0]), float(x[1]), float(x[2]), float(x[3])) 
    rel['label'] = str(x[4])
    return rel

In [9]:
data = spark.sparkContext.textFile("file:///home/hadoop/data/sparkMlib/iris.txt") \
.map(lambda line: line.split(',')) \
.map(lambda p: Row(**f(p))) \
.toDF()

data.show(100)

+-----------------+---------------+
|         features|          label|
+-----------------+---------------+
|[5.1,3.5,1.4,0.2]|    Iris-setosa|
|[4.9,3.0,1.4,0.2]|    Iris-setosa|
|[4.7,3.2,1.3,0.2]|    Iris-setosa|
|[4.6,3.1,1.5,0.2]|    Iris-setosa|
|[5.0,3.6,1.4,0.2]|    Iris-setosa|
|[5.4,3.9,1.7,0.4]|    Iris-setosa|
|[4.6,3.4,1.4,0.3]|    Iris-setosa|
|[5.0,3.4,1.5,0.2]|    Iris-setosa|
|[4.4,2.9,1.4,0.2]|    Iris-setosa|
|[4.9,3.1,1.5,0.1]|    Iris-setosa|
|[5.4,3.7,1.5,0.2]|    Iris-setosa|
|[4.8,3.4,1.6,0.2]|    Iris-setosa|
|[4.8,3.0,1.4,0.1]|    Iris-setosa|
|[4.3,3.0,1.1,0.1]|    Iris-setosa|
|[5.8,4.0,1.2,0.2]|    Iris-setosa|
|[5.7,4.4,1.5,0.4]|    Iris-setosa|
|[5.4,3.9,1.3,0.4]|    Iris-setosa|
|[5.1,3.5,1.4,0.3]|    Iris-setosa|
|[5.7,3.8,1.7,0.3]|    Iris-setosa|
|[5.1,3.8,1.5,0.3]|    Iris-setosa|
|[5.4,3.4,1.7,0.2]|    Iris-setosa|
|[5.1,3.7,1.5,0.4]|    Iris-setosa|
|[4.6,3.6,1.0,0.2]|    Iris-setosa|
|[5.1,3.3,1.7,0.5]|    Iris-setosa|
|[4.8,3.4,1.9,0.2]|    Iris-

In [11]:
"""
进一步处理特征和标签,同时将数据集随机分为训练集和测试集
"""

labelIndexer = StringIndexer() \
    .setInputCol("label") \
    .setOutputCol("indexedLabel") \
    .fit(data)

featureIndexer = VectorIndexer() \
    .setInputCol("features") \
    .setOutputCol("indexedFeatures") \
    .setMaxCategories(4) \
    .fit(data)

labelConverter = IndexToString() \
    .setInputCol("prediction") \
    .setOutputCol("predictedLabel") \
    .setLabels(labelIndexer.labels)

trainingData, testData = data.randomSplit([0.7, 0.3])

In [12]:
"""
创建决策树模型DecisionTreeClassifier， 通过setter的方法来设置决策树的参数
这里仅需要设置特征列（FeaturesCol）和待预测列（LabelCol）。具体可以设置的参数可以通过explainParams()来获取
"""
dtClassifier = DecisionTreeClassifier() \
    .setLabelCol("indexedLabel") \
    .setFeaturesCol("indexedFeatures")

In [15]:
"""
构建机器学习流水线（Pipeline），在训练数据集上调用fit()进行模型训练，
并在测试数据集上调用transform()方法进行预测
"""
# 设置流水线流程
dtPipeline = Pipeline() \
    .setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
# 训练得到一个模型dtPipelineModel
dtPipelineModel = dtPipeline.fit(trainingData)
# 预测
dtPredictions = dtPipelineModel.transform(testData)
# 展示 
dtPredictions.select("predictedLabel", "label", "features").show(20)
evaluator = MulticlassClassificationEvaluator() \
    .setLabelCol("indexedLabel") \
    .setPredictionCol("prediction") 
dtAccuracy = evaluator.evaluate(dtPredictions)
dtAccuracy

+---------------+---------------+-----------------+
| predictedLabel|          label|         features|
+---------------+---------------+-----------------+
|    Iris-setosa|    Iris-setosa|[4.4,3.0,1.3,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.1,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[4.6,3.4,1.4,0.3]|
| Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
|    Iris-setosa|    Iris-setosa|[4.9,3.1,1.5,0.1]|
|    Iris-setosa|    Iris-setosa|[5.0,3.3,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.4,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.0,3.6,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.4,1.5,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.5,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.1,3.8,1.6,0.2]|
|    Iris-setosa|    Iris-setosa|[5.2,3.4,1.4,0.2]|
|    Iris-setosa|    Iris-setosa|[5.4,3.9,1.7,0.4]|
|Iris-versicolor|Iris-versicolor|[5.6,3.0,4.5,1.5]|
|Iris-versicolor|Iris-versicolor|[5.7,2.8,4.5,1.3]|
|    Iris-setosa|    Iris-setosa|[5.7,3.8,1.7,0.3]|
|Iris-versic

0.9471997300944669

In [11]:
"""
调用DecisionTreeClassificationModel的toDebugString方法，查看训练的决策树模型结构
"""
treeModelClassifier = dtPipelineModel.stages[2]
print("Learned classification tree model:\n" + str(treeModelClassifier.toDebugString))

Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_464569bc2a6b) of depth 4 with 13 nodes
  If (feature 2 <= 2.5999999999999996)
   Predict: 1.0
  Else (feature 2 > 2.5999999999999996)
   If (feature 3 <= 1.65)
    If (feature 2 <= 4.95)
     Predict: 0.0
    Else (feature 2 > 4.95)
     If (feature 3 <= 1.55)
      Predict: 2.0
     Else (feature 3 > 1.55)
      Predict: 0.0
   Else (feature 3 > 1.65)
    If (feature 2 <= 4.85)
     If (feature 1 <= 3.05)
      Predict: 2.0
     Else (feature 1 > 3.05)
      Predict: 0.0
    Else (feature 2 > 4.85)
     Predict: 2.0



In [16]:
"""
预测内容：
1. 预测未来一年扶贫费用的开支情况
2. 预测未来一年扶贫人口的药品使用情况
3. 判断药品开支是否合理
"""

'\n预测内容：\n1. 预测未来一年扶贫费用的开支情况\n2. 预测未来一年扶贫人口的药品使用情况\n3. 判断药品开支是否合理\n'