In [1]:
import numpy as np

# 坐标轴起始点
start = 0
# 坐标轴结束点
end = 6
X = np.array([[1, 1], [1, 2], [1, 3], [1, 4], [1, 5]])
y = np.array([1598, 3898, 6220, 7799, 10510])

In [2]:
from sklearn.linear_model import Lasso

# 初始化Lasso回归器，默认使用坐标下降法
reg = Lasso(alpha=0.1, fit_intercept=False)
# 拟合线性模型
reg.fit(X, y)
# 权重系数
W = reg.coef_
W

array([-511.37334449, 2172.18363941])

In [3]:
import numpy as np

def calcCost(X, y, W, lambdas = 0.1):
    return np.sum(np.square(X.dot(W) - y)) + lambdas * np.linalg.norm(x=W, ord=1)

In [4]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib notebook

plt.rcParams['font.sans-serif'] = ['PingFang HK']  # 选择一个本地的支持中文的字体
fig, ax = plt.subplots()
ax.set_facecolor('#f8f9fa')

x1 = X[:][:, 1]
y1 = y
ax.scatter(x1, y1, marker='o', s=20, c='#e63946')
x2 = np.linspace(start, end, 100)
y2 = W[1] * x2 + W[0]
ax.plot(x2, y2, '#457b9d', label='坐标下降法, Cost: %f'%(calcCost(X, y, W)))
for i in range(len(x1)):
    ax.plot(np.linspace(x1[i], x1[i], 100), np.linspace(y1[i], W[1] * x1[i] + W[0], 100), '#219ebc', linestyle='--')
ax.set_title('Lasso回归-坐标下降法', color='#264653')
ax.set_xlabel('X', color='#264653')
ax.set_ylabel('Y', color='#264653')
ax.tick_params(labelcolor='#264653')
plt.legend(loc="upper left")
plt.show()

<IPython.core.display.Javascript object>

In [5]:
def lassoUseCd(X, y, lambdas=0.1, max_iter=1000, tol=1e-4):
    """
    Lasso回归，使用坐标下降法（coordinate descent）
    args:
        X - 训练数据集
        y - 目标标签值
        lambdas - 惩罚项系数
        max_iter - 最大迭代次数
        tol - 变化量容忍值
    return:
        W - 权重系数
    """
    # 初始化 W 为零向量
    W = np.zeros(X.shape[1])
    for it in range(max_iter):
        done = True
        # 遍历所有自变量
        for i in range(0, len(W)):
            # 记录上一轮系数
            w = W[i]
            # 求出当前条件下的最佳系数
            W[i] = down(X, y, W, i, lambdas)
            # 当其中一个系数变化量未到达其容忍值，继续循环
            if (np.abs(w - W[i]) > tol):
                done = False
        # 所有系数都变化不大时，结束循环
        if (done):
            break
    return W

def down(X, y, W, index, lambdas=0.1):
    """
    cost(W) = (x1 * w1 + x2 * w2 + ... - y)^2 + ... + λ(|w1| + |w2| + ...)
    假设 w1 是变量，这时其他的值均为常数，带入上式后，其代价函数是关于 w1 的一元二次函数，可以写成下式：
    cost(w1) = (a * w1 + b)^2 + ... + λ|w1| + c (a,b,c,λ 均为常数)
    => 展开后
    cost(w1) = aa * w1^2 + 2ab * w1 + λ|w1| + c (aa,ab,c,λ 均为常数)
    """
    # 展开后的二次项的系数之和
    aa = 0
    # 展开后的一次项的系数之和
    ab = 0
    for i in range(X.shape[0]):
        # 括号内一次项的系数
        a = X[i][index]
        # 括号内常数项的系数
        b = X[i][:].dot(W) - a * W[index] - y[i]
        # 可以很容易的得到展开后的二次项的系数为括号内一次项的系数平方的和
        aa = aa + a * a
        # 可以很容易的得到展开后的一次项的系数为括号内一次项的系数乘以括号内常数项的和
        ab = ab + a * b
    # 由于是一元二次函数，当导数为零是，函数值最小值，只需要关注二次项系数、一次项系数和 λ
    return det(aa, ab, lambdas)

