# 作业4：线性模型的分布式算法

### 第1题

先利用如下代码生成模拟数据，并写入文件。数据中最后一列代表因变量 $Y$，其余列为自变量 $X$。

In [1]:
import numpy as np
np.set_printoptions(linewidth=100)

np.random.seed(123)
n = 100000
p = 100
x = np.random.normal(size=(n, p))
beta = np.random.normal(size=p)
y = 1.23 + x.dot(beta) + np.random.normal(scale=2.0, size=n)
dat = np.hstack((x, y.reshape(n, 1)))
np.savetxt("reg_data.txt", dat, fmt="%.8f", delimiter=";")

请以单机模式启动 PySpark，使用4个 CPU 核心，并编写分布式程序，实现如下计算：

In [4]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# 本地模式
spark = SparkSession.builder.\
    master("local[4]").\
    appName("Regression").\
    getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("ERROR")
print(spark)
print(sc)

<pyspark.sql.session.SparkSession object at 0x000002167FAAD480>
<SparkContext master=local[4] appName=Regression>


1. 打印数据的前5行，并将每行的字符串截断至80个字符：

In [5]:
file = sc.textFile("reg_data.txt")

text = file.map(lambda x:x[:80]+'...').take(5)
print(*text,sep='\n')

-1.08563060;0.99734545;0.28297850;-1.50629471;-0.57860025;1.65143654;-2.42667924...
0.64205469;-1.97788793;0.71226464;2.59830393;-0.02462598;0.03414213;0.17954948;-...
0.70331012;-0.59810533;2.20070210;0.68829693;-0.00630725;-0.20666230;-0.08652229...
0.76505485;-0.82898883;-0.65915131;0.61112355;-0.14401335;1.31660560;-0.70434215...
1.53409029;-0.52991410;-0.49097228;-1.30916531;-0.00866047;0.97681298;-1.7510703...


2. 将读取数据后得到的 RDD 按分区转为矩阵。使用默认分区数，无需重新分区。打印出转换后的第一个非空分区所包含的数据。

In [6]:
def str_to_vec(line):
    str_vec = line.split(';')
    num_vec = map(lambda x: float(x),str_vec)
    return np.fromiter(num_vec, dtype=float)

def part_to_mat(rdd):
    rdd_arr = map(str_to_vec,rdd)
    data_list = list(rdd_arr)
    if len(data_list) < 1:
        mat = np.array([])
    else:
        mat = np.vstack(data_list)
    yield mat

In [7]:
dat = file.mapPartitions(part_to_mat).filter(lambda x: x.shape[0]>0)
print(dat.first())

[[ -1.0856306    0.99734545   0.2829785  ...   0.37940061  -0.37917643   3.72488966]
 [  0.64205469  -1.97788793   0.71226464 ...  -0.34126172  -0.21794626  10.98088055]
 [  0.70331012  -0.59810533   2.2007021  ...   0.16054442   0.81976061 -12.63028846]
 ...
 [ -0.30751248   0.1323937    2.33256448 ...   0.37475498  -1.37608098 -13.52353737]
 [ -0.02266014  -0.3014796    2.34502536 ...  -2.06082696  -1.20995417 -10.00714174]
 [  0.02415432  -0.3896902   -0.07492828 ...  -0.41935638  -1.68496516   8.33748658]]


3. 估计线性回归模型 $Y=X\beta+\varepsilon$ 的回归系数，**同时包含截距项**。要求**只使用一次** `reduce()`。

