设有一组数字，这组数字的均值和方差如下：

$$
mean=\frac{x_{1}+x_{2}+\ldots+x_{n}}{n}=\frac{\operatorname{sum} 1}{n}
$$

$$
\operatorname{var}=\frac{\sum_{i=1}^{n}\left(x_{i}-mean\right)^{2}}{n}=
\frac{\sum_{i=1}^{n}\left(x_{i}^{2}-2 * m e a n * x_{i}+m e a n^{2}\right)}{n}=\frac{s u m 2}{n}-m e a n^{2}
$$

其中，$sum1$ 是 $\left(x_{1}+x_{2}+\ldots+x_{n}\right)$，$sum2$ 就是 $\left(x_{1}^{2}+x_{2}^{2}+\ldots+x_{n}^{2}\right)$

首先，计算 map 端每个部分的 {count(元素个数)、sum1/count、sum2/count}，然后在 reduce 端将所有 map 端传入的 sum1 加起来在除以总个数 n 得到均值 mean；将所有的 sum2 加起来除以 n 再减去均值 mean 的平方，就得到了方差 var.

In [1]:
import numpy as np
import time

def mapper(input):
    numInputs = len(input)
    input = np.mat(input)
    sqInput = np.power(input,2)

    # 输出数据个数，均值，以及平方和的均值
    return [numInputs, np.mean(input), np.mean(sqInput)]


def reduce(mapperOut):
    cumVal = 0.0
    cumSumSq = 0.0
    cumN = 0.0
    for instance in mapperOut:
        nj = float(instance[0]) # 第一个字段是数据个数
        cumN += nj
        cumVal += nj*float(instance[1])    # 第二个字段是一个 map 输出的均值，均值乘以数据个数就是数据总和
        cumSumSq += nj*float(instance[2])  # 第三个字段是一个 map 输出的平方和的均值，乘以元素个数就是所有元素的平方和

    mean = cumVal/cumN  # 得到所有元素的均值
    var = (cumSumSq/cumN-mean*mean) # 得到所有元素的方差

    print(cumN, mean, var)

In [2]:
data = np.array(np.random.randint(0, 100, 10**6))
n = 10
mapData = data.reshape(n, -1)
mapperOut = np.zeros([n, 3])

In [3]:
start = time.perf_counter()
for i in range(n):
    mapperOut[i] = mapper(mapData[i])
node = time.perf_counter()
reduce(mapperOut)
end = time.perf_counter()
print(end - node + (node-start)/n)

1000000.0 49.526416 834.9573621949444
0.0008411000999998918


In [4]:
start = time.perf_counter()
print(len(data), data.mean(), data.var())
end = time.perf_counter()
print(end-start)

1000000 49.526416 834.957362194944
0.01815889699999995
