# 作业4：线性模型的分布式算法

### 第1题

1. 打印数据文件 `sim_linear.txt` 的前5行，并适当将每行字符串截尾。

In [2]:
with open("sim_linear.txt", encoding="utf-8") as file:
    for i in range(5):
        line = next(file)
        print(line[:70] + "...")

Y1	Y2	V1	V2	V3	V4	V5	V6	V7	V8	V9	V10	V11	V12	V13	V14	V15	V16	V17	V18	V...
6.833723,0.000000,-1.085631,0.997345,0.282978,-1.506295,-0.578600,1.65...
-7.759047,1.000000,-0.255619,-2.798589,-1.771533,-0.699877,0.927462,-0...
-4.759579,0.000000,-0.772709,0.794863,0.314272,-1.326265,1.417299,0.80...
15.867100,0.000000,1.150206,-1.267352,0.181035,1.177862,-0.335011,1.03...


2. 在数据矩阵中，名为 $Y_1$ 和 $Y_2$ 的两个变量为因变量，其余的为自变量。请计算数据的样本量 $n$ 和自变量数目 $p$。

In [3]:
import numpy as np

n = 0
p = 0
with open("sim_linear.txt", encoding="utf-8") as file:
    header = next(file).strip().split("\t")
    for element in header:
        if element != "Y1" and element != "Y2":
            p += 1
    for _ in file:
        n += 1
print(f"n = {n}, p = {p}")

n = 2333333, p = 30


3. 利用分布式计算的方法，按每10000个观测为一个批次读取数据，估计线性回归模型 $Y_1=\beta_0+X\beta+\varepsilon$ 的回归系数，**同时包含截距项**。要求**只编写一个** `@ray.remote` 函数并且**只使用一个** `reduce()` 来完成计算。

In [4]:
import ray
ray.init()

2024-05-31 12:24:57,788	INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.13
Ray version:,2.9.3
Dashboard:,http://127.0.0.1:8265


In [5]:
import more_itertools as mit

def txt_to_mat(lines):
    return np.loadtxt(lines, delimiter=",")

def mat_to_objref(mat):
    return ray.put(mat)

batch_size = 10000

with open("sim_linear.txt", encoding="utf-8") as file:
    next(file)
    it_chunk = mit.chunked(file, batch_size)
    it_mat = map(txt_to_mat, it_chunk)
    it_obj = map(mat_to_objref, it_mat)
    batches = list(it_obj)

In [6]:
import functools

@ray.remote
def batch_multiply(batch) -> tuple:
    X = batch[:, 2:]
    X_plus = np.concatenate((np.ones((X.shape[0], 1)), X), axis=1)
    Y1 = batch[:, 0]
    XtX = X_plus.T @ X_plus
    XtY1 = X_plus.T @ Y1
    return (XtX, XtY1)

def add(x, y):
    return (x[0] + y[0], x[1] + y[1])

batch_product = list(map(batch_multiply.remote, batches))
product = functools.reduce(add, ray.get(batch_product))

beta_hat = np.linalg.solve(product[0], product[1])
print(beta_hat)

[ 3.14181941  0.13488122 -0.08797336  0.020057   -0.03026747  0.19883083
 -0.62364493  1.17550274 -0.47048949  1.02705771 -1.4166037   0.90081109
 -1.45796637  0.02245034  0.41823638  0.3620488   1.08189478 -1.42190939
  0.15569311 -0.0610593   0.20316543 -0.61430485  3.19286015 -1.59859505
 -0.56599101  0.07856355  0.27565988  0.86695486 -0.3448698   0.18266883
 -2.29837604]


4. 设计一个分布式算法，计算回归模型的 $R^2$。

$$
\mathrm{R}=\frac{\operatorname{cov}(\mathrm{y}, \hat{\mathrm{y}})}{\widehat{\sigma}_{\mathrm{y}} \widehat{\sigma}_{\hat{\mathrm{y}}}}=\frac{\sum\left(\hat{\mathrm{y}}_{\mathrm{i}}-\overline{\mathrm{y}}\right)^2}{\sum\left(\mathrm{y}_{\mathrm{i}}-\overline{\mathrm{y}}\right)^2} = \frac{\sum \hat{y_i}^2 - 2 \hat{y_i} \bar{y} + \bar{y}^2}{\sum y_i^2 - 2 y_i \bar{y} + \bar{y}^2} = \frac{\sum \hat{y_i}^2 - \sum 2 \hat{y_i} \bar{y} + n \bar{y}^2}{\sum y_i^2 - n \bar{y}^2}
$$