def det(aa, ab, lambdas=0.1):
    """
    通过代价函数的导数求 w，当 w = 0 时，不可导
    det(w) = 2aa * w + 2ab + λ = 0 (w > 0)
    => w = - (2 * ab + λ) / (2 * aa)
    
    det(w) = 2aa * w + 2ab - λ = 0 (w < 0)
    => w = - (2 * ab - λ) / (2 * aa)
    
    det(w) = NaN (w = 0)
    => w = 0
    """
    w = - (2 * ab + lambdas) / (2 * aa)
    if w < 0:
        w = - (2 * ab - lambdas) / (2 * aa)
        if w > 0:
            w = 0
    return w

In [6]:
W = lassoUseCd(X, y, lambdas=1)
W

array([-511.79961997, 2172.29989635])

In [7]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib notebook

plt.rcParams['font.sans-serif'] = ['PingFang HK']  # 选择一个本地的支持中文的字体
fig, ax = plt.subplots()
ax.set_facecolor('#f8f9fa')

x1 = X[:][:, 1]
y1 = y
ax.scatter(x1, y1, marker='o', s=20, c='#e63946')
x2 = np.linspace(start, end, 100)
y2 = W[1] * x2 + W[0]
ax.plot(x2, y2, '#457b9d', label='坐标下降法, Cost: %f'%(calcCost(X, y, W)))
for i in range(len(x1)):
    ax.plot(np.linspace(x1[i], x1[i], 100), np.linspace(y1[i], W[1] * x1[i] + W[0], 100), '#219ebc', linestyle='--')
ax.set_title('Lasso回归-坐标下降法', color='#264653')
ax.set_xlabel('X', color='#264653')
ax.set_ylabel('Y', color='#264653')
ax.tick_params(labelcolor='#264653')
plt.legend(loc="upper left")
plt.show()

<IPython.core.display.Javascript object>

In [8]:
import numpy as np
import matplotlib.pyplot as plt

def lassoUseCdWithWs(X, y, lambdas=0.1, max_iter=1000, tol=1e-4):
    Ws = []
    W = np.zeros(X.shape[1])
    for it in range(max_iter):
        done = True
        for i in range(0, len(W)):
            Ws.append(np.array(W))
            w = W[i]
            W[i] = down(X, y, W, i, lambdas)
            if (np.abs(w - W[i]) > tol):
                done = False
        if (done):
            Ws.append(np.array(W))
            break
    return Ws

In [9]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

%matplotlib notebook

plt.rcParams['font.sans-serif'] = ['PingFang HK']  # 选择一个本地的支持中文的字体
fig, ax = plt.subplots()

Ws = lassoUseCdWithWs(X, y, tol=1e-1)
text = plt.text(-900, 2350, '', fontsize=12)

def update(i):
    start = Ws[i]
    end = Ws[i + 1]
    x1 = np.linspace(start[0], end[0], 100)
    y1 = np.linspace(start[1], end[1], 100)
    head_width = (end[0] + 1000) / 100
    plt.arrow(
        start[0], 
        start[1], 
        end[0] - start[0], 
        end[1] - start[1], 
        head_width=head_width, 
        head_length=head_width * 2,
        fc='r',
        ec='r',
        length_includes_head = True)
    ax.set_xlim(-1000, 6500)
    ax.set_ylim(-100, 2500)
    line, = ax.plot(x1, y1, 'r')
    text.set_text("(w0, w1)=(%.1f, %.1f), Cost=%.4f"%(end[0], end[1], calcCost(X, y, end)))
    return line

ani = animation.FuncAnimation(fig, update, frames = range(len(Ws)), interval=100, blit=True, repeat=False)
# ani.save('lasso_cd.gif')
ax.set_facecolor('#f8f9fa')
ax.set_xlim(-1000, 6500)
ax.set_ylim(-100, 2500)
ax.set_title('Lasso回归-坐标下降法', color='#264653')
ax.set_xlabel('w0', color='#264653')
ax.set_ylabel('w1', color='#264653')
ax.tick_params(labelcolor='#264653')
plt.show()

