In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
import os
# Python interpreter PySpark should use (path is specific to this cluster).
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.5"

# Create (or reuse) the SparkSession that runs on the YARN cluster.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("adult_logReg")
    .getOrCreate()
)

sc = spark.sparkContext

# Load the Adult dataset from HDFS. The first row is the header and column
# types are inferred: the columns mix strings and integers, so the earlier
# attempt at a bulk, all-StringType explicit schema was dropped.
# NOTE(review): the recorded outputs show string values with a leading space
# (e.g. ' Private'); consider ignoreLeadingWhiteSpace=True -- confirm against
# the raw file before changing, since it alters the category values.
df = spark.read.csv('hdfs://master:9000/dataset/adult.csv', inferSchema=True, header=True)
df.show(3)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|  sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White| Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White| Male|           0|           0|            13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|           Divo

In [3]:
# Remember the raw column names; they are re-selected after the feature
# pipeline has added its derived columns.
cols = df.columns
# Print the inferred schema (column names, types, nullability).
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



### StringIndexer对指定的列（通常是离散的字符串特征）进行编号：根据label出现的频率降序排序，最频繁出现的label的index为0
### StringIndexer本质上是将String类型转换成index(number)；如果输入列是数值(numeric)，实际上是先把数值转换成字符串，然后再对它们进行编号

### 对于一些特征工程方面，有时会用到StringIndexer和OneHotEncoder。
### 比如Kaggle数据集中的性别（sex）特征，一般只有male和female两个取值。一种不太靠谱的做法是直接用0表示male、用1表示female。
### 首先我们需要用StringIndexer（相当于scikit-learn中的LabelEncoder）把sex这一列里的离散取值用数字来表示，也就是上面所说的过程：把male、female这类不同的字符串取值用数字表示。
### One-Hot编码，又称为一位有效编码，主要是采用N位状态寄存器来对N个状态进行编码，每个状态都有它独立的寄存器位，并且在任意时候只有一位有效。

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# String-valued feature columns to be indexed and one-hot encoded.
categoricalColumns = [
    "workclass",
    "education",
    "marital-status",
    "occupation",
    "relationship",
    "race",
    "sex",
    "native-country",
]

# Pipeline stages are accumulated in order: for every categorical column an
# indexer (<col>Index) followed by an encoder (<col>classVec).
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=categoricalCol + 'Index',
    )
    encoder = OneHotEncoder(
        inputCol=stringIndexer.getOutputCol(),
        outputCol=categoricalCol + "classVec",
    )
    stages.extend([stringIndexer, encoder])

# The target column "income" is indexed into the numeric "label" column.
label_stringIdx = StringIndexer(inputCol="income", outputCol='label')
stages.append(label_stringIdx)

### 从源数据中提取特征指标数据，这是一个比较典型且通用的步骤，因为我们的原始数据集里，经常会包含一些非指标数据，如 ID，Description 等。为方便后续模型进行特征输入，需要将部分列的数据转换为特征向量，并统一命名，VectorAssembler类完成这一任务。VectorAssembler是一个transformer，将多列数据转化为单列的向量列。

In [4]:
# Integer-valued columns that are passed to the model unchanged.
numericCols = ["age", "fnlwgt", "education-num", "capital-gain", "capital-loss", "hours-per-week"]
# Feature vector layout: all one-hot encoded categorical vectors first,
# followed by the numeric columns.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# Replace any assembler left over from a previous run of this cell instead of
# appending a second one: two stages writing the same "features" column would
# make the pipeline fail, so a plain `stages += [assembler]` is not re-runnable.
stages = [stage for stage in stages if not isinstance(stage, VectorAssembler)] + [assembler]

In [5]:
pipeline = Pipeline(stages=stages)
# Fit and transform a projection onto the original columns only. On the first
# run this is the same data as `df`; on a re-run it strips the "label" and
# "features" columns this cell added earlier, which would otherwise collide
# with the pipeline's output columns and make the cell fail.
raw_df = df.select(cols)
pipelineModel = pipeline.fit(raw_df)
# Keep the label and the assembled feature vector in front of the raw columns;
# the intermediate <col>Index / <col>classVec columns are dropped.
selectedcols = ["label", "features"] + cols
df = pipelineModel.transform(raw_df).select(selectedcols)
df.show(3)

