# 前馈神经网络

In [None]:
# 导入必要的工具包
import torch
from torch import nn
import torch.nn.functional as F
# 绘画时使用的工具包
import matplotlib.pyplot as plt
# 当在notebook中运行时，请键入以下代码，图片可以在单元格中输出，在编译器中运行时不需要
%matplotlib inline
# 导入鸢尾花数据集
from sklearn.datasets import load_iris

## 净活性值

假设一个神经元接收 𝐷 个输入𝑥1 , 𝑥2 , ⋯ , 𝑥𝐷，令向量𝒙 = [𝑥1 ; 𝑥2 ; ⋯ ; 𝑥𝐷]来表示这组输入，并用净活性值 𝑧 表示一个神经元所获得的输入信号𝒙的加权和。
为了提高效率，通常会将N  (N>1) 个样本归为一组进行成批预测。

In [None]:
# x 表示两个含有5个特征的样本，x是一个二维的tensor
x = torch.randn((2,5))
# w 表示含有5个参数的权重向量，w是一个二维的tensor
w = torch.randn((5,1))
# 偏置项，b是一个二维的tensor，但b只有一个数值
b = torch.randn((1,1))
# 矩阵乘法，请注意 x 和 w 的顺序，与 b 相加时使用了广播机制
z = torch.matmul(x, w) + b
# 另一种写法
z_2 = x@w + b 
# 打印结果，z是一个二维的tensor，表示两个样本经过神经元后的各自净活性值
print('output z:' , z)
print('shape of z: ', z.shape)
print('output z_2:', z_2)
print('shape of z:', z_2.shape)

简洁实现，使用PyTorch提供的类。请牢记参数的实际意义

In [None]:
# 实例化一个线性层，接受输入维度是5，输出维度是1
net = nn.Linear(5,1)
z_3 = net(x)
# 打印结果，z2的形状与z一样，含义也与z一样
print('output z2: ', z_3)
print('shape of z2:' , z_3.shape)


## 激活函数

净活性值在经过一个非线性函数𝑓(⋅)后，得到神经元的活性值𝑎，也就是该层神经元的输出。

激活函数通常为非线性函数，常用的有Sigmoid型激活函数和ReLU激活函数，要想实现他们，其实非常的简单。

In [6]:
# Logistic 函数
def logistic(z):
    return 1.0 / (1.0 + torch.exp(-z))
# Tanh函数
def tanh(z):
    return (torch.exp(z) - torch.exp(-z)) / (torch.exp(z) + torch.exp(-z))
# ReLU函数
def relu(z):
    return torch.max(z, torch.zeros_like(z))
# leakyReLU函数
def leaky_relu(z, gamma=0.1):
    positive = torch.max(z, torch.zeros_like(z))
    negative = torch.min(z, torch.zeros_like(z))
    return positive + gamma * negative


In [None]:
# 画出激活函数的图像
# 从-10 到 10 每间隔0.01 取一个数
a = torch.arange(-10, 10, 0.01)
plt.figure()
# 在第一个子图中绘制Sigmoid型激活函数
plt.subplot(2, 2, 1)
plt.plot(a.tolist(), logistic(a).tolist(), color= 'red', label='logistic')
plt.plot(a.tolist(), tanh(a).tolist(), color='blue', linestyle='--', label='tanh')
# 在第二个子图中绘制ReLU型激活函数
plt.subplot(222)
plt.plot(a.tolist(), relu(a).tolist(), color='g', label='relu')
plt.plot(a.tolist(), leaky_relu(a).tolist(), color='black',linestyle='--', label='leaky relu')

plt.legend()
plt.show()

简洁实现，使用PyTorch提供的方法

In [None]:
# z为前面计算的净活性值
sig_output = torch.sigmoid(z)
tan_output = torch.tanh(z)
relu_output = torch.relu(z)
# 打印输出结果
print('sigmoid:',sig_output)
print('tanh:', tan_output)
print('ReLU:', relu_output)

## 基于前馈神经网络的二分类任务

想要完成一个深度学习的任务,可以简单地将其分为7步，构建数据集、构建模型、设计损失函数、根据损失函数模型训练阶段、训练同时的测试阶段、训练结果展示、预测阶段；其中前4步是不可缺少的。

