# 梯度下降
[知乎](https://zhuanlan.zhihu.com/p/77380412)

In [1]:
import numpy as np

## Batch Gradient Descent（BGD，批量梯度下降）

In [None]:
class BatchGradientDescent:
    def __init__(self, eta=0.01, n_iter=1000, tolerance=0.001):
        self.eta = eta
        self.n_iter = n_iter
        self.tolerance = tolerance

    def fit(self, X, y):
        n_samples = len(X)
        X = np.c_[np.ones(n_samples), X]  # 增加截距项
        n_features = X.shape[-1]

        self.theta = np.ones(n_features)
        self.loss_ = [0]

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            errors = X.dot(self.theta) - y
            loss = 1 / (2 * n_samples) * errors.dot(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
            else:
                gradient = 1 / n_samples * X.T.dot(errors)
                self.theta -= self.eta * gradient

        return self

## Stochastic Gradient Descent（SGD，随机梯度下降）

In [None]:
class StochasticGradientDescent(BatchGradientDescent):
    def __init__(self, shuffle=True, random_state=None, **kwargs):
        super(StochasticGradientDescent, self).__init__(**kwargs)
        self.shuffle = shuffle
        if random_state:
            np.random.seed(random_state)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)  # 重新排序
            errors = []
            for xi, yi in zip(X, y):
                error_i = xi.dot(self.theta) - yi
                errors.append(error_i**2)
                gradient_i = xi.T.dot(error_i)  # 单个样本的梯度
                self.theta -= self.eta * gradient_i
            loss = 1/2 * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

    @staticmethod
    def _shuffle(X, y):
        location = np.random.permutation(len(y))
        return X[location], y[location]

## Mini-Batch Gradient Descent（MBGD，小批量梯度下降）

In [None]:
class MiniBatchGradientDescent(StochasticGradientDescent):
    def __init__(self, batch_size=10, **kwargs):
        self.batch_size = batch_size
        super(MiniBatchGradientDescent, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y  # 长度与batch_size的长度一致
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)  # 小批量样本梯度
                self.theta -= self.eta * mini_gradient
            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Moment Gradient Descent（MGD，动量梯度下降）


In [None]:
 class MomentumGradientDescent(MiniBatchGradientDescent):
    def __init__(self, gamma=0.9, **kwargs):
        self.gamma = gamma                # 当gamma=0时，相当于小批量随机梯度下降
        super(MomentumGradientDescent, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.velocity = np.zeros_like(self.theta)
        self.loss_ = [0]

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                self.velocity = self.velocity * self.gamma + self.eta * mini_gradient
                self.theta -= self.velocity
            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Adaptive Gradient Descent（AdaGrad，自适应梯度下降，2011）


In [None]:
class AdaptiveGradientDescent(MiniBatchGradientDescent):
    def __init__(self, epsilon=1e-6, **kwargs):
        self.epsilon = epsilon
        super(AdaptiveGradientDescent, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        gradient_sum = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                gradient_sum += mini_gradient ** 2
                adj_gradient = mini_gradient / (np.sqrt(gradient_sum + self.epsilon))
                self.theta -= self.eta * adj_gradient
            loss = 1 / (2 * self.batch_size) * np.mean(errors)

            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Adaptive Delta Gradient Descent（AdaDelta，自适应调整梯度下降, 2012）


In [None]:
class AdaDelta(MiniBatchGradientDescent):
    def __init__(self, gamma=0.95, epsilon=1e-6, **kwargs):
        self.gamma = gamma
        self.epsilon = epsilon
        super(AdaDelta, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        gradient_exp = np.zeros(n_features)
        delta_theta_exp = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                gradient_exp = self.gamma * gradient_exp + (1 - self.gamma) * mini_gradient ** 2
                gradient_rms = np.sqrt(gradient_exp + self.epsilon)
                delta_theta = -np.sqrt(delta_theta_exp + self.epsilon) / gradient_rms * mini_gradient
                delta_theta_exp = self.gamma * delta_theta_exp + (1 - self.gamma) * delta_theta ** 2
                delta_theta_rms = np.sqrt(delta_theta_exp + self.epsilon)
                delta_theta = -delta_theta_rms / gradient_rms * mini_gradient
                self.theta += delta_theta

            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Root Mean Square Prop（RMSProp，均方根支撑， 2012）


In [None]:
class RMSProp(MiniBatchGradientDescent):
    def __init__(self, gamma=0.9, epsilon=1e-6, **kwargs):
        self.gamma = gamma
        self.epsilon = epsilon
        super(RMSProp, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        gradient_exp = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                gradient_exp = self.gamma * gradient_exp + (1 - self.gamma) * mini_gradient ** 2
                gradient_rms = np.sqrt(gradient_exp + self.epsilon)
                self.theta -= self.eta / gradient_rms * mini_gradient

            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Nesterov Accelerated Gradient Descent（NAG，Nesterov加速梯度下降，2013）


In [None]:
class NesterovAccelerateGradient(MomentumGradientDescent):
    def __init__(self, **kwargs):
        super(NesterovAccelerateGradient, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape

        self.theta = np.ones(n_features)
        self.velocity = np.zeros_like(self.theta)
        self.loss_ = [0]

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)

            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta - self.gamma * self.velocity) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                self.velocity = self.velocity * self.gamma + self.eta * mini_gradient
                self.theta -= self.velocity
                loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Adaptive Moment Estimation（Adam，自适应矩估计，2014）


In [None]:
class AdaptiveMomentEstimation(MiniBatchGradientDescent):
    def __init__(self, beta_1=0.9, beta_2=0.999, epsilon=1e-6, **kwargs):
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon
        super(AdaptiveMomentEstimation, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        m_t = np.zeros(n_features)
        v_t = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)
            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                m_t = self.beta_1 * m_t + (1 - self.beta_1) * mini_gradient
                v_t = self.beta_2 * v_t + (1 - self.beta_2) * mini_gradient ** 2
                m_t_hat = m_t / (1 - self.beta_1 ** self.i)  # correction
                v_t_hat = v_t / (1 - self.beta_2 ** self.i)
                self.theta -= self.eta / (np.sqrt(v_t_hat) + self.epsilon) * m_t_hat

            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Adaptive Moment Estimation Max（AdaMax, 2015）


In [None]:
class AdaMax(AdaptiveMomentEstimation):
    def __init__(self, **kwargs):
        super(AdaMax, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        m_t = np.zeros(n_features)
        u_t = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)
            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                m_t = self.beta_1 * m_t + (1 - self.beta_1) * mini_gradient
                m_t_hat = m_t / (1 - self.beta_1 ** self.i)
                u_t = np.max(np.c_[self.beta_2 * u_t, np.abs(mini_gradient)], axis=1)
                self.theta -= self.eta / u_t * m_t_hat
            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Nesterov Adaptive Moment Estimation（Nadam，Nesterov加速自适应矩估计，2016）


In [None]:
class Nadam(AdaptiveMomentEstimation):
    def __init__(self, **kwargs):
        super(Nadam, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        m_t = np.zeros(n_features)
        v_t = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)
            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                m_t = self.beta_1 * m_t + (1 - self.beta_1) * mini_gradient
                v_t = self.beta_2 * v_t + (1 - self.beta_2) * mini_gradient ** 2
                m_t_hat = m_t / (1 - self.beta_1 ** self.i)  # correction
                v_t_hat = v_t / (1 - self.beta_2 ** self.i)
                self.theta -= self.eta / (np.sqrt(v_t_hat) + self.epsilon) * (
                            self.beta_1 * m_t_hat + (1 - self.beta_1) * mini_gradient / (1 - self.beta_1 ** self.i))

            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self

## Adam & RMSProp Gradient Descent (AMSGrad, 2018)

In [None]:
class AMSGrad(AdaptiveMomentEstimation):
    def __init__(self, **kwargs):
        super(AMSGrad, self).__init__(**kwargs)

    def fit(self, X, y):
        X = np.c_[np.ones(len(X)), X]
        n_samples, n_features = X.shape
        self.theta = np.ones(n_features)
        self.loss_ = [0]

        m_t = np.zeros(n_features)
        v_t = np.zeros(n_features)
        v_t_hat = np.zeros(n_features)

        self.i = 0
        while self.i < self.n_iter:
            self.i += 1
            if self.shuffle:
                X, y = self._shuffle(X, y)
            errors = []
            for j in range(0, n_samples, self.batch_size):
                mini_X, mini_y = X[j: j + self.batch_size], y[j: j + self.batch_size]
                error = mini_X.dot(self.theta) - mini_y
                errors.append(error.dot(error))
                mini_gradient = 1 / self.batch_size * mini_X.T.dot(error)
                m_t = self.beta_1 * m_t + (1 - self.beta_1) * mini_gradient
                v_t = self.beta_2 * v_t + (1 - self.beta_2) * mini_gradient ** 2
                v_t_hat = np.max(np.hstack((v_t_hat, v_t)))
                self.theta -= self.eta / (np.sqrt(v_t_hat) + self.epsilon) * m_t

            loss = 1 / (2 * self.batch_size) * np.mean(errors)
            delta_loss = loss - self.loss_[-1]
            self.loss_.append(loss)
            if np.abs(delta_loss) < self.tolerance:
                break
        return self