# Spark and Machine Learning


Feng Li

feng.li@cufe.edu.cn

Central University of Finance and Economics

#### Spark machine learning utilities

- Model selection tools
- Cross-validation module
- Training/test set splitting
- [Model evaluation tools](https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html): precision, recall, ROC, F-measure
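
To make the evaluation metrics concrete, here is a minimal pure-Python sketch of precision, recall and the F-measure for binary labels. It does not use Spark's evaluation classes; the function name `binary_metrics` and the label lists are hypothetical and chosen only for illustration.

```python
def binary_metrics(y_true, y_pred, beta=1.0):
    """Return (precision, recall, F-beta) for 0/1 labels, treating 1 as the positive class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)  # false negatives
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    denom = beta ** 2 * precision + recall
    f_beta = (1 + beta ** 2) * precision * recall / denom if denom else 0.0
    return precision, recall, f_beta

# Hypothetical true labels and predictions.
y_true = [1, 0, 1, 1, 0, 1, 0, 0]
y_pred = [1, 0, 0, 1, 1, 1, 0, 0]
precision, recall, f1 = binary_metrics(y_true, y_pred)
print(precision, recall, f1)
```

With three true positives, one false positive and one false negative, all three metrics equal 0.75 in this toy case.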



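#### Machine learning models built into Spark MLlib

- Text feature extraction: TF-IDF, Word2Vec, n-gram
- Classification: naive Bayes, logistic regression, random forests, decision trees
- Clustering: K-Means, LDA, Gaussian mixtures
- Dimensionality reduction / variable selection: PCA, SVD, LASSO
- Pattern recognition
- Streaming data processing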



#### [Spark optimization tools](https://spark.apache.org/docs/latest/mllib-optimization.html)

- Gradient descent, stochastic gradient descent
- BFGS, L-BFGS
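
The difference between gradient descent and stochastic gradient descent can be sketched in a few lines of pure Python. This is an illustration under stated assumptions, not Spark's distributed implementation: the data (noise-free points on y = 2x + 1), the step sizes and the function names are all hypothetical.

```python
import random

# Hypothetical noise-free data generated from y = 2x + 1.
xs = [0.0, 1.0, 2.0, 3.0, 4.0]
points = [(x, 2.0 * x + 1.0) for x in xs]

def gradient(w, b, batch):
    """Gradient of the loss 1/(2n) * sum((w*x + b - y)^2) over the given batch."""
    n = len(batch)
    gw = sum((w * x + b - y) * x for x, y in batch) / n
    gb = sum((w * x + b - y) for x, y in batch) / n
    return gw, gb

def gradient_descent(data, step=0.05, iters=2000):
    """Batch gradient descent: every step uses the full data set."""
    w = b = 0.0
    for _ in range(iters):
        gw, gb = gradient(w, b, data)
        w, b = w - step * gw, b - step * gb
    return w, b

def stochastic_gradient_descent(data, step=0.02, iters=5000, seed=1):
    """SGD: every step uses one randomly chosen observation."""
    rng = random.Random(seed)
    w = b = 0.0
    for _ in range(iters):
        gw, gb = gradient(w, b, [rng.choice(data)])
        w, b = w - step * gw, b - step * gb
    return w, b

w_gd, b_gd = gradient_descent(points)
w_sgd, b_sgd = stochastic_gradient_descent(points)
print(w_gd, b_gd, w_sgd, b_sgd)
```

Both routines recover a slope close to 2 and an intercept close to 1 here. On noisy data, SGD with a constant step size would fluctuate around the solution rather than settle on it.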

#### Preparation

We first load Python's `pyspark` module and register a Spark app.

In [62]:
import findspark
findspark.init("/usr/lib/spark-current")
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("Spark Machine Learning App").getOrCreate()

### K-Means

In [66]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Load data.
dataset = spark.read.format("libsvm").load("kmeans_data.txt")

# Train a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

In [65]:
# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9997530305375207
Cluster Centers: 
[ 0.1  0.1  0.1]
[ 9.1  9.1  9.1]
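
For intuition about what a k-means fit does, the following pure-Python sketch runs Lloyd's algorithm, which alternates between assigning points to the nearest centre and recomputing each centre as a cluster mean. It is not Spark's implementation: the toy points and the fixed initial centres are assumptions made for illustration, whereas Spark's `KMeans` uses `k-means||` initialization by default.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def mean(points):
    """Coordinate-wise mean of a non-empty list of points."""
    n = len(points)
    return [sum(coords) / n for coords in zip(*points)]

def lloyd_kmeans(points, centers, iters=10):
    """Alternate assignment and centre-update steps for a fixed number of iterations."""
    for _ in range(iters):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)),
                          key=lambda j: squared_distance(p, centers[j]))
            clusters[nearest].append(p)
        # Keep the old centre if a cluster happens to be empty.
        centers = [mean(c) if c else old for c, old in zip(clusters, centers)]
    return centers

# Hypothetical toy data: two well-separated groups in three dimensions.
points = [[0.0, 0.0, 0.0], [0.1, 0.1, 0.1], [0.2, 0.2, 0.2],
          [9.0, 9.0, 9.0], [9.1, 9.1, 9.1], [9.2, 9.2, 9.2]]
centers = lloyd_kmeans(points, centers=[points[0], points[3]])
for center in centers:
    print(center)
```

On this toy data the two centres settle near (0.1, 0.1, 0.1) and (9.1, 9.1, 9.1), up to floating-point rounding.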


### Logistic Regression in Spark

#### Loading the training data

In [18]:
from pyspark.ml.classification import LogisticRegression
training = spark.read.format("libsvm").load("libsvm_data.txt")

In [47]:
training.head(3)

[Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

#### Model specification

In [33]:
lr = LogisticRegression(maxIter=10, 
                        regParam=0.3,  # regularization parameter
                        elasticNetParam=0.8) # ElasticNet mixing parameter 
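
The two regularization arguments interact as follows: with `regParam` written as λ and `elasticNetParam` as α, the elastic net penalty added to the loss is λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²), so α = 1 gives a pure L1 (lasso) penalty and α = 0 a pure L2 (ridge) penalty. A minimal sketch that evaluates this penalty, where the helper name and the coefficient vector are hypothetical:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Evaluate reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

weights = [0.5, -1.0, 0.0]  # hypothetical coefficients
penalty = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8)
print(penalty)
```

With α = 0.8 most of the penalty comes from the L1 term, which helps explain why the fitted coefficient vector shown later in this section is sparse.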

#### Training the model

In [21]:
lrModel = lr.fit(training)

#### Model parameters

In [32]:
lrModel.intercept

0.22456315961250325

In [31]:
lrModel.coefficients

SparseVector(692, {244: -0.0001, 263: -0.0001, 272: -0.0002, 300: -0.0002, 301: -0.0, 328: -0.0001, 350: 0.0, 351: 0.0, 378: 0.0004, 379: 0.0001, 405: 0.0001, 406: 0.0006, 407: 0.0003, 428: -0.0001, 433: 0.0004, 434: 0.0006, 455: -0.0001, 456: -0.0002, 461: 0.0003, 462: 0.0006, 483: -0.0002, 484: -0.0001, 489: 0.0003, 490: 0.0003, 496: -0.0001, 511: -0.0004, 512: -0.0003, 517: 0.0003, 539: -0.0002, 540: -0.0015, 568: -0.0002})

#### Iteration details

In [46]:
objectiveHistory = lrModel.summary.objectiveHistory

iIter = 0
print("objectiveHistory:")
for objective in objectiveHistory:
    iIter += 1
    print("Iter-"+ str(iIter) + ": " + str(objective))

objectiveHistory:
Iter-1: 0.6833149135741672
Iter-2: 0.6662875751473734
Iter-3: 0.6217068546034618
Iter-4: 0.6127265245887887
Iter-5: 0.6060347986802873
Iter-6: 0.6031750687571562
Iter-7: 0.5969621534836274
Iter-8: 0.5940743031983118
Iter-9: 0.5906089243339022
Iter-10: 0.5894724576491042
Iter-11: 0.5882187775729587


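### Linear Regression on Streaming Data in Spark

- Data enter the model as a stream.
- An online model adjusts its parameters as new data arrive. In essence this is a Bayesian update which, assuming the observations are conditionally independent given $\theta$, reads:

$$p(\theta \mid y_{1:T+1}) \propto p(y_{T+1} \mid \theta)\, p(\theta \mid y_{1:T})$$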
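
The recursion can be checked on a toy conjugate model. The sketch below assumes a Beta prior with Bernoulli observations, chosen purely for illustration (it is not the regression model of this section), and shows that updating one observation at a time gives the same posterior as processing the whole batch at once. The observation list and function name are hypothetical.

```python
def update(prior, y):
    """One Bayesian update of a Beta(a, b) prior with a single Bernoulli observation y."""
    a, b = prior
    return (a + y, b + (1 - y))

observations = [1, 0, 1, 1, 0, 1]  # hypothetical stream y_1, ..., y_6
posterior = (1, 1)                 # Beta(1, 1), a uniform prior on theta
for y in observations:
    # Yesterday's posterior becomes today's prior.
    posterior = update(posterior, y)

# Batch posterior computed from all observations at once.
successes = sum(observations)
batch = (1 + successes, 1 + len(observations) - successes)

posterior_mean = posterior[0] / sum(posterior)
print(posterior, batch, posterior_mean)
```

Because the sequential and batch posteriors coincide, an online model only needs to keep the current posterior (here two numbers) rather than the full history of the stream.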

#### Implementing the streaming regression model

- `mllib` provides a distributed stochastic gradient descent algorithm.
- Example: `streaming_linear_regression.py`
- Usage: `python3 streaming_linear_regression.py <trainingDir> <testDir>`



#### An online model in Spark in three steps

1. Read the streaming data with `textFileStream()`.
2. Keep training the linear regression model as new data arrive.
3. Predict in real time.


```python
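import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

# Reuse the running SparkContext (or create one); batch the stream every second.
sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)

def parse(lp):
    """Parse a line of the form (y,[x1,x2,x3]) into a LabeledPoint."""
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense([float(x) for x in lp[lp.find('[') + 1: lp.find(']')].split(',')])
    return LabeledPoint(label, vec)

# Step 1: read the training and test streams from the two directories.
trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)

# Step 2: update the model on every incoming batch, starting from zero weights.
numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * numFeatures)
model.trainOn(trainingData)

# Step 3: print (label, prediction) pairs for each test batch.
# pprint() is used because print() on a DStream shows only the object, not its contents.
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTermination()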
```
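
The `parse` function implies that every line in the training and test directories has the form `(y,[x1,x2,x3])`. The following Spark-free sketch applies the same slicing logic to one hypothetical line, which makes it easy to check input files before starting the stream. The name `parse_line` and the sample line are illustrative only.

```python
def parse_line(line):
    """Split a line such as '(1.5,[0.1,0.2,0.3])' into (label, features)."""
    # The label sits between '(' and the first comma.
    label = float(line[line.find('(') + 1: line.find(',')])
    # The features sit between '[' and ']', separated by commas.
    features = [float(x) for x in line[line.find('[') + 1: line.find(']')].split(',')]
    return label, features

label, features = parse_line("(1.5,[0.1,0.2,0.3])")
print(label, features)
```

The number of parsed features should match `numFeatures`, since the initial weight vector has that length.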

### Matrix Operations in Spark

#### Singular value decomposition (SVD)

In [51]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
sc = pyspark.SparkContext.getOrCreate()

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)

svd = mat.computeSVD(5, computeU=True)
U = svd.U       # The U factor is a RowMatrix.
s = svd.s       # The singular values are stored in a local dense vector.
V = svd.V       # The V factor is a local dense matrix.

<pyspark.mllib.linalg.distributed.RowMatrix object at 0x7f85103bb400>
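
As a cross-check that needs no Spark, the largest singular value of the same 3 × 5 matrix can be approximated by power iteration on AᵀA. This is only a sketch for intuition, not the algorithm `computeSVD` uses; the helper names and the iteration count are assumptions. The first row is the dense form of `Vectors.sparse(5, {1: 1.0, 3: 7.0})`.

```python
import math

# The same rows as above, written densely.
A = [[0.0, 1.0, 0.0, 7.0, 0.0],
     [2.0, 0.0, 3.0, 4.0, 5.0],
     [4.0, 0.0, 0.0, 6.0, 7.0]]

def matvec(M, v):
    """Matrix-vector product for a matrix stored as a list of rows."""
    return [sum(m * x for m, x in zip(row, v)) for row in M]

def transpose(M):
    return [list(col) for col in zip(*M)]

def top_singular(M, iters=200):
    """Power iteration on M^T M; returns (sigma_1, v_1)."""
    Mt = transpose(M)
    v = [1.0] * len(M[0])
    for _ in range(iters):
        w = matvec(Mt, matvec(M, v))
        norm = math.sqrt(sum(x * x for x in w))
        v = [x / norm for x in w]
    # The largest singular value is the norm of M v_1.
    sigma = math.sqrt(sum(x * x for x in matvec(M, v)))
    return sigma, v

sigma1, v1 = top_singular(A)
print(sigma1)
```

The result should agree with the first entry of `svd.s`, and `v1` with the first column of `svd.V` up to sign.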


#### PCA 

In [58]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([
    Vectors.sparse(5, {1: 1.0, 3: 7.0}),
    Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
    Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])

mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)

# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)
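
Principal components are the leading eigenvectors of the sample covariance matrix of the columns. The pure-Python sketch below computes that covariance for the same three rows and extracts the first component by power iteration. It is meant for intuition only: the helper names and the iteration count are assumptions, and the resulting vector may differ in sign from the first column of `pc`.

```python
import math

rows = [[0.0, 1.0, 0.0, 7.0, 0.0],
        [2.0, 0.0, 3.0, 4.0, 5.0],
        [4.0, 0.0, 0.0, 6.0, 7.0]]

def covariance(data):
    """Sample covariance matrix (denominator n - 1) of the columns of data."""
    n, d = len(data), len(data[0])
    means = [sum(row[j] for row in data) / n for j in range(d)]
    centred = [[row[j] - means[j] for j in range(d)] for row in data]
    return [[sum(r[i] * r[j] for r in centred) / (n - 1) for j in range(d)]
            for i in range(d)]

def leading_eigenpair(S, iters=200):
    """Power iteration for the largest eigenvalue and eigenvector of a symmetric PSD matrix."""
    v = [1.0] * len(S)
    for _ in range(iters):
        w = [sum(s * x for s, x in zip(row, v)) for row in S]
        norm = math.sqrt(sum(x * x for x in w))
        v = [x / norm for x in w]
    Sv = [sum(s * x for s, x in zip(row, v)) for row in S]
    eigenvalue = sum(a * b for a, b in zip(v, Sv))  # Rayleigh quotient
    return eigenvalue, v

cov = covariance(rows)
variance1, pc1 = leading_eigenpair(cov)
# Project each row onto the first principal component.
scores = [sum(x * p for x, p in zip(row, pc1)) for row in rows]
print(variance1, scores)
```

Here `variance1` is the variance explained by the first component, which can be compared with the trace of `cov` (the total variance).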

## Hands-on Practice

- Create and run a simple Spark machine learning model.

- Explore Spark's [machine learning algorithms](https://spark.apache.org/docs/latest/mllib-guide.html).

- Further reading

    - https://spark.apache.org/docs/latest/streaming-programming-guide.html
    - https://spark.apache.org/docs/latest/api/python/index.html