In [8]:
@ray.remote
def batch_result(batch) -> tuple:
    X = batch[:, 2:]
    X_plus = np.concatenate((np.ones((X.shape[0], 1)), X), axis=1)
    Y1 = batch[:, 0]
    Y1_hat = X_plus @ beta_hat
    sum_Y_hat = np.sum(Y1_hat)
    sum_Y_hat_square = np.sum(Y1_hat ** 2)
    sum_linear_y1 = np.sum(Y1)
    sum_quad_y1 = np.sum(Y1 ** 2)
    count = Y1.shape[0]
    return (sum_Y_hat, sum_Y_hat_square, sum_linear_y1, sum_quad_y1, count)

batch_result = list(map(batch_result.remote, batches))

def add(x, y):
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4])
sum_Y_hat, sum_Y_hat_square, sum_linear_y1, sum_quad_y1, n = functools.reduce(add, ray.get(batch_result))
Y1_bar = sum_linear_y1 / n
RSS = sum_Y_hat_square - 2 * sum_Y_hat * Y1_bar + n * Y1_bar ** 2
SST = sum_quad_y1 - n * Y1_bar ** 2
R_square = RSS / SST
print(R_square)

0.9690896159842409


### 第2题

1. 考虑 Softplus 函数 $$\mathrm{softplus}(x)=\log(1+e^x)$$

请利用 Numpy 编写一个函数 `softplus(x)`，令其可以接收一个向量或矩阵 `x`，返回 Softplus 函数在 `x` 上的取值。

In [9]:
import numpy as np

def softplus(x: np.ndarray) -> np.ndarray:
    # 此处插入代码
    return np.log(1 + np.exp(x))


一个简单的测试：

In [10]:
x = np.array([-1000.0, -100.0, -10.0, 0.0, 1.0, 10.0, 100.0, 1000.0])

# 上面编写的函数
print(softplus(x))

[0.00000000e+00 0.00000000e+00 4.53988992e-05 6.93147181e-01
 1.31326169e+00 1.00000454e+01 1.00000000e+02            inf]


  return np.log(1 + np.exp(x))


2. 上述结果是否正常？如果出现异常取值，思考可能的原因是什么，并参照课件上的说明再次尝试编写 Softplus 函数。注意尽可能使用 Numpy 提供的向量化函数，避免使用循环。该函数需同时支持向量和矩阵参数。如果一切正常，可忽略此问题。

上述结果不正常，是由于输入中存在使得函数超过最小精度的数值。

In [11]:
def softplus(x):
    # 此处插入代码
    return np.maximum(x, 0) + np.log(1 + np.exp(-np.abs(x)))

再次测试：

In [12]:
print(softplus(x))
print()
print(softplus(x.reshape(4, 2)))

[0.00000000e+00 0.00000000e+00 4.53988992e-05 6.93147181e-01
 1.31326169e+00 1.00000454e+01 1.00000000e+02 1.00000000e+03]

[[0.00000000e+00 0.00000000e+00]
 [4.53988992e-05 6.93147181e-01]
 [1.31326169e+00 1.00000454e+01]
 [1.00000000e+02 1.00000000e+03]]


### 第3题

利用第1题中的数据，在 $Y_2$ 与 $X$ 之间建立 Logistic 回归模型。任选一种算法，估计 Logistic 回归的回归系数，**同时包含截距项**。请利用第2题中编写的 Softplus 函数，编写**数值稳定**的函数计算 Logistic 回归的目标函数和梯度（参见第11节课件）。

In [13]:
@ray.remote
def batch_obj_grad(batch, beta_old):
    from scipy.special import expit
    Y2 = batch[:, 1]
    X = batch[:, 2:]
    X = np.concatenate((np.ones((X.shape[0], 1)), X), axis=1)
    Xbeta = X @ beta_old
    
    prob = expit(Xbeta)
    softplus_Xbeta = softplus(Xbeta)
    obj = - np.sum(Y2 * (Xbeta - softplus_Xbeta) - (1 - Y2) * softplus_Xbeta)
    grad = X.T @ (prob - Y2)
    count = X.shape[0]
    return (count, obj, grad)