In [None]:
# 线性层算子，请一定注意继承自 nn. Module, 这会帮你解决许多细节上的问题
class Linear(nn.Module): 
    def __init__(self, input_size, output_size):
        super(Linear, self).__init__()
        self.params = {}
        self.params['W'] = nn.Parameter(torch.randn(input_size, output_size, requires_grad=True))
        self.params['b'] = nn.Parameter(torch.randn(1, output_size, requires_grad=True))
        self.grads = {}
        self.inputs = None
        
    def forward(self, inputs):
        self.inputs = inputs
        outputs = torch.matmul(inputs, self.params['W']) + self.params['b']
        return outputs
    
# 实现一个两层的前馈神经网络
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super( MLP, self).__init__()
        self.fc1 = Linear(input_size, hidden_size)
        self.fc2 = Linear(hidden_size, output_size)
    
    def forward(self, x):
        z1 = self.fc1(x)
        a1 = logistic(z1)
        z2 = self.fc2(a1)
        a2 = logistic(z2)
        return a2

In [None]:
input = torch.ones((1,10))
input_size, hidden_size, output_size = 10, 5, 2
net = MLP(input_size, hidden_size, output_size)
output = net(input)
output

In [None]:
# 定义损失函数，对于分类问题，常使用交叉熵作为损失函数
def cross_entropy(y_hat, y):
    return - torch.log(y_hat[range(len(y_hat)), y])

In [None]:
# 请深刻理解这种表达形式。
# y的含义是，第一个样本是第1类，第二个样本是第3类；
# y_hat的含义是，模型预测第一个样本属于不同类的置信度分别是0.1、0.3和0.6，模型预测第二个样本属于不同类别的置信度分别是0.3、0.2、0.5

y = torch.tensor([0, 2])
y_hat = torch.tensor([[0.1, 0.3, 0.6], [0.3, 0.2, 0.5]])

cross_entropy(y_hat, y)

## 反向传播算法

截止到目前，你应该对神经网络的前向过程有了一定的理解，但我们实现的激活函数和损失函数还都不支持反向传播。
接下来，让我们加入这个功能。神经网络相当于一个复合函数，需要利用链式法则进行反向传播来计算梯度。

In [None]:
class BinaryCrossEntropyLoss():
    def __init__(self, model):
        self.predicts = None
        self.labels = None
        self.num = None
        self.model = model
        
    def __call__(self, predicts, labels):
        return self.forward(predicts, labels)
    
    def forward(self, predicts, labels):
        self.predicts = predicts
        self.labels = labels
        self.num = self.predicts.shape[0]
        loss = - torch.log(y_hat[range(len(y_hat)), y])
        
        return loss
    
    def backward(self):
        
        inputs_grads = -1 * (self.labels/self.predicts - (1 - self.labels)/(1 - self.predicts))/self.num
        
        self.model.backward(inputs_grads)

In [47]:
class Logistic():
    def __init__(self):
        self.inputs = None
        self.outputs = None
        self.params = None
        
    def __call__(self, x):
        return self.forward(x)
    
    def forward(self, inputs):
        outputs =  1.0 / (1.0 + torch.exp(-inputs))
        self.outputs = outputs
        return outputs
    
    def backward(self, outputs_grads=None):
        if outputs_grads is None:
            outputs_grads = torch.ones(self.outputs.shape)
        outputs_grad_inputs = torch.multiply(self.outputs, (1.0 - self.outputs))
        return torch.multiply(outputs_grads, outputs_grad_inputs)

In [None]:
act = Logistic()
x = torch.tensor([3,3,4,2])
y = act(x)

z = act.backward()
z

In [50]:
class Linear:
    def __init__(self, input_size, output_size):
        self.params = {}
        self.params['W'] = nn.Parameter(torch.randn(input_size, output_size, requires_grad=True))
        self.params['b'] = nn.Parameter(torch.randn(1, output_size, requires_grad=True))
        self.inputs = None
        self.grads = {}
    
    def __call__(self, x):
        return self.forward(x)
    
    def forward(self, inputs):
        self.inputs = inputs
        outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']
        
        return outputs
    
    def backward(self, grads=None):
        if grads == None:
            grads = torch.ones(self.params['W'].shape)
        self.grads['w'] = torch.matmul(self.inputs.T, grads)
        self.grads['b'] = torch.sum(grads, dim=0)
        return torch.matmul(grads, self.params['W'].T)

