# 随机批量梯度下降求解逻辑回归参数

逻辑回归的预测函数：（它表示y=1的概率）
$$
h_{\theta}(x)=g\left(\theta^{T} x\right)=\frac{1}{1+e^{-\theta^{T} x}}
$$
    
在给定参数$\theta$和自变量$x$的条件下$y$取值的条件概率是：
$$
P(y | x ; \theta)=\left(h_{\theta}(x)\right)^{y}\left(1-h_{\theta}(x)\right)^{1-y}
$$
     
写出似然函数：
$$
\begin{aligned}
L(\theta) &=\prod_{i=1}^{m} P\left(y^{(i)} | x^{(i)} ; \theta\right) \\
&=\prod_{i=1}^{m}\left(h_{\theta}\left(x^{(i)}\right)\right)^{y^{(i)}}\left(1-h_{\theta}\left(x^{(i)}\right)\right)^{1-y^{(i)}}
\end{aligned}
$$
     
对数似然函数：
$$
\begin{aligned}
l(\theta) &=\log L(\theta) \\
&=\sum_{i=1}^{m}\left(y^{(i)} \log h_{\theta}\left(x^{(i)}\right)+\left(1-y^{(i)}\right) \log \left(1-h_{\theta}\left(x^{(i)}\right)\right)\right)
\end{aligned}
$$
      
目标是求得最大化似然函数的$\theta$。定义$J(\theta)$：
$$
J(\theta)=-\frac{1}{m} l(\theta)
$$
则转化为求得使$J(\theta)$最小的$\theta$，可采用梯度下降进行求解。
    
$\theta$的更新过程可写为（$\alpha$为学习率）：
$$
\theta_{j}:=\theta_{j}-\alpha \frac{\partial}{\partial \theta_{j}} J(\theta), \quad(j=0 \ldots n)
$$
$J(\theta)$对$\theta$求偏导：
$$
\frac{\partial}{\partial \theta_{j}} J(\theta)
=\frac{1}{m} \sum_{i=1}^{m}\left(h_{\theta}\left(\mathrm{x}^{(\mathrm{i})}\right)-y^{(i)}\right) x_{j}^{(\mathrm{i})}
$$
则$\theta$的更新过程为：
$$
\theta_{j}:=\theta_{j}-\alpha \frac{1}{m} \sum_{i=1}^{m}\left(h_{\theta}\left(\mathrm{x}^{(i)}\right)-y^{(i)}\right) x_{j}^{(i)}, \quad(j=0 \ldots n)
$$
    
另外，对于学习率的调整采用指数调整：
$$\alpha = 0.95^{epoch\_num} \cdot \alpha_0$$

### 在spark中实现Logistic回归的随机梯度下降的策略是：    
1. 给定初始的$\theta$
2. 随机抽取m个样本数据；
3. map：对每一行计算梯度
4. reduce：汇总每行梯度得到总的梯度
5. 更新$\theta$
6. 重复2-5，直至$\theta$的改变量小于某个阈值

In [1]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Matrix, Matrices
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.ml.classification import LogisticRegression

