# <机器学习>课程 Lecture 3 实验

## MLP 多层感知机

给定一组数据,其输入维度为2,输出维度为1.
请补全MLP的BP算法的计算过程,实现模型的分类.

首先加载数据,并可视化,观察数据是否线性可分.

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

data_filename = 'Lec2实验/cls_data_mlp.npy'
cls_data = np.load(data_filename)
# Every column except the last is a feature; the last column is the label.
x_data, y_data = cls_data[:, :-1], cls_data[:, -1]

# 80/20 shuffled split, stratified on the labels.
x_train, x_test, y_train, y_test = train_test_split(
    x_data, y_data,
    train_size=0.8, shuffle=True,
    stratify=y_data
)
# Fit the scaler on the training split only and reuse those statistics for
# the test split. Re-fitting on the test data would scale the two splits
# differently and leak test-set statistics into the evaluation.
scaler = StandardScaler()
x_train_std = scaler.fit_transform(x_train)
x_test_std = scaler.transform(x_test)

# x_data in [b, c_in]
c_in = x_data.shape[1]
# y_data in [b, c_out]
c_out = 1
#test file

In [None]:
from matplotlib import pyplot as plt

# Scatter the raw samples, one colour per class (label 1 in red, label 0 in
# blue), so that separability can be judged by eye.
plt.figure()

is_positive = (y_data == 1)
is_negative = (y_data == 0)
pos_data = x_data[is_positive, :]
neg_data = x_data[is_negative, :]

for points, colour in ((pos_data, 'red'), (neg_data, 'blue')):
    plt.scatter(points[:, 0], points[:, 1], c=colour)

plt.show()

首先定义若干激活函数,包括sigmoid函数和ReLU函数.

同时定义一个预测准确率的辅助函数.

In [None]:
from typing import List

def sigmoid(val):
    """Numerically stable logistic sigmoid, applied element-wise.

    The naive form ``exp(x) / (1 + exp(x))`` overflows to ``inf / inf = nan``
    for large positive ``x``. Here only ``exp(-|x|)`` is evaluated, which
    always lies in (0, 1], and the algebraically equivalent branch is
    selected per element:

    * ``x >= 0``: ``1 / (1 + exp(-x))``
    * ``x <  0``: ``exp(x) / (1 + exp(x))``

    Args:
        val: scalar or array-like of real numbers.

    Returns:
        Sigmoid of ``val`` with the same shape as the input.
    """
    val = np.asarray(val)
    exp_neg_abs = np.exp(-np.abs(val))
    numerator = np.where(val >= 0, np.ones_like(exp_neg_abs), exp_neg_abs)
    return numerator / (1. + exp_neg_abs)

def relu(val):
    """Rectified linear unit: keep entries that are >= 0, zero out the rest."""
    keep = val >= 0
    rectified = np.where(keep, val, 0)
    return rectified

# Consider implementing more activation functions, e.g. tanh, mish, swish.
def _get_activation(activation:str='relu'):
    """Return the activation function registered under ``activation``.

    Supported names are ``'relu'`` and ``'sigmoid'``; any other value raises
    ``NotImplementedError``.
    """
    if activation == 'relu':
        return relu
    if activation == 'sigmoid':
        return sigmoid
    raise NotImplementedError(
        f"activation {activation} is not supported"
    )

def calc_accu(pred: np.ndarray, target: np.ndarray) -> float:
    """Calculate binary classification accuracy.

    Predicted probabilities are thresholded at 0.5 (strictly greater counts
    as the positive class) and compared with the 0/1 targets.

    Args:
        pred: predicted probabilities in ``[0, 1]``, shape ``[b]`` or
            ``[b, c]``; for 2-D input only the first column is used.
        target: ground-truth 0/1 labels, shape ``[b]`` or ``[b, c]``
            (first column used for 2-D input).

    Returns:
        Fraction of samples whose thresholded prediction equals the target.

    Raises:
        AssertionError: if the sample counts differ, the input is empty, or
            a probability lies outside ``[0, 1]``.
    """
    # Accept plain sequences as well as arrays.
    pred = np.asarray(pred)
    target = np.asarray(target)

    assert pred.shape[0] == target.shape[0], (
        "inputs and labels should be in same shape but "
        f"get inputs in {pred.shape} and labels in {target.shape}"
    )
    assert pred.size > 0, "inputs should contain at least one sample"
    # The interval is closed: a saturated output of exactly 0.0 or 1.0 is
    # still a valid probability for the purpose of thresholding.
    assert np.min(pred) >= 0 and np.max(pred) <= 1.0, (
        "inputs should between 0 and 1, "
        f"but get {np.min(pred)} - {np.max(pred)}"
    )
    if len(pred.shape) == 2:
        pred = pred[:, 0]
    if len(target.shape) == 2:
        target = target[:, 0]
    assert pred.shape == target.shape

    return ((pred > 0.5) == target).mean()

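针对二分类,我们使用NLL损失,即使用模型预测的概率计算负对数似然(二分类情形下即二元交叉熵).

计算公式
$$\mathcal L = -\sum_i \left[ y_i\log p_i + (1 - y_i)\log(1 - p_i) \right]$$

计算损失函数到输入值的梯度为
$$\frac{\partial \mathcal L}{\partial p_i} = -\left( \frac{y_i}{p_i} - \frac{1 - y_i}{1 - p_i} \right)$$

若使用 reduction='mean',损失与梯度还需除以样本数.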

In [None]:
# Loss function and its gradient
class NLLLoss(object):
    """Binary negative log-likelihood (binary cross-entropy) loss.

    ``forward`` computes ``-(y * log(p) + (1 - y) * log(1 - p))`` per element
    and caches ``p`` and ``y`` so that ``backward`` can evaluate the gradient
    of the loss with respect to the predicted probabilities ``p``.
    """
    def __init__(self, reduction:str=None):
        # 'mean' or 'sum' reduce the loss to a scalar; any other value
        # returns the unreduced per-element loss.
        self.reduction = reduction

        # context saved by forward() and consumed by backward()
        self.ctx_inputs = None
        self.ctx_labels = None

    def forward(self, inputs, labels):
        """Calculate the loss.

        Args:
            inputs: predicted probabilities strictly inside (0, 1), shape
                ``[b]`` or ``[b, c]``.
            labels: 0/1 targets with the same number of samples; 1-D labels
                are expanded to a column when ``inputs`` is 2-D.

        Returns:
            A scalar for ``reduction`` in {'mean', 'sum'}, otherwise the
            per-element loss with the shape of ``inputs``.
        """
        assert inputs.shape[0] == labels.shape[0], (
            "inputs and labels should be in same number of samples "
            f"but get {inputs.shape[0]} inputs and {labels.shape[0]} labels"
        )
        # The interval is open because log(0) is taken below otherwise.
        assert np.min(inputs) > 0 and np.max(inputs) < 1.0, (
            "inputs should between 0 and 1, "
            f"but get value from {np.min(inputs):.4f} to {np.max(inputs):.4f}"
        )

        # Only expand labels that are still 1-D; labels that already come as
        # a column must not gain a third axis.
        if len(inputs.shape) == 2 and len(labels.shape) == 1:
            labels = labels[:, None]
        assert inputs.shape == labels.shape

        # per-element negative log-likelihood
        likelihood = np.multiply(labels, np.log(inputs)) \
             + np.multiply((1 - labels), np.log(1 - inputs))
        loss = - likelihood

        # save the context for backward()
        self.ctx_inputs = inputs
        self.ctx_labels = labels

        if self.reduction == 'mean':
            return np.mean(loss)
        elif self.reduction == 'sum':
            return np.sum(loss)
        else:
            return loss

    def backward(self, prev_grad=1.0):
        """Calculate the gradient of the loss w.r.t. the cached inputs.

        Args:
            prev_grad: upstream gradient of the final objective with respect
                to the value returned by ``forward``; a scalar for reduced
                losses, or an array broadcastable to the input shape for the
                unreduced loss. Defaults to 1.0 (the loss is the objective).

        Returns:
            Array with the shape of the cached inputs.
        """
        assert self.ctx_inputs is not None and self.ctx_labels is not None, \
            "forward() must be called before backward()"
        probs, labels = self.ctx_inputs, self.ctx_labels

        # d/dp [-(y log p + (1 - y) log(1 - p))] = (p - y) / (p (1 - p))
        grad = (probs - labels) / (probs * (1. - probs))

        # np.mean in forward() averages over every element of the loss.
        if self.reduction == 'mean':
            grad = grad / probs.size

        return prev_grad * grad

对于mlp中的每一层,定义一层感知器.
包括正向传播,反向传播和梯度更新.

其中正向传播公式为
$$y = \sigma(z) = \sigma(w^Tx + b)$$
其中z为输入特征的加权和,保存在ctx_hidden变量中用于计算残差

反向传播过程中,首先计算残差
$$\frac{\partial \mathcal L}{\partial z} = \frac{\partial \mathcal L}{\partial y} \frac{\partial y}{\partial z}$$

可以得到损失函数对权重的梯度,保存在上下文中用于梯度更新.
同时需要返回损失函数对前一层输出的梯度,以便前一层的梯度计算.

In [None]:
class LinearLayer:
    """Perceptron layer in an MLP: ``y = activation(x @ W)``.

    When ``bias`` is enabled the bias is stored as an extra last row of the
    weight matrix and a column of ones is appended to the inputs, so the
    same matrix product covers both the weights and the bias.
    """
    def __init__(
        self,
        c_in: int, c_out: int,
        init_mean: float, init_var: float,
        bias:bool = False,
        activation: str = 'relu'
    ) -> None:
        """Create the layer and randomly initialise its weights.

        Args:
            c_in: number of input features.
            c_out: number of output features.
            init_mean: mean of the normal distribution used for the weights.
            init_var: passed as the ``scale`` argument of
                ``np.random.normal``, i.e. it acts as the standard deviation.
            bias: whether to learn a bias term.
            activation: name understood by ``_get_activation``.
        """
        self.bias = bias
        if self.bias:
            weight_size = (c_in+1, c_out)
        else:
            weight_size = (c_in, c_out)

        # gradient of the loss w.r.t. ``weight``; set by backward()
        self.grad = None
        self.weight = np.random.normal(
            init_mean, init_var,
            size=weight_size
        )

        self.activation = _get_activation(activation)

        # context saved by forward() and consumed by backward()
        self.ctx_inputs = None
        self.ctx_hidden = None
        self.ctx_outputs = None

    def forward(self, inputs):
        """Forward the layer.

        Args:
            inputs: array of shape ``[b, c_in]``.

        Returns:
            Activated outputs of shape ``[b, c_out]``.
        """
        bsz = inputs.shape[0]

        if self.bias:
            # append a constant-one feature that multiplies the bias row
            self.ctx_inputs = np.concatenate([
                inputs,
                np.ones(shape=(bsz, 1))
            ], axis=1)
        else:
            self.ctx_inputs = inputs

        self.ctx_hidden = np.matmul(self.ctx_inputs, self.weight)
        self.ctx_outputs = self.activation(self.ctx_hidden)

        return self.ctx_outputs

    def backward(self, prev_grad):
        """Calculate the gradients for the most recent forward pass.

        Stores the gradient of the loss w.r.t. the weights in ``self.grad``.

        Args:
            prev_grad: gradient of the loss w.r.t. this layer's output,
                shape ``[b, c_out]``.

        Returns:
            Gradient of the loss w.r.t. this layer's input, shape
            ``[b, c_in]`` (the appended bias column is excluded).

        Raises:
            NotImplementedError: if the activation has no derivative here.
        """
        assert self.ctx_hidden is not None, \
            "forward() must be called before backward()"
        assert self.ctx_hidden.shape == prev_grad.shape, (
            "expected same shape of ctx_hidden and prev_grad, "
            f"but get ctx_hidden in {self.ctx_hidden.shape} and "
            f"prev_grad in {prev_grad.shape}"
        )

        # residual = dL/dz = dL/dy * dy/dz, with z the pre-activation
        act_name = self.activation.__name__
        if act_name == 'relu':
            # derivative is 1 where z > 0 and 0 elsewhere (0 chosen at z == 0)
            residual = prev_grad * (self.ctx_hidden > 0)
        elif act_name == 'sigmoid':
            # sigma'(z) = sigma(z) * (1 - sigma(z)); reuse the cached output
            residual = prev_grad * self.ctx_outputs * (1. - self.ctx_outputs)
        else:
            raise NotImplementedError("activation not supported")

        # z = x @ W  =>  dL/dW = x^T @ dL/dz (summed over the batch)
        self.grad = np.matmul(self.ctx_inputs.T, residual)

        # dL/dx = dL/dz @ W^T
        grad_inputs = np.matmul(residual, self.weight.T)
        if self.bias:
            # the last column belongs to the constant-one bias feature,
            # which is not an output of the previous layer
            return grad_inputs[:, :-1]
        else:
            return grad_inputs

    def update(self, learning_rate):
        """Take one gradient-descent step and clear the stored gradient."""
        assert self.grad is not None

        self.weight = self.weight - learning_rate * self.grad
        self.grad = None

最后定义完整的MLP模型,包括多个感知器层.

默认情况下浅层模型中间层使用sigmoid激活,最后一层使用sigmoid激活函数(分类任务).

前向传播的时候逐层传播,然后计算损失值.

从损失值开始逐层反向传播,计算每一层权重的梯度.

最后更新每一层的权重

In [None]:
class MLP:
    """Multi Layer Perceptron for binary classification.

    The model is a stack of ``LinearLayer`` objects. Every layer uses the
    sigmoid activation; hidden layers are built without a bias term and the
    final layer with one. Training minimises the mean ``NLLLoss``.
    """
    def __init__(
        self,
        num_feat_list,
        init_mean,
        init_val,
    ) -> None:
        """Build the layers.

        Args:
            num_feat_list: list of ints ``[c_in, hidden..., c_out]``; layer
                ``i`` maps ``num_feat_list[i-1]`` to ``num_feat_list[i]``
                features.
            init_mean: forwarded to every layer as ``init_mean``.
            init_val: forwarded to every layer as ``init_var``.
        """
        assert isinstance(num_feat_list, list), (
            "expected num_feat_list is a list of int "
            f"but get a {type(num_feat_list)}"
        )
        assert len(num_feat_list) >= 2, (
            "expected num_feat_list to hold at least an input and an output "
            f"size but get {len(num_feat_list)} entries"
        )
        # validate every entry, not only the first one
        for num_feat in num_feat_list:
            assert isinstance(num_feat, int), (
                "expected num_feat_list is a list of int "
                f"but get a list of {type(num_feat)}"
            )

        self.num_layers = len(num_feat_list) - 1

        self.layers = []
        for i in range(1, 1 + self.num_layers):
            # only the last layer carries a bias term
            is_last = (i == self.num_layers)
            self.layers.append(
                LinearLayer(num_feat_list[i-1], num_feat_list[i],
                            init_mean=init_mean, init_var=init_val,
                            bias=is_last, activation='sigmoid')
            )

        self.loss_fn = NLLLoss(reduction='mean')

        print("initialize a mlp model:")
        print("layer \t weight.shape \t activation")
        for idx, item in enumerate(self.layers):
            print(f"{idx}\t{item.weight.shape}\t{item.activation.__name__}")

    def forward(self, inputs):
        ''' forward layer by layer '''
        res = inputs
        for layer in self.layers:
            res = layer.forward(res)

        return res

    def fit(self,
        inputs: np.ndarray,
        labels: np.ndarray,
        lr: float,
        epochs:int
    )->None:
        """Fit the model with full-batch gradient descent.

        Each epoch runs one forward pass over all ``inputs``, back-propagates
        the loss gradient through the layers in reverse order, updates every
        layer, and prints the loss and the training accuracy.

        Args:
            inputs: training features, shape ``[b, c_in]``.
            labels: 0/1 training labels, shape ``[b]``.
            lr: learning rate of the gradient step.
            epochs: number of passes over the data.
        """

        for epoch_idx in range(epochs):
            # === forward and loss ===
            res = self.forward(inputs)
            loss = self.loss_fn.forward(res, labels)

            # === backward ===
            # Chain rule: start from dL/dL = 1 at the loss and hand each
            # layer the gradient w.r.t. its output, last layer first.
            grad = self.loss_fn.backward(1.0)
            for layer in reversed(self.layers):
                grad = layer.backward(grad)

            # === update ===
            for layer in self.layers:
                layer.update(lr)

            pred_train = self.forward(inputs)
            accu = calc_accu(pred_train, labels)

            # ``.2%`` already appends the percent sign
            print(f"{epoch_idx:03d}/{epochs}: loss = {loss:.4f}, accu = {float(accu):.2%}")


In [None]:
# Try other architectures, e.g. [2, 16, 1] or [2, 4, 4, 1].
# Try other initialisation schemes, e.g. look up xavier_init or kaiming_init.
model = MLP([c_in, 8, c_out], init_mean=0, init_val=0.1)

# Train and evaluate on the standardized features prepared in the data
# loading cell; the raw features were previously used here, which left the
# fitted scaler without any effect.
x_test_pred = model.forward(x_test_std)
print('before train: ', calc_accu(x_test_pred, y_test))

model.fit(x_train_std, y_train, lr=0.001, epochs=100)

x_test_pred = model.forward(x_test_std)
print('after train: ', calc_accu(x_test_pred, y_test))