回归系数估计值的显式解为 $\hat{\beta}=(X'X)^{-1}X'y$。

In [8]:
data = dat.map(lambda part: np.hstack((np.ones((part.shape[0],1)), part)))
xtx, xty = data.map(lambda part: (part[:, :p+1].T.dot(part[:, :p+1]), part[:, :p+1].T.dot(part[:, p+1]))).\
    reduce(lambda x, y: (x[0]+y[0],x[1]+y[1]))
bhat = np.linalg.solve(xtx, xty)
bhat

array([ 1.22841355, -0.58056172, -1.12947488,  1.16031679,  0.68276231,  0.64063205, -1.69803101,
        0.87295008, -0.6827681 ,  1.21323821, -0.18532546, -0.60313748,  0.45016343,  1.54732259,
        0.93536575,  0.33661885, -0.62839196, -0.18223468,  1.04004336,  0.99530527, -0.22421889,
        0.26910036, -1.95584105,  0.93200566, -0.46663344, -1.30308226, -1.07451859, -0.9200001 ,
       -0.4751849 , -0.41498631,  0.0893936 ,  0.74250157,  0.44142653,  0.78310696,  0.0968675 ,
       -0.20661749,  1.36408459, -0.84452182, -1.56303708, -0.03391736,  0.05672465, -0.01335776,
       -0.31919022, -1.7366497 , -1.35682179, -1.60938262, -1.28888311,  0.92820726,  0.9148462 ,
       -0.87189391, -1.11327839, -0.65324334, -1.54752238, -1.48016168, -1.40044728,  0.06124555,
       -2.06832355,  0.23966887, -1.45310857, -0.4958114 , -1.0917562 ,  1.22608413,  0.71866161,
        0.46548143, -0.21573557,  1.19919219, -0.18470024,  0.41716831,  0.48748654, -0.28702665,
       -0.92945413, 

4. 设计一个分布式算法，计算回归模型的 $R^2$。

In [9]:
sse, sst = data.map(lambda part: (
    np.sum((part[:, p+1]-part[:, :p+1].dot(bhat))**2), 
    np.sum((part[:, p+1]-np.mean(part[:, p+1]))**2))).\
    reduce(lambda x, y: (x[0]+y[0],x[1]+y[1]))
R2 = 1 - sse/sst
R2

0.9654393907479705

### 第2题

(a) 考虑 Softplus 函数 $$\mathrm{softplus}(x)=\log(1+e^x)$$

请利用 Numpy 编写一个函数 `softplus(x)`，令其可以接收一个向量或矩阵 `x`，返回 Softplus 函数在 `x` 上的取值。

In [10]:
import numpy as np

def softplus(x):
    return np.log(1.0 + np.exp(x))
    # 此处插入代码

一个简单的测试：

In [14]:
x = np.array([-1000.0, -100.0, -10.0, 0.0, 1.0, 10.0, 100.0, 1000.0])

# 上面编写的函数
print(softplus(x))

[0.00000000e+00 0.00000000e+00 4.53988992e-05 6.93147181e-01 1.31326169e+00 1.00000454e+01
 1.00000000e+02            inf]


  return np.log(1.0 + np.exp(x))


(b) 上述结果是否正常？如果出现异常取值，思考可能的原因是什么，并参照课件上的说明再次尝试编写 Softplus 函数。注意尽可能使用 Numpy 提供的向量化函数，避免使用循环。该函数需同时支持向量和矩阵参数。如果一切正常，可忽略此问题。

出现了inf无穷大，因为x很大时，exp（x）溢出

In [15]:
def softplus(x):
    # 此处插入代码
    sx = np.where(x < 0, np.log(1.0 + np.exp(x)), x + np.log(1.0 + np.exp(-x)))
    return sx

再次测试：

In [16]:
print(softplus(x))
print()
print(softplus(x.reshape(4, 2)))

[0.00000000e+00 0.00000000e+00 4.53988992e-05 6.93147181e-01 1.31326169e+00 1.00000454e+01
 1.00000000e+02 1.00000000e+03]

[[0.00000000e+00 0.00000000e+00]
 [4.53988992e-05 6.93147181e-01]
 [1.31326169e+00 1.00000454e+01]
 [1.00000000e+02 1.00000000e+03]]


  sx = np.where(x < 0, np.log(1.0 + np.exp(x)), x + np.log(1.0 + np.exp(-x)))


### 第3题

利用如下代码生成模拟数据，其中数据第一列代表0-1因变量 $Y$，其余列为自变量 $X$。

In [17]:
import numpy as np
from scipy.special import expit

np.random.seed(123)
n = 100000
p = 100
x = np.random.normal(size=(n, p))
beta = np.random.normal(size=p)
prob = expit(-0.5 + x.dot(beta))  # p = 1 / (1 + exp(-x * beta))
y = np.random.binomial(1, prob, size=n)
dat = np.hstack((y.reshape(n, 1), x))
np.savetxt("logistic_data.txt", dat, fmt="%.8f", delimiter="\t")

1. 对上述数据建立 Logistic 回归模型。任选一种算法，估计 Logistic 回归的回归系数，**同时包含截距项**。请利用第2题中编写的 Softplus 函数，编写**数值稳定**的函数计算 Logistic 回归的目标函数和梯度。

In [54]:
import time
file = sc.textFile("logistic_data.txt")

def str_to_vec(line):
    str_vec = line.split('\t')
    num_vec = map(lambda x: float(x),str_vec)
    return np.fromiter(num_vec, dtype=float)

def part_to_mat(rdd):
    rdd_arr = map(str_to_vec,rdd)
    data_list = list(rdd_arr)
    if len(data_list) < 1:
        mat = np.array([])
    else:
        mat = np.vstack(data_list)
    yield mat

dat = file.mapPartitions(part_to_mat).filter(lambda x: x.shape[0]>0)
data = dat.map(lambda part: np.hstack((part, np.ones((part.shape[0],1)))))
data.cache()

PythonRDD[119] at RDD at PythonRDD.scala:53

In [55]:
def compute_stats(part_mat, beta_old):
    # 提取 X 和 y
    y = part_mat[:, 0]
    x = part_mat[:, 1:]
    # X * beta
    xb = x.dot(beta_old)
    # rho(X * beta)
    prob = expit(xb)
    # W 的对角线元素
    w = prob * (1.0 - prob) + 1e-6
    # X'W，数组广播操作，避免生成完整的 W
    xtw = x.transpose() * w
    # X'WX
    xtwx = xtw.dot(x)
    # X'Wz
    z = xb + (y - prob) / w
    xtwz = xtw.dot(z)
    # 目标函数：sum(y * log(prob) + (1 - y) * log(1 - prob))
    # softplus(x) = log(1+exp(x))
    objfn = -np.sum(y * xb - softplus(xb))

    return xtwx, xtwz, objfn

# 根据数据动态获取维度，不要使用之前模拟时的变量
p = data.first().shape[1] - 1
# beta 初始化为 0 向量
beta_hat = np.zeros(p)
# 记录目标函数值
objvals = []

# 最大迭代次数
maxit = 20
# 收敛条件
eps = 1e-6

t1 = time.time()
for i in range(maxit):
    # 完整数据的 X'WX 和 X'Wz 是各分区的加和
    xtwx, xtwz, objfn = data.map(lambda part: compute_stats(part, beta_hat)).\
        reduce(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))
    # 计算新 beta
    beta_new = np.linalg.solve(xtwx, xtwz)
    # 计算 beta 的变化
    resid = np.linalg.norm(beta_new - beta_hat)
    print(f"Iteration {i}, objfn = {objfn}, resid = {resid}")
    objvals.append(objfn)
    # 如果 beta 几乎不再变化，退出循环
    if resid < eps:
        break
    # 更新 beta
    beta_hat = beta_new
t2 = time.time()
print(f"\nfinished in {t2 - t1} seconds")

Iteration 0, objfn = 69314.71805599453, resid = 1.5698521885255374
Iteration 1, objfn = 32646.97922791124, resid = 1.390199746226831
Iteration 2, objfn = 21647.76931828451, resid = 1.736878881587562
Iteration 3, objfn = 16036.817158222435, resid = 2.077661047626656
Iteration 4, objfn = 13369.971015111048, resid = 2.055442578945903
Iteration 5, objfn = 12424.605050958291, resid = 1.3106763115298783
Iteration 6, objfn = 12255.539828228106, resid = 0.34684153390742634
Iteration 7, objfn = 12248.214989925482, resid = 0.018344346618315865
Iteration 8, objfn = 12248.196924487687, resid = 6.370796265214652e-05
Iteration 9, objfn = 12248.19692427128, resid = 6.056471730067524e-08

finished in 29.549819231033325 seconds


In [64]:
print(beta_hat)  #包含截距项
print()
print(*objvals,sep='\n')  #目标函数
print()
print(resid)  #梯度

[-0.59176196 -1.10420803  1.1546564   0.67280846  0.63931464 -1.68220071  0.86041452 -0.69834251
  1.22446393 -0.21062583 -0.60143279  0.44213217  1.57506739  0.93504494  0.34283382 -0.63115954
 -0.16737269  1.03564847  0.9885046  -0.21736314  0.26608044 -1.9546613   0.93399147 -0.44097986
 -1.32382408 -1.06955406 -0.93365571 -0.47879284 -0.40975603  0.13045673  0.72407648  0.43211279
  0.78064486  0.12355277 -0.20116152  1.34425232 -0.8467126  -1.57109621 -0.02174217  0.04202859
  0.01757209 -0.33735364 -1.74371023 -1.32742343 -1.60007017 -1.28377679  0.93921256  0.93254572
 -0.84857908 -1.08700601 -0.65543369 -1.52634259 -1.46037052 -1.41541159  0.06736292 -2.06484347
  0.25380448 -1.44377969 -0.45925857 -1.12439824  1.24274755  0.72115264  0.46168381 -0.20588244
  1.19789065 -0.17370143  0.42621562  0.49622293 -0.29831759 -0.93076696 -2.52159476  1.21260763
 -0.40380623  0.41771335  0.75208194  1.5969521  -0.36537673  0.40531527 -1.43161884 -0.46412512
 -0.29281007 -1.65346949  1.14

2. 利用估计得到的 $\hat{\beta}$ 对原始数据进行预测，令 $\hat{\rho}_i$ 表示估计出的每个观测 $Y_i$ 取值为1的概率。为每个观测计算一个预测的0-1标签 $\hat{l}_i$，规则如下：如果 $\hat{\rho}_i\ge 0.5$，则 $\hat{l}_i=1$，反之 $\hat{l}_i=0$。利用分布式算法计算模型的预测准确度，即 $n^{-1}\sum_{i=1}^n I(Y_i=\hat{l}_i)$。$I(Y_i=\hat{l}_i)$ 表示预测对取1，预测错取0。

In [58]:
def pred(part_mat, bhat):
    x = part_mat[:, 1:]
    # X * beta
    xb = x.dot(bhat)
    # rho(X * beta)
    prob = expit(xb)
    li = np.where(prob>=0.5, 1, 0)
    return li

acc = data.map(lambda part: np.sum(np.where(pred(part, beta_hat)==part[:, 0], 1, 0))).reduce(lambda x,y: x + y)
ACC = acc/n
print(ACC)

0.94737


In [59]:
sc.stop()