<IPython.core.display.Javascript object>

In [10]:
def lassoUseLars(X, y, lambdas=1, max_iter=1000):
    """
    Lasso回归，使用最小角回归法（Least Angle Regression）
    论文：https://web.stanford.edu/~hastie/Papers/LARS/LeastAngle_2002.pdf
    args:
        X - 训练数据集
        y - 目标标签值
        lambdas - 惩罚项系数
        max_iter - 最大迭代次数
    return:
        W - 权重系数
    """
    n, m = X.shape
    # 已被选择的特征下标
    active_set = set()
    # 当前预测向量
    cur_pred = np.zeros((n,), dtype=np.float32)
    # 残差向量
    residual = y - cur_pred
    # 特征向量与残差向量的点积，即相关性
    cur_corr = X.T.dot(residual)
    # 选取相关性最大的下标
    j = np.argmax(np.abs(cur_corr), 0)
    # 将下标添加至已被选择的特征下标集合
    active_set.add(j)
    # 初始化权重系数
    W = np.zeros((m,), dtype=np.float32)
    # 记录上一次的权重系数
    prev_W = np.zeros((m,), dtype=np.float32)
    # 记录特征更新方向
    sign = np.zeros((m,), dtype=np.int32)
    sign[j] = 1
    # 平均相关性
    lambda_hat = None
    # 记录上一次平均相关性
    prev_lambda_hat = None
    for it in range(max_iter):
        # 计算残差向量
        residual = y - cur_pred
        # 特征向量与残差向量的点积
        cur_corr = X.T.dot(residual)
        # 当前相关性最大值
        largest_abs_correlation = np.abs(cur_corr).max()
        # 计算当前平均相关性
        lambda_hat = largest_abs_correlation / n
        # 当平均相关性小于λ时，提前结束迭代
        # https://github.com/scikit-learn/scikit-learn/blob/2beed55847ee70d363bdbfe14ee4401438fba057/sklearn/linear_model/_least_angle.py#L542
        if lambda_hat <= lambdas:
            if (it > 0 and lambda_hat != lambdas):
                ss = ((prev_lambda_hat - lambdas) / (prev_lambda_hat - lambda_hat))
                # 重新计算权重系数
                W[:] = prev_W + ss * (W - prev_W)
            break
        # 更新上一次平均相关性
        prev_lambda_hat = lambda_hat
        
        # 当全部特征都被选择，结束迭代
        if len(active_set) > m:
            break
        
        # 选中的特征向量
        X_a = X[:, list(active_set)]
        # 论文中 X_a 的计算公式 - (2.4)
        X_a *= sign[list(active_set)]
        # 论文中 G_a 的计算公式 - (2.5)
        G_a = X_a.T.dot(X_a)
        G_a_inv = np.linalg.inv(G_a)
        G_a_inv_red_cols = np.sum(G_a_inv, 1)     
        # 论文中 A_a 的计算公式 - (2.5)
        A_a = 1 / np.sqrt(np.sum(G_a_inv_red_cols))
        # 论文中 ω 的计算公式 - (2.6)
        omega = A_a * G_a_inv_red_cols
        # 论文中角平分向量的计算公式 - (2.6)
        equiangular = X_a.dot(omega)
        # 论文中 a 的计算公式 - (2.11)
        cos_angle = X.T.dot(equiangular)
        # 论文中的 γ
        gamma = None
        # 下一个选择的特征下标
        next_j = None
        # 下一个特征的方向
        next_sign = 0
        for j in range(m):
            if j in active_set:
                continue
            # 论文中 γ 的计算方法 - (2.13)
            v0 = (largest_abs_correlation - cur_corr[j]) / (A_a - cos_angle[j]).item()
            v1 = (largest_abs_correlation + cur_corr[j]) / (A_a + cos_angle[j]).item()
            if v0 > 0 and (gamma is None or v0 < gamma):
                gamma = v0
                next_j = j
                next_sign = 1
            if v1 > 0 and (gamma is None or v1 < gamma):
                gamma = v1
                next_j = j
                next_sign = -1
        if gamma is None:
            # 论文中 γ 的计算方法 - (2.21)
            gamma = largest_abs_correlation / A_a
        
        # 选中的特征向量
        sa = X_a
        # 角平分向量
        sb = equiangular * gamma
        # 解线性方程（sa * sx = sb）
        sx = np.linalg.lstsq(sa, sb)
        # 记录上一次的权重系数
        prev_W = W.copy()
        d_hat = np.zeros((m,), dtype=np.int32)
        for i, j in enumerate(active_set):
            # 更新当前的权重系数
            W[j] += sx[0][i] * sign[j]
            # 论文中 d_hat 的计算方法 - (3.3)
            d_hat[j] = omega[i] * sign[j]
        # 论文中 γ_j 的计算方法 - (3.4)
        gamma_hat = -W / d_hat
        # 论文中 γ_hat 的计算方法 - (3.5)
        gamma_hat_min = float("+inf")
        # 论文中 γ_hat 的下标
        gamma_hat_min_idx = None
        for i, j in enumerate(gamma_hat):
            if j <= 0:
                continue
            if gamma_hat_min > j:
                gamma_hat_min = j
                gamma_hat_min_idx = i
        if gamma_hat_min < gamma:
            # 更新当前预测向量 - (3.6)
            cur_pred = cur_pred + gamma_hat_min * equiangular
            # 将下标移除至已被选择的特征下标集合
            active_set.remove(gamma_hat_min_idx)
            # 更新特征更新方向集合
            sign[gamma_hat_min_idx] = 0
        else:
            # 更新当前预测向量
            cur_pred = X.dot(W)
            # 将下标添加至已被选择的特征下标集合
            active_set.add(next_j)
            # 更新特征更新方向集合
            sign[next_j] = next_sign
            
    return W