In [None]:
net = Linear(4, 2)
x = torch.tensor([1,1,1,1], dtype=torch.float32)
y = net(x)
z = net.backward()
z

## 自动求导

PyTorch中，所有神经网络的核心是 autograd 包。autograd包为张量上的所有操作提供了自动求导机制。它是一个在运行时定义 ( define-by-run ）的框架，这意味着反向传播是根据代码如何运行来决定的，并且每次迭代可以是不同的。

每个张量都有一个.grad_fn属性，该属性引用了创建 Tensor 自身的Function(除非这个张量是用户手动创建的，即这个张量的grad_fn是 None )。下面给出的例子中，张量由用户手动创建，因此grad_fn返回结果是None。

In [None]:
# 最简单的情况，X是一个标量
x = torch.tensor(2, dtype=torch.float32, requires_grad=True)
y = x ** 2 + 4 * x 
print(x.grad)
y.backward()
print(x.grad)

如果 Tensor 是一个标量(即它包含一个元素的数据），则不需要为 backward() 指定任何参数，但是如果它有更多的元素，则需要指定一个gradient参数，该参数是形状匹配的张量。

In [None]:
x = torch.ones(2, 2, requires_grad=True)
# y是一个矩阵
y = x ** 2 + 4 * x
y.backward(torch.ones(2, 2))
print(x.grad)

In [None]:
u = x ** 3 + 2 * x
# z是一个标量
z = u.sum()
z.backward()
print(x.grad)

## 实践

   在之前实现的代码中，借助深度学习框架的帮助，我们已经完成了大部分功能。在最后的实验中，我们引入小批量和随机梯度下降法，即每次仅根据数据集一个批量大小的数据来更新参数。
   为了使用小批量，通常的做法是构建一个数据迭代器,在每次迭代时从全部数据集中获取指定数量的数据。我们可以借助深度学习框架中的Dataset类和DataLoader类来实现此功能。
   随机梯度下降法则完全引用深度学习框架已经实现好的方法。
   在继承Dataset类时，应实现\_\_getitem\_\_和\_\_len\_\_两个方法，前者会根据索引获取数据集中指定的样本，后者则返回数据集样本的个数。

In [None]:
# 此函数用于加载鸢尾花数据集
def load_data(shuffle=True):
    x = torch.tensor(load_iris().data)
    y = torch.tensor(load_iris().target)
    
    # 数据归一化
    x_min = torch.min(x, dim=0).values
    x_max = torch.max(x, dim=0).values
    x = (x-x_min)/(x_max-x_min)
    
    if shuffle:
        idx = torch.randperm(x.shape[0])
        x = x[idx]
        y = y[idx]
    return x, y

In [None]:
# 构建自己的数据集,继承自Dataset类
from torch.utils.data import Dataset, DataLoader

class IrisDataset(Dataset):
    def __init__(self, mode='train', num_train=120, num_dev=15):
        super(IrisDataset,self).__init__()
        x, y = load_data(shuffle=True)
        if mode == 'train':
            self.x, self.y = x[:num_train], y[:num_train]
        elif mode == 'dev':
            self.x, self.y = x[num_train:num_train + num_dev], y[num_train:num_train + num_dev]
        else:
            self.x, self.y = x[num_train + num_dev:], y[num_train + num_dev:]
    
    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]
    
    def __len__(self):
        return len(self.x)

实例化数据集后,使用DataLoader进行封装.为简便可使数据集样本总数为批量大小的整数倍,但实际并不强制要求;shuffle参数的含义是是否随机从数据集中取数据

In [None]:
batch_size = 16

# 分别构建训练集、验证集和测试集
train_dataset = IrisDataset(mode='train')
dev_dataset = IrisDataset(mode='dev')
test_dataset = IrisDataset(mode='test')

train_loader = DataLoader(train_dataset, batch_size=batch_size,shuffle=True)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True)