def compute_obj_grad(batches, beta_old):
    res_iter = map(lambda batch: batch_obj_grad.remote(batch, beta_old), batches)
    res_tasks = list(res_iter)
    res_list = ray.get(res_tasks)
    # 汇总样本量、目标函数和梯度
    n, obj, grad = functools.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]), res_list)
    # 计算（平均后）目标函数和梯度
    return obj / n, grad / n

In [14]:
import time

# 根据数据动态获取维度，不要使用之前模拟时的变量
p = ray.get(batches[0]).shape[1] - 2
beta_hat = np.zeros(p + 1)
# 记录目标函数值
objvals = []

# 最大迭代次数
maxit = 100
# 步长
step_size = 10.0
# 收敛条件
eps = 1e-5

t1 = time.time()
for i in range(maxit):
    # 完整数据的样本量和梯度是各分区的加和
    objfn, grad = compute_obj_grad(batches, beta_hat)
    # 计算新 beta
    beta_new = beta_hat - step_size * grad
    # 计算梯度的变化
    grad_norm = np.linalg.norm(grad)
    beta_norm = np.linalg.norm(beta_hat)
    print(f"Iteration {i}, objfn = {objfn}, grad_norm = {grad_norm}, beta_norm = {beta_norm}")
    objvals.append(objfn)
    # 如果梯度值较小，退出循环
    if grad_norm < eps or grad_norm < eps * beta_norm:
        break
    # 更新 beta
    beta_hat = beta_new
t2 = time.time()
print(f"\nfinished in {t2 - t1} seconds")

Iteration 0, objfn = 0.6931471805599462, grad_norm = 0.3826928696527176, beta_norm = 0.0
Iteration 1, objfn = 0.23149689439082566, grad_norm = 0.024065737690783427, beta_norm = 3.826928696527176
Iteration 2, objfn = 0.22751747980651865, grad_norm = 0.01519168771512872, beta_norm = 4.000052106029555
Iteration 3, objfn = 0.22540697115686498, grad_norm = 0.012892460074121542, beta_norm = 4.149998340103303
Iteration 4, objfn = 0.2238519123347774, grad_norm = 0.011255790271171781, beta_norm = 4.278805018225607
Iteration 5, objfn = 0.22266060207698987, grad_norm = 0.009926547077833246, beta_norm = 4.391350146180137
Iteration 6, objfn = 0.22173051735211532, grad_norm = 0.008823124968167725, beta_norm = 4.490601253252661
Iteration 7, objfn = 0.22099342446643983, grad_norm = 0.007892708735314707, beta_norm = 4.578820824929398
Iteration 8, objfn = 0.22040206159936307, grad_norm = 0.007098086098974317, beta_norm = 4.6577380604093594
Iteration 9, objfn = 0.21992273262488685, grad_norm = 0.00641219

2. 利用估计得到的 $\hat{\beta}$ 对原始数据进行预测，令 $\hat{\rho}_i$ 表示估计出的每个观测 $Y_{2i}$ 取值为1的概率。为每个观测计算一个预测的0-1标签 $\hat{l}_i$，规则如下：如果 $\hat{\rho}_i\ge 0.5$，则 $\hat{l}_i=1$，反之 $\hat{l}_i=0$。利用分布式算法计算模型的预测准确度，即 $n^{-1}\sum_{i=1}^n I(Y_{2i}=\hat{l}_i)$。$I(Y_{2i}=\hat{l}_i)$ 表示预测对取1，预测错取0。

In [15]:
@ray.remote
def batch_predict(batch):
    from scipy.special import expit
    Y2 = batch[:, 1]
    X = batch[:, 2:]
    X = np.concatenate((np.ones((X.shape[0], 1)), X), axis=1)
    
    predict_tag = list(map(lambda x: 1 if x >= 0.5 else 0, expit(X @ beta_hat)))
    accurate_count = np.sum(predict_tag == Y2)
    count = X.shape[0]
    return (accurate_count, count)

batches_predict = list(map(batch_predict.remote, batches))
result_predict = functools.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), ray.get(batches_predict))

accuracy = result_predict[0] / result_predict[1]
print(accuracy)

0.9057674151096308


In [16]:
ray.shutdown()