In [11]:
W = lassoUseLars(X, y, lambdas=1)
W



array([-505.51062, 2170.5032 ], dtype=float32)

In [12]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib notebook

plt.rcParams['font.sans-serif'] = ['PingFang HK']  # 选择一个本地的支持中文的字体
fig, ax = plt.subplots()
ax.set_facecolor('#f8f9fa')

x1 = X[:][:, 1]
y1 = y
ax.scatter(x1, y1, marker='o', s=20, c='#e63946')
x2 = np.linspace(start, end, 100)
y2 = W[1] * x2 + W[0]
ax.plot(x2, y2, '#457b9d', label='最小角回归法, Cost: %f'%(calcCost(X, y, W)))
for i in range(len(x1)):
    ax.plot(np.linspace(x1[i], x1[i], 100), np.linspace(y1[i], W[1] * x1[i] + W[0], 100), '#219ebc', linestyle='--')
ax.set_title('Lasso回归-最小角回归法', color='#264653')
ax.set_xlabel('X', color='#264653')
ax.set_ylabel('Y', color='#264653')
ax.tick_params(labelcolor='#264653')
plt.legend(loc="upper left")
plt.show()

<IPython.core.display.Javascript object>

In [13]:
X = np.array([[1, 1], [1, 2], [1, 3], [1, 4], [1, 5]])
y = np.array([1598, 3898, 6220, 7799, 10510])

from sklearn.linear_model import LassoLars

# 初始化Lasso回归器，使用最小角回归法
reg = LassoLars(alpha=1, fit_intercept=False)
# 拟合线性模型
reg.fit(X, y)
# 权重系数
W = reg.coef_
W

array([-505.5, 2170.5])

In [14]:
import numpy as np
import matplotlib.pyplot as plt

%matplotlib notebook

plt.rcParams['font.sans-serif'] = ['PingFang HK']  # 选择一个本地的支持中文的字体
fig, ax = plt.subplots()
ax.set_facecolor('#f8f9fa')

x1 = X[:][:, 1]
y1 = y
ax.scatter(x1, y1, marker='o', s=20, c='#e63946')
x2 = np.linspace(start, end, 100)
y2 = W[1] * x2 + W[0]
ax.plot(x2, y2, '#457b9d', label='最小角回归法, Cost: %f'%(calcCost(X, y, W)))
for i in range(len(x1)):
    ax.plot(np.linspace(x1[i], x1[i], 100), np.linspace(y1[i], W[1] * x1[i] + W[0], 100), '#219ebc', linestyle='--')