完全依靠深度学习框架，构建一个简单的两层前馈神经网络，输入层神经元个数为4，输出层神经元个数为3，隐含层神经元个数为6。
这个前馈神经网络和我们在前面实现的MLP类最大的区别在于，我们实现类中使用了自己写的激活函数，该激活函数不能通过反向传播更新参数，但深度学习框架已经帮我们完成了这个功能。（其实通过简单的改动，我们的激活函数也可以反传梯度，

In [None]:
class FeedForward(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(FeedForward,self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.act = nn.Sigmoid()
    
    def forward(self, inputs):
        outputs = self.fc1(inputs)
        outputs = self.act(outputs)
        outputs = self.fc2(outputs)
        return outputs

In [None]:
现在，让我们实现第一个辅助功能——计算预测的准确率。Accuracy支持对每一个回合中每批数据进行评价，并将结果累积，最终获得整批数据的评价结果

In [None]:
# 支持分批进行模型评价的 Accuracy 类
class Accuracy:
    def __init__(self, is_logist=True):
        # 正确样本个数
        self.num_correct = 0
        # 样本总数
        self.num_count = 0
        self.is_logist = is_logist
    
    def update(self, outputs, labels):
        # 判断是否为二分类任务
        if outputs.shape[1] == 1:
            outputs = outputs.squeeze(-1)
            # 判断是否是logit形式的预测值
            if self.is_logist:
                preds = (outputs >= 0).long()
            else:
                preds = (preds >= 0.5).long()
        else:
            # 多分类任务时，计算最大元素索引作为类别
            preds = torch.argmax(outputs, dim=1).long()
            
        # 获取本批数据中预测正确的样本个数
        labels = labels.squeeze(-1)
        batch_correct = (preds==labels).float().sum()
        batch_count = len(labels)
        # 更新
        self.num_correct += batch_correct
        self.num_count += batch_count
        
    def accumulate(self):
        # 使用累计的数据，计算总的评价指标
        if self.num_count == 0:
            return 0
        return self.num_correct/self.num_count
    
    def reset(self):
        self.num_correct = 0
        self.num_count = 0

In [None]:
# 利用之前构造的数据进行测试
acc = Accuracy()
acc.update(y_hat, y)
acc.num_correct

In [None]:
现在，让我们实现第二个辅助功能——一个整合了训练过程的类。

In [None]:
class Runner(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        # 用于计算评价指标
        self.metric = metric
        
        # 记录训练过程中的评价指标变化
        self.dev_scores = []
        # 记录训练过程中的损失变化
        self.train_epoch_losses = []
        self.dev_losses = []
        # 记录全局最优评价指标
        self.best_score = 0
   
 
# 模型训练阶段
    def train(self, train_loader, dev_loader=None, **kwargs):
        # 将模型设置为训练模式，此时模型的参数会被更新
        self.model.train()
        
        num_epochs = kwargs.get('num_epochs', 0)
        log_steps = kwargs.get('log_steps', 100)
        save_path = kwargs.get('save_path','best_mode.pth')
        eval_steps = kwargs.get('eval_steps', 0)
        # 运行的step数，不等于epoch数
        global_step = 0
        
        if eval_steps:
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None')
                
        # 遍历训练的轮数
        for epoch in range(num_epochs):
            total_loss = 0
            # 遍历数据集
            for step, data in enumerate(train_loader):
                x, y = data
                logits = self.model(x.float())
                loss = self.loss_fn(logits, y.long())
                total_loss += loss
                if log_steps and global_step%log_steps == 0:
                    print(f'loss:{loss.item():.5f}')
                    
                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad()
            # 每隔一定轮次进行一次验证，由eval_steps参数控制，可以采用不同的验证判断条件
            if (epoch+1)% eval_steps ==  0:

                dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                print(f'[Evalute] dev score:{dev_score:.5f}, dev loss:{dev_loss:.5f}')
                
                if dev_score > self.best_score:
                    self.save_model(f'model_{epoch+1}.pth')
                    
                    print(f'[Evaluate]best accuracy performance has been updated: {self.best_score:.5f}-->{dev_score:.5f}')
                    self.best_score = dev_score
                    
                # 验证过程结束后，请记住将模型调回训练模式   
                self.model.train()
            
            global_step += 1
            # 保存当前轮次训练损失的累计值
            train_loss = (total_loss/len(train_loader)).item()
            self.train_epoch_losses.append((global_step,train_loss))
            
        print('[Train] Train done')
        
    # 模型评价阶段
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None
        # 将模型设置为验证模式，此模式下，模型的参数不会更新
        self.model.eval()
        global_step = kwargs.get('global_step',-1)
        total_loss = 0
        self.metric.reset()
        
        for batch_id, data in enumerate(dev_loader):
            x, y = data
            logits = self.model(x.float())
            loss = self.loss_fn(logits, y.long()).item()
            total_loss += loss 
            self.metric.update(logits, y)
            
        dev_loss = (total_loss/len(dev_loader))
        self.dev_losses.append((global_step, dev_loss))
        dev_score = self.metric.accumulate()
        self.dev_scores.append(dev_score)
        return dev_score, dev_loss
    
    # 模型预测阶段，
    def predict(self, x, **kwargs):
        self.model.eval()
        logits = self.model(x)
        return logits
    
    # 保存模型的参数
    def save_model(self, save_path):
        torch.save(self.model.state_dict(),save_path)
        
    # 读取模型的参数
    def load_model(self, model_path):
        self.model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))

## 模型训练

In [None]:
input_size = 4
output_size = 3
hidden_size = 6
# 定义模型
model = FeedForward(input_size, hidden_size, output_size)
# 定义损失函数
loss_fn = F.cross_entropy
# 定义优化器
optimizer = torch.optim.SGD(model.parameters(), lr=0.2)
# 定义评价方法
metric = Accuracy(is_logist=True)
# 实例化辅助runner类
runner = Runner(model, optimizer, loss_fn, metric)
# 模型训练
runner.train(train_loader, dev_loader, num_epochs=50, log_steps=10, eval_steps=5)

在训练过程中，Runner类记录了每次迭代后的损失函数值和准确率(在不同设置下，记录的频率会有所不同)，可视化函数利用这些记录，绘制图像。

In [None]:
# 绘制训练集和验证集的损失变化以及验证集上的准确率变化曲线
def plot_training_loss_acc(runner, fig_name, fig_size=(16, 6), sample_step=20, loss_legend_loc='upper right', acc_legend_loc='lower right',
                          train_color = '#8E004D', dev_color = '#E20079', fontsize='x-large', train_linestyle='-', dev_linestyle='--'):
    plt.figure(figsize=fig_size)
    plt.subplot(1,2,1)
    train_items = runner.train_epoch_losses[::sample_step]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]
    
    plt.plot(train_steps, train_losses, color=train_color, linestyle=train_linestyle, label='Train loss')
    if len(runner.dev_losses) > 0:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color=dev_color, linestyle=dev_linestyle,label='dev loss')
    
    plt.ylabel('loss')
    plt.xlabel('step')
    plt.legend(loc=loss_legend_loc)
    if len(runner.dev_scores) > 0:
        plt.subplot(1,2,2)
        plt.plot(dev_steps, runner.dev_scores, color=dev_color, linestyle=dev_linestyle, label='dev accuracy')
        
        plt.ylabel('score')
        plt.xlabel('step')
        plt.legend(loc=acc_legend_loc)
    # 将绘制结果保存
    plt.savefig(fig_name)
    plt.show()


In [None]:
# 调用绘图函数，将runner中的信息绘制并保存
plot_training_loss_acc(runner, 'chapter4.pdf')

## 预测阶段

In [None]:
# 训练结束后，网络的参数会自动保存为.pth结尾的文件，且与训练文件在同一目录下
model_path = 'model_25.pth'
# 首先读入经过训练后的网络的参数
runner.load_model(model_path)
x, label= next(iter(test_loader))

   请分析一下预测结果，输出的三个数字分别代表三个类的得分，模型认为最后一类得分最高，即输入特征对应为最后一类，与标签值吻合。
    在构建测试集的DataLoader类时，我们设置shuffle为True，所以可以多次尝试预测不同的结果并与标签值对比。

In [None]:
print(runner.predict(x.float()))
print(label)