+-----+--------------------+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|label|            features|age|        workclass|fnlwgt| education|education-num|     marital-status|        occupation|  relationship|  race|  sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+-----+--------------------+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|  0.0|(100,[4,10,24,32,...| 39|        State-gov| 77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White| Male|        2174|           0|            40| United-States| <=50K|
|  0.0|(100,[1,10,23,31,...| 50| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White| Male

In [6]:
# Show the DataFrame's repr (column names and types) without running a job.
display(df)

DataFrame[label: double, features: vector, age: int, workclass: string, fnlwgt: int, education: string, education-num: int, marital-status: string, occupation: string, relationship: string, race: string, sex: string, capital-gain: int, capital-loss: int, hours-per-week: int, native-country: string, income: string]

In [7]:
# Peek at just the two columns the models consume.
df_tmp = df.select("label", "features")
df_tmp.show(n=3)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(100,[4,10,24,32,...|
|  0.0|(100,[1,10,23,31,...|
|  0.0|(100,[0,8,25,38,4...|
+-----+--------------------+
only showing top 3 rows



In [8]:
# 70/30 train/test split with a fixed seed, then report the size of each
# part, one count per line (train first, then test).
train, test = df.randomSplit(weights=[0.7, 0.3], seed=100)
print(train.count(), test.count(), sep="\n")

22838
9723


## Logistic Regression

### 管道里的主要概念
MLlib提供了标准的接口，用于把多个算法组合到单个管道（工作流）中，管道的概念源于scikit-learn项目。  
1. **数据框**：机器学习接口使用来自Spark SQL的数据框形式数据作为数据集，它可以处理多种数据类型。比如，一个数据框可以有不同的列存储文本、特征向量、标签值和预测值。  
2. **转换器**：转换器是将一个数据框变为另一个数据框的算法。比如，一个机器学习模型就是一个转换器，它将带有特征的数据框转换为带有预测值的数据框。  
3. **估计器**：估计器是拟合一个数据框来产生转换器的算法。比如，一个机器学习算法就是一个估计器，它训练一个数据框产生一个模型。技术上说，估计器通过fit()方法，接受一个数据框产生一个模型。比如，逻辑回归就是一个估计器，通过fit()来产生一个逻辑回归模型。  
4. **管道**：一个管道串起多个转换器和估计器，明确一个机器学习工作流。  
5. **参数**：管道中的所有转换器和估计器使用共同的接口来指定参数。  

转换器的transform()方法和估计器的fit()方法都是无状态性的。将来，有状态性的算法可能通过其他概念得到支持。  
每个转换器或估计器实例有唯一的编号，这个特征在指定参数的时候非常有用。

In [9]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression, limited to 10 optimizer iterations.
lr = LogisticRegression(
    maxIter=10,
    featuresCol='features',
    labelCol='label',
)
# Train on the training split, then score the held-out test split.
lrModel = lr.fit(train)
predictions = lrModel.transform(test)
# Show one scored row, including the columns added by the model.
predictions.take(1)

[Row(label=0.0, features=SparseVector(100, {0: 1.0, 8: 1.0, 23: 1.0, 29: 1.0, 43: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 26.0, 95: 58426.0, 96: 9.0, 99: 50.0}), age=26, workclass=' Private', fnlwgt=58426, education=' HS-grad', education-num=9, marital-status=' Married-civ-spouse', occupation=' Prof-specialty', relationship=' Husband', race=' White', sex=' Male', capital-gain=0, capital-loss=0, hours-per-week=50, native-country=' United-States', income=' <=50K', rawPrediction=DenseVector([0.8111, -0.8111]), probability=DenseVector([0.6923, 0.3077]), prediction=0.0)]

In [10]:
# Print the schema of the scored test set; the model's output columns are
# listed after the input columns.
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [12]:
# Narrow the scored rows to the label, the model outputs and two raw
# columns for a quick visual check.
selected = predictions.select(
    ["label", "prediction", "probability", "age", "occupation"]
)
display(selected)
selected.show(n=4)

DataFrame[label: double, prediction: double, probability: vector, age: int, occupation: string]

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.69234532795194...| 26| Prof-specialty|
|  0.0|       0.0|[0.62115531452964...| 30| Prof-specialty|
|  0.0|       0.0|[0.65845294177529...| 31| Prof-specialty|
|  0.0|       0.0|[0.65826620022842...| 32| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 4 rows



In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the logistic-regression predictions with the binary evaluator, which
# reads the model's "rawPrediction" column.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
lr_auc = evaluator.evaluate(predictions)
print("Area Under ROC", lr_auc)
# Report which metric the evaluator computed, then list every parameter of
# the estimator with its documentation and current value.
print(evaluator.getMetricName())
print(lr.explainParams())

Area Under ROC 0.9014206228690932
areaUnderROC
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
maxIter: max number of iterations (>= 0). (default: 100, current: 10)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.

### ParamGridBuilder用来构建参数网格
参数的解释见上文，regParam表示正则参数，elasticNetParam表示正则类型
### 交叉验证
CrossValidator将数据集划分为若干子集分别地进行训练和测试。如当k=3时，CrossValidator产生3个训练数据与测试数据对，每个数据对使用2/3的数据来训练，1/3的数据来测试。对于一组特定的参数表，CrossValidator计算基于三组不同训练数据与测试数据对训练得到的模型的评估准则的平均值。确定最佳参数表后，CrossValidator最后使用最佳参数表基于全部数据来重新拟合估计器。

In [15]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for logistic regression: 3 x 3 x 3 = 27 combinations
# of regularisation strength, elastic-net mixing and iteration limit.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())
# 5-fold cross-validation over the grid, scored with `evaluator`.
# The seed fixes the fold assignment so that a re-run selects the same best
# model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)
cvModels = cv.fit(train)
# Score the held-out test split with the best model found.
predictions = cvModels.transform(test)
print('Area Under ROC', evaluator.evaluate(predictions))

Area Under ROC 0.8991662607798677


## Decision Trees

In [20]:
from pyspark.ml.classification import DecisionTreeClassifier

# Shallow decision tree (depth limited to 3) as a second baseline.
dt = DecisionTreeClassifier(
    maxDepth=3,
    labelCol='label',
    featuresCol='features',
)
dtModel = dt.fit(train)
# Score the test split; this adds the model's output columns.
dt_predictions = dtModel.transform(test)

# Report the size of the fitted tree, then the schema of the scored data.
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)
dt_predictions.printSchema()

numNodes =  15
depth =  3
root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [21]:
# Same quick visual check as for logistic regression, on the tree's output.
selected = dt_predictions.select(
    ["label", "prediction", "probability", "age", "occupation"]
)
display(selected)
selected.show(n=4)

DataFrame[label: double, prediction: double, probability: vector, age: int, occupation: string]

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.68829891838741...| 26| Prof-specialty|
|  0.0|       0.0|[0.68829891838741...| 30| Prof-specialty|
|  0.0|       0.0|[0.68829891838741...| 31| Prof-specialty|
|  0.0|       0.0|[0.68829891838741...| 32| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 4 rows



In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Fresh evaluator with default settings, applied to the tree's predictions.
evaluator = BinaryClassificationEvaluator()
dt_auc = evaluator.evaluate(dt_predictions)
print("Area Under ROC", dt_auc)
# Report which metric was computed, then list every parameter of the
# decision-tree estimator with its documentation and current value.
print(evaluator.getMetricName())
print(dt.explainParams())

Area Under ROC 0.6105272039652752
areaUnderROC
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: label)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g.,

In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for the decision tree: 4 depths x 3 bin counts.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())
# 5-fold cross-validation over the grid, scored with `evaluator`.
# The seed fixes the fold assignment so that a re-run selects the same best
# model; it matches the seed used for the train/test split.
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)
cvModels = cv.fit(train)
# Size of the best tree found by the search.
print("numNodes = ", cvModels.bestModel.numNodes)
print("depth = ", cvModels.bestModel.depth)
# Score the held-out test split with the best model.
predictions = cvModels.transform(test)
print('Area Under ROC', evaluator.evaluate(predictions))

numNodes =  527
depth =  10
Area Under ROC 0.7787294788413602


### Random Forest