ax.set_title('Lasso回归-最小角回归法', color='#264653')
ax.set_xlabel('X', color='#264653')
ax.set_ylabel('Y', color='#264653')
ax.tick_params(labelcolor='#264653')
plt.legend(loc="upper left")
plt.show()

<IPython.core.display.Javascript object>

In [15]:
import numpy as np
import sklearn.datasets

diabetes_train_splitsize = 1.0

beta_path = None
sum_abs_coeff = None

def fetch_diabetes():
    diabetes = sklearn.datasets.load_diabetes()
    X_all = diabetes.data
    y_all = diabetes.target

    total_N = len(y_all)
    train_N = int(total_N * diabetes_train_splitsize)
    test_N = total_N - train_N
    rand = np.random.mtrand.RandomState(seed=123)
    train_idx = set(rand.choice(total_N, size=(train_N,), replace=False))

    train_X = X_all[list(train_idx)]
    train_y = y_all[list(train_idx)]

    test_idx = np.zeros((test_N,), dtype=np.int32)
    test_n = 0
    for n in range(total_N):
        if n not in train_idx:
            test_idx[test_n] = n
            test_n += 1
    test_X = X_all[test_idx]
    test_y = y_all[test_idx]

    def get_add_mul(X):
        add = - np.average(X, 0)
        X1 = X + add
        mul = 1 / np.sqrt((X1 * X1).sum(0))
        return add, mul

    X_add, X_mul = get_add_mul(train_X)
    y_add = - np.average(train_y)

    train_X = (train_X + X_add) * X_mul
    train_y = train_y + y_add
    if len(test_X) > 0:
        test_X = (test_X + X_add) * X_mul
        test_y = test_y + y_add
    return train_X, train_y

In [16]:
train_X, train_y = fetch_diabetes()

lassoUseLars(train_X, train_y, lambdas=0.1)



array([   0.      , -155.34605 ,  517.21155 ,  275.0924  ,  -52.552994,
          0.      , -210.14128 ,    0.      ,  483.91895 ,   33.661045],
      dtype=float32)

In [17]:
from sklearn.linear_model import LassoLars

reg = LassoLars(alpha=0.1, fit_intercept=False)
reg.fit(train_X, train_y)
reg.coef_

array([   0.        , -155.3460066 ,  517.21148051,  275.09234291,
        -52.55294797,    0.        , -210.1412593 ,    0.        ,
        483.91893709,   33.66104332])

In [18]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

%matplotlib notebook

train_X, train_y = fetch_diabetes()

n_alphas = 100
alphas = np.logspace(-3, 1, n_alphas)

coefs = []
for a in alphas:
    coefs.append(lassoUseLars(train_X, train_y, lambdas=a))

fig, ax = plt.subplots()
xdatas, ydatas = [], []
lns = []
text = plt.text(9, 700, '', fontsize=12)
for i in range(0, 10):
    xdatas.append([])
    ydatas.append([])
    ln, = ax.plot([], [])
    lns.append(ln)

def init():
    ax.set_xlim(alphas[0] , alphas[len(alphas) - 1])
    ax.set_ylim(-800, 800)
    ax.set_xscale('log')
    ax.set_xlim(ax.get_xlim()[::-1])  # reverse axis
    return lns

def update(i):
    for j in range(0, 10):
        xdatas[j].append(alphas[i])
        ydatas[j].append(coefs[i][j])
        lns[j].set_data(xdatas[j], ydatas[j])
    text.set_text("λ = %.4f"%(alphas[i]))
    return lns

ani = animation.FuncAnimation(fig, update, frames = range(len(alphas) - 1, -1, -1), init_func=init, interval=50, blit=True, repeat=False)
# ani.save('lasso.gif')
plt.title('Lasso回归-最小角回归法', color='#264653')
plt.xlabel('λ')
plt.ylabel('权重系数')
plt.axis('tight')
plt.show()



<IPython.core.display.Javascript object>