from numpy.linalg import norm
import numpy as np
import pandas as pd
import math
from operator import add

                the kernel may be left running.  Please let us know
                about your system (bitness, Python, etc.) at
                ipython-dev@scipy.org
  ipython-dev@scipy.org""")


In [2]:
spark = SparkSession.builder.getOrCreate()
# sc = pyspark.SparkContext("local", "Simple App")

#### 构造数据

In [3]:
# 构造数据
n = 500 # 500条样本
p = 5 # 5个变量
theta_true = range(1,1+p) # p个参数
np.random.seed(seed=1) # 设置随机数种子为1
X = np.mat(np.random.randn(n,p)) # X 每个变量服从独立正态分布
# 获取y（加上一个扰动项）
y = np.mat(list(map(lambda x : 1 if 1/(1+math.exp(-x.dot(theta_true)+np.random.randn(1)))>0.5 else 0,X))).T
# 训练数据
data_xy_train = sc.parallelize(np.hstack((y,X)).tolist()[:450]) # RDD 
xy_LPtrain = data_xy_train.map(lambda line: LabeledPoint(line[0],line[1:])) # 从RDD创建LabeledPoint数据
# 测试数据
data_xy_test = sc.parallelize(np.hstack((y,X)).tolist()[450:]) # RDD 
xy_LPtest = data_xy_test.map(lambda line: LabeledPoint(line[0],line[1:])) # 从RDD创建LabeledPoint数据

#### 梯度下降求解参数

In [4]:
# 逻辑回归的预测函数h(x,theta)
def h(x,theta):
    temp = math.exp(-x.dot(theta))
    return 1 / (1 + temp)

In [5]:
# 计算一行的梯度
# 输入为一个LabelPoint数据
def grad_line(lineLP): 
    Xi = lineLP.features
    yi = lineLP.label
    # theta_value 读取广播变量的值
    return (h(Xi,theta_bc.value) - yi)*Xi

每次迭代更新后的theta通过广播传到各个节点。

In [6]:
def grad_desc(data,alpha = 1,frac = 0.1):# 输入训练集，学习率，每次迭代所用样本比例
    p = len(data.first().features) # 参数个数
    theta_new = np.ones(p) # 初始化theta，全部为1
    global theta_bc # 声明广播变量theta_bc为全局变量
    while(1): 
        theta_j = theta_new # 更新theta
        theta_bc = sc.broadcast(theta_j) # 将theta广播到每个节点
        data_sub = data.sample(False,frac) # 抽取样本
        grad_map = data_sub.map(grad_line) # 计算每行的梯度
        grad = grad_map.sum() # 加和
        theta_new = theta_j - alpha*grad # 新的theta_new
        print(theta_new)
        if norm(theta_new-theta_j)<1e-6: # 当theta在两次迭代之间的差值的二范数小于一个阈值时停止执行并返回参数
            return theta_new
        alpha = alpha * 0.95 # 调整学习率
        # print(alpha)

参数求解：本地运行比较耗时，服务器上较快速。

In [7]:
# 参数求解
theta = grad_desc(xy_LPtrain)

[-2.95062085  0.22275385  3.96459795  2.31184108  6.16401423]
[ 6.0620881   2.52782468 -1.03756589  8.80180194  7.1896923 ]
[4.26762283 2.49046076 3.97448121 6.46910487 7.90598008]
[-0.39468331  3.01036178  4.04282461  5.46540405 10.21972445]
[1.05612512 4.95366396 5.17188418 8.38012862 7.24833999]
[ 2.60064386  2.17042219  5.83385397  3.81747531 11.46093891]
[2.43151103 4.41547284 4.57673157 8.09612056 9.20319113]
[2.91673658 2.85175445 6.86058977 6.8787412  9.61618784]
[ 0.36619069  4.19567192  5.26500485  7.10507772 10.71888814]
[3.29616153 4.94940749 6.34016848 7.39676559 9.41510708]
[ 2.3813757   3.15123235  5.5589361   7.75141718 10.65855996]
[3.14790793 3.99326719 5.00895709 8.70441915 9.78720561]
[ 1.84490395  4.62488823  4.59290564  7.1398186  11.38449976]
[ 0.87929177  3.54781913  6.2510076   8.23987858 10.34999172]
[ 1.5738149   3.24719108  6.36014684  8.29164918 10.36121135]
[ 2.01639148  3.25124764  6.43471126  8.63162754 10.00016051]
[ 2.83433635  3.80876499  5.84934927  

[ 2.02877874  4.08576465  6.21617707  7.87932832 10.36707442]
[ 2.02988848  4.08465935  6.21738369  7.87821033 10.36679507]
[ 2.02911059  4.08381853  6.2171012   7.87976412 10.36635404]
[ 2.02929691  4.08251737  6.21863691  7.87872813 10.36665808]
[ 2.02595392  4.08167161  6.21821938  7.88057804 10.36658457]
[ 2.02315673  4.08231023  6.21799518  7.88227302 10.36583113]
[ 2.02387998  4.08196556  6.21712305  7.88339621 10.36558809]
[ 2.02517242  4.0821851   6.21651552  7.88226563 10.36625716]
[ 2.02403778  4.08289881  6.21525939  7.88422215 10.36542306]
[ 2.02436405  4.08385435  6.21665845  7.88285432 10.36488848]
[ 2.02456859  4.08459966  6.21594019  7.88236467 10.36534031]
[ 2.02406543  4.08455667  6.21585672  7.88260822 10.36531969]
[ 2.02332283  4.08473767  6.21643067  7.88213219 10.36550912]
[ 2.02297957  4.0853484   6.21556445  7.88155925 10.36624071]
[ 2.02355077  4.08516358  6.21542283  7.88086014 10.36680438]
[ 2.02412754  4.08437364  6.21470651  7.88013811 10.36760946]
[ 2.0250

[ 2.02382199  4.08151149  6.21438116  7.88342369 10.3654773 ]
[ 2.02382197  4.08151123  6.21438092  7.8834242  10.36547745]


In [8]:
theta

array([ 2.02382197,  4.08151123,  6.21438092,  7.8834242 , 10.36547745])

#### 预测效果评估

In [9]:
# 一个样本的预测结果
def predict_line(lineLP):
    Xi = lineLP.features
    yi = lineLP.label
    # theta_bc.value 读取广播变量的值
    Pi = h(Xi,theta_bc.value) # 计算S函数值
    yi_hat = 1 if Pi>0.5 else 0 
    if yi==1: # [TP,FP,FN,TN]
        if yi_hat ==1: return np.array([1,0,0,0])
        else: return np.array([0,0,1,0])
    elif yi_hat ==1: return np.array([0,1,0,0])
    return np.array([0,0,0,1])

In [10]:
def predict_performance(data_new,theta):
    global theta_bc # 声明广播变量theta为全局变量
    theta_bc = sc.broadcast(theta) # 广播theta
    pred_map = data_new.map(predict_line) # 获取每一条观测的预测情况
    pred_reduce = pred_map.reduce(add) # 整合所有预测情况
    accuracy = (pred_reduce[0]+pred_reduce[3])/sum(pred_reduce) # 准确率
    precision = pred_reduce[0]/(pred_reduce[0]+pred_reduce[1]) # 查准率
    recall = pred_reduce[0]/(pred_reduce[0]+pred_reduce[2]) # 召回率
    return [accuracy,precision,recall]

In [11]:
# 训练样本上的准确率
train_pfm = predict_performance(xy_LPtrain,theta)
print('Prediction performace in training set: \naccuracy: ',
     round(train_pfm[0],2),'\nprecision: ',round(train_pfm[1],2),
     '\nrecall: ',round(train_pfm[2],2))

# 测试样本上的准确率
test_pfm = predict_performance(xy_LPtest,theta)
print('Prediction performace in test set: \naccuracy: ',
     round(test_pfm[0],2),'\nprecision: ',round(test_pfm[1],2),
     '\nrecall: ',round(test_pfm[2],2))

Prediction performace in training set: 
accuracy:  0.94 
precision:  0.94 
recall:  0.95
Prediction performace in test set: 
accuracy:  0.98 
precision:  1.0 
recall:  0.95


### 对比mllib中的逻辑回归模型估计函数LogisticRegressionWithLBFGS

In [12]:
model = LogisticRegressionWithLBFGS.train(xy_LPtrain)
mllib_theta = model.weights
mllib_theta

DenseVector([1.6814, 3.3606, 5.1239, 6.5283, 8.5064])

In [13]:
# 训练样本上的准确率
mllib_train_pfm = predict_performance(xy_LPtrain,mllib_theta)
print('Prediction performace in training set: \naccuracy: ',
     round(mllib_train_pfm[0],2),'\nprecision: ',round(mllib_train_pfm[1],2),
     '\nrecall: ',round(mllib_train_pfm[2],2))

# 测试样本上的准确率
mllib_test_pfm = predict_performance(xy_LPtest,mllib_theta)
print('Prediction performace in test set: \naccuracy: ',
     round(mllib_train_pfm[0],2),'\nprecision: ',round(mllib_train_pfm[1],2),
     '\nrecall: ',round(mllib_train_pfm[2],2))

Prediction performace in training set: 
accuracy:  0.94 
precision:  0.94 
recall:  0.95
Prediction performace in test set: 
accuracy:  0.94 
precision:  0.94 
recall:  0.95


自行实现的梯度下降求解参数的逻辑回归模型在测试集上的表现优于millib库中自带的LogisticRegressionWithLBFGS函数。