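# How It Works

## MapReduce for Matrix Multiplication

Implementing matrix multiplication with MapReduce only requires understanding block matrix multiplication. Suppose we are given the matrices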

$$
A \in R^{M \times N}, B \in R^{N \times H}
$$

where

$$
A=\left[\begin{array}{ccccc}{a_{11}} & {a_{12}} & {a_{13}} & {\cdots} & {a_{1 N}} \\ {a_{21}} & {a_{22}} & {a_{23}} & {\cdots} & {a_{2 N}} \\ {a_{31}} & {a_{32}} & {a_{33}} & {\cdots} & {a_{3 N}} \\ {\vdots} & {\vdots} & {\vdots} & {\vdots} & {\vdots} \\ {a_{M 1}} & {a_{M 2}} & {a_{M 3}} & {\cdots} & {a_{M N}}\end{array}\right] \quad B=\left[\begin{array}{ccccc}{b_{11}} & {b_{12}} & {b_{13}} & {\cdots} & {b_{1 H}} \\ {b_{21}} & {b_{22}} & {b_{23}} & {\cdots} & {b_{2 H}} \\ {b_{31}} & {b_{32}} & {b_{33}} & {\cdots} & {b_{3 H}} \\ {\vdots} & {\vdots} & {\vdots} & {\vdots} & {\vdots} \\ {b_{N 1}} & {b_{N 2}} & {b_{N 3}} & {\cdots} & {b_{N H}}\end{array}\right]
$$

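Let $C=A B$; then $C \in R^{M \times H}$.

Now partition the matrices: split $A$ by columns into $V$ blocks and $B$ by rows into $V$ blocks, with matching block sizes $N_{1}, \ldots, N_{V}$: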

$$
\begin{array}{l}{A=\left[A_{M \times N_{1}} \quad A_{M \times N_{2}} \quad A_{M \times N_{3}} \quad \cdots \quad A_{M \times N_{V}}\right]} \\ {B=\left[\begin{array}{c}{B_{N_{1} \times H}} \\ {B_{N_{2} \times H}} \\ {B_{N_{3} \times H}} \\ {\vdots} \\ {B_{N_{V} \times H}}\end{array}\right]}\end{array}
$$

where

$$
N=\sum_{i=1}^{V} N_{i}
$$

It then follows that

$$
C=A B=\sum_{i=1}^{V} A_{M \times N_{i}} B_{N_{i} \times H}
$$

We can now use $V$ machines, each computing one product $A_{M \times N_{i}} B_{N_{i} \times H}$; this is the Map step. Summing the partial results from all $V$ machines is the Reduce step.
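The block decomposition above can be sketched in a single process with NumPy. This is only an illustration under stated assumptions: the helper names `map_block_product`, `reduce_sum`, and `blocked_matmul` are hypothetical, the "machines" are simulated by a list comprehension, and `np.array_split` is used to choose the block sizes $N_i$.

```python
import numpy as np

def map_block_product(a_block, b_block):
    """Map step: multiply one column block of A by the matching row block of B."""
    return a_block @ b_block

def reduce_sum(partials):
    """Reduce step: add the V partial products, each of shape (M, H)."""
    total = np.zeros_like(partials[0])
    for partial in partials:
        total = total + partial
    return total

def blocked_matmul(A, B, V):
    # Split A by columns and B by rows; the same V gives the same split points.
    a_blocks = np.array_split(A, V, axis=1)
    b_blocks = np.array_split(B, V, axis=0)
    # Each list element stands in for the work done on one machine.
    partials = [map_block_product(a, b) for a, b in zip(a_blocks, b_blocks)]
    return reduce_sum(partials)

rng = np.random.default_rng(0)
A = rng.standard_normal((4, 6))   # M = 4, N = 6
B = rng.standard_normal((6, 3))   # N = 6, H = 3
C = blocked_matmul(A, B, V=3)
print(np.allclose(C, A @ B))      # compare against the direct product
```

Each partial product has the full shape $M \times H$, so the Reduce step moves $V$ such matrices; this layout pays off when $N$ is much larger than $M$ and $H$.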

## MapReduce for Linear Least Squares

To solve a linear least-squares problem, we start from a dataset (the training set) of $M$ points:

$$
\left\{\left(\mathbf{x}^{(1)}, y^{(1)}\right),\left(\mathbf{x}^{(2)}, y^{(2)}\right), \cdots,\left(\mathbf{x}^{(M)}, y^{(M)}\right)\right\}
$$

where $\mathbf{x}^{(i)} \in R^{N}$ and $y^{(i)} \in R^{1}$ for all $i \in\{1,2, \cdots, M\}$.

The goal is to find the $\mathbf{w}$ that satisfies the normal equations:

$$
A^{T} A \mathbf{w}=A^{T} \mathbf{b}
$$

where

$$
A=\left[\begin{array}{ccccc}{1} & {x_{1}^{(1)}} & {x_{2}^{(1)}} & {\cdots} & {x_{N}^{(1)}} \\ {1} & {x_{1}^{(2)}} & {x_{2}^{(2)}} & {\cdots} & {x_{N}^{(2)}} \\ {1} & {x_{1}^{(3)}} & {x_{2}^{(3)}} & {\cdots} & {x_{N}^{(3)}} \\ {\vdots} & {\vdots} & {\vdots} & {\vdots} & {\vdots} \\ {1} & {x_{1}^{(M)}} & {x_{2}^{(M)}} & {\cdots} & {x_{N}^{(M)}}\end{array}\right], \quad \mathbf{w}=\left[\begin{array}{c}{w_{0}} \\ {w_{1}} \\ {w_{2}} \\ {\vdots} \\ {w_{N}}\end{array}\right], \quad \mathbf{b}=\left[\begin{array}{c}{y^{(1)}} \\ {y^{(2)}} \\ {y^{(3)}} \\ {\vdots} \\ {y^{(M)}}\end{array}\right]
$$

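MapReduce comes in when computing $C=A^{T} A$ and $D=A^{T} \mathbf{b}$.

Both $C$ and $D$ are matrix products, so by the previous section the Map and Reduce steps follow from partitioning the matrices. Here $A^{T}$ is split by columns and $A$ by rows, one data point per block: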

$$
C=\sum_{i=1}^{M} \mathbf{a}^{(i)}\left(\mathbf{a}^{(i)}\right)^{T} \quad D=\sum_{i=1}^{M} \mathbf{a}^{(i)} y^{(i)}
$$

where

$$
\mathbf{a}^{(i)}=\left[\begin{array}{c}{1} \\ {x_{1}^{(i)}} \\ {x_{2}^{(i)}} \\ {x_{3}^{(i)}} \\ {\vdots} \\ {x_{N}^{(i)}}\end{array}\right]
$$

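Split the $M$ vectors $\mathbf{a}^{(i)}$ into $V$ groups and process each group on one of $V$ machines; this is the Map step. Adding the $V$ partial results gives the matrices $C$ and $D$; this is the Reduce step.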
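A minimal NumPy sketch of this scheme follows, run on synthetic data rather than a real cluster. The helper names `map_partial` and `reduce_partials`, the data, and the choice of `np.linalg.solve` for the final step are assumptions made for illustration.

```python
import numpy as np

def map_partial(a_chunk, y_chunk):
    """Map step: partial sums of a a^T and a y over one chunk of data points."""
    # a_chunk.T @ a_chunk equals the sum of outer products of its rows.
    return a_chunk.T @ a_chunk, a_chunk.T @ y_chunk

def reduce_partials(partials):
    """Reduce step: add the V partial sums to obtain C and D."""
    C = sum(c for c, _ in partials)
    D = sum(d for _, d in partials)
    return C, D

rng = np.random.default_rng(0)
M, N, V = 200, 3, 4
X = rng.standard_normal((M, N))
A = np.hstack([np.ones((M, 1)), X])          # prepend the column of ones
true_w = np.array([1.0, 2.0, -3.0, 0.5])     # synthetic coefficients
y = A @ true_w + 0.01 * rng.standard_normal(M)

# Each (rows of A, entries of y) pair stands in for the data on one machine.
chunks = zip(np.array_split(A, V), np.array_split(y, V))
C, D = reduce_partials([map_partial(a, b) for a, b in chunks])

w = np.linalg.solve(C, D)                    # solve C w = D
w_ref, *_ = np.linalg.lstsq(A, y, rcond=None)
print(np.allclose(w, w_ref))
```

Only the $(N+1) \times (N+1)$ matrix $C$ and the vector $D$ leave each machine, so the data exchanged in the Reduce step does not grow with $M$.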

# Empirical Analysis

## Boston Housing Dataset

In [168]:
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.datasets import load_boston

boston = load_boston()
X = pd.DataFrame(boston.data)
X.columns = boston.feature_names
y = pd.DataFrame(boston.target)
X = X[0:500]; y = y[0:500]
X.shape

(500, 13)

In [169]:
X.head()

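| | CRIM | ZN | INDUS | CHAS | NOX | RM | AGE | DIS | RAD | TAX | PTRATIO | B | LSTAT |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0.00632 | 18.0 | 2.31 | 0.0 | 0.538 | 6.575 | 65.2 | 4.09 | 1.0 | 296.0 | 15.3 | 396.9 | 4.98 |
| 1 | 0.02731 | 0.0 | 7.07 | 0.0 | 0.469 | 6.421 | 78.9 | 4.9671 | 2.0 | 242.0 | 17.8 | 396.9 | 9.14 |
| 2 | 0.02729 | 0.0 | 7.07 | 0.0 | 0.469 | 7.185 | 61.1 | 4.9671 | 2.0 | 242.0 | 17.8 | 392.83 | 4.03 |
| 3 | 0.03237 | 0.0 | 2.18 | 0.0 | 0.458 | 6.998 | 45.8 | 6.0622 | 3.0 | 222.0 | 18.7 | 394.63 | 2.94 |
| 4 | 0.06905 | 0.0 | 2.18 | 0.0 | 0.458 | 7.147 | 54.2 | 6.0622 | 3.0 | 222.0 | 18.7 | 396.9 | 5.33 |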


## Library Function Result

In [170]:
model = linear_model.LinearRegression()
model.fit(X, y)

LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None,
         normalize=False)

In [171]:
model.coef_

array([[-1.05855998e-01,  4.87517296e-02,  1.82934523e-02,
         2.61119923e+00, -1.70062416e+01,  3.79736168e+00,
         2.02008117e-03, -1.51449492e+00,  2.90883354e-01,
        -1.25645889e-02, -8.85637329e-01,  9.29626996e-03,
        -5.38781881e-01]])

In [172]:
model.intercept_

array([35.40683395])

## MapReduce Result

In [174]:
data = X
data['intercept'] = 1
data['y'] = y
data.head()

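| | CRIM | ZN | INDUS | CHAS | NOX | RM | AGE | DIS | RAD | TAX | PTRATIO | B | LSTAT | intercept | y |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 0.00632 | 18.0 | 2.31 | 0.0 | 0.538 | 6.575 | 65.2 | 4.09 | 1.0 | 296.0 | 15.3 | 396.9 | 4.98 | 1 | 24.0 |
| 1 | 0.02731 | 0.0 | 7.07 | 0.0 | 0.469 | 6.421 | 78.9 | 4.9671 | 2.0 | 242.0 | 17.8 | 396.9 | 9.14 | 1 | 21.6 |
| 2 | 0.02729 | 0.0 | 7.07 | 0.0 | 0.469 | 7.185 | 61.1 | 4.9671 | 2.0 | 242.0 | 17.8 | 392.83 | 4.03 | 1 | 34.7 |
| 3 | 0.03237 | 0.0 | 2.18 | 0.0 | 0.458 | 6.998 | 45.8 | 6.0622 | 3.0 | 222.0 | 18.7 | 394.63 | 2.94 | 1 | 33.4 |
| 4 | 0.06905 | 0.0 | 2.18 | 0.0 | 0.458 | 7.147 | 54.2 | 6.0622 | 3.0 | 222.0 | 18.7 | 396.9 | 5.33 | 1 | 36.2 |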


In [176]:
data.to_csv('boston.csv', header=False, index=False)

Test the mapper and reducer scripts in a local shell:

```bash
cat boston.csv | python3 lr_mapper.py | python3 lr_reduce.py
```

The output is shown below. It matches the scikit-learn result above, so the test succeeds.

![Result](https://blog-1255524710.cos.ap-beijing.myqcloud.com/temp/Snipaste_2019-07-03_21-09-37.png)
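The contents of `lr_mapper.py` and `lr_reduce.py` are not listed in this post. The following is a hypothetical sketch of what such a pair might do, not the original scripts. It assumes the column layout of `boston.csv` written above (the features, then the `intercept` column, then `y`), a made-up record format in which each mapper line carries the flattened $\mathbf{a}\mathbf{a}^{T}$ followed by $\mathbf{a} y$, and a small synthetic input in place of the real file.

```python
import math
import numpy as np

def mapper(lines):
    """Hypothetical mapper: for each CSV row, emit flattened a a^T followed by a*y."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        values = np.array([float(v) for v in line.split(",")])
        a, y = values[:-1], values[-1]        # last column is y, the rest form a
        record = np.concatenate([np.outer(a, a).ravel(), a * y])
        yield ",".join(repr(float(v)) for v in record)

def reducer(lines):
    """Hypothetical reducer: sum all mapper records, then solve C w = D."""
    total = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = np.array([float(v) for v in line.split(",")])
        total = record if total is None else total + record
    # A record holds k*k entries of C followed by k entries of D.
    k = (math.isqrt(1 + 4 * total.size) - 1) // 2
    C = total[: k * k].reshape(k, k)
    D = total[k * k:]
    return np.linalg.solve(C, D)

# Synthetic rows in the layout x1,x2,intercept,y with y = 2*x1 - x2 + 3.
rows = ["1.0,2.0,1,3.0", "2.0,0.0,1,7.0", "0.0,1.0,1,2.0",
        "3.0,1.0,1,8.0", "1.0,1.0,1,4.0"]
w = reducer(mapper(rows))
print(w)  # feature coefficients first, intercept last
```

In a streaming setup, each function would live in its own script, read from `sys.stdin`, and print its records, so the shell pipeline above would chain them. Because the `intercept` column sits after the features, the intercept is the last entry of the solution rather than the first.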