In [1]:
import struct
import os
import random
import math
import numpy as np
import matplotlib.pyplot as plt
import pickle

np.random.seed(3)

In [2]:
def decode_labels(file):
    '''
    解码标签文件
    '''
    
    # rb  ->  read binary
    with open(file, "rb") as f:
        binary_data = f.read()
        
    # 大端方式解析出2个int32，返回的是tuple(int1, int2)
    # int1 -> magic number，用来验证数据是否是目标数据
    _, num_items = struct.unpack_from(">II", binary_data, 0)
    labels = struct.unpack_from("B" * num_items, binary_data, 8)
    return np.array(labels).reshape(-1, 1).astype(np.int32)

def decode_images(file):
    '''
    解码图像数据
    '''
    
    # rb  ->  read binary
    with open(file, "rb") as f:
        binary_data = f.read()
        
    # 大端方式解析出4个int32，返回的是tuple(magic number, num images, rows, cols)
    _, num_images, rows, cols = struct.unpack_from(">IIII", binary_data, 0)
    images = struct.unpack_from("B" * num_images * rows * cols, binary_data, 16)
    return np.array(images).reshape(-1, rows * cols)


def one_hot(t, num_classes):   # 将标量转换为矩阵，方便计算
    
    rows = t.shape[0]
    output = np.zeros((rows, num_classes))
    for row in range(rows):
        label = t[row, 0]
        output[row, label] = 1
    return output

In [61]:
# 数据的封装
class Dataset:
    # 动态的，那么Dataset是个基类，所有动态的继承自Dataset
    # 需要实现什么接口？
    def __getitem__(self, index):
        raise NotImplementedError()
        
    def __len__(self):
        raise NotImplementedError()
      
    
class MNISTDataset(Dataset):
    # 针对mnist数据的解析、加载、归一化
    def __init__(self, image_file, label_file):
        
        self.num_classes = 10
        self.images = decode_images(image_file)
        self.labels = decode_labels(label_file)
        self.images = (self.images / 255.0 - 0.5).astype(np.float32)
        self.labels_one_hot = one_hot(self.labels, self.num_classes)
        
    def __getitem__(self, index):
        # 角色的职责
        # 实现图像加载、归一化/标准化、onehot
        # 为什么要返回one_hot，计算时，使用one_hot比较方便
        # 为什么要返回label，因为做测试的时候，label比较方便
        # pytorch里面，CELoss使用的不是one_hot。所以以后不需要返回one_hot
        return self.images[index], self.labels[index], self.labels_one_hot[index]
    
    def __len__(self):
        return len(self.images)
    

class DataLoaderIterator:
    # 数据加载器的迭代器
    # 职责：
    # 负责一轮数据的打乱、封装操作
    def __init__(self, dataloader):
        self.dataloader = dataloader
        
        # 这里有2中处理方法
        # 1.向上取整
        # 2.整除，向下取整，多余部分丢弃
        # 这里考虑用2
        self.num_batch_per_epoch = len(dataloader)
        
        # 定义指针记录当前batch的索引
        self.batch_cursor = 0
        
        # 实现一轮数据的打乱和封装获取
        # 与其打乱数据，不如打乱索引
        self.indexs = list(range(len(dataloader.dataset)))
        
        # 如果需要随机打乱，条件控制由dataloader的shuffle决定
        if dataloader.shuffle:
            np.random.shuffle(self.indexs)
        
        
    def __next__(self):
        # 如果到了一轮的边界，即迭代结束，抛出异常
        if self.batch_cursor >= self.num_batch_per_epoch:
            # 如果到了边界，抛出StopIteration
            raise StopIteration()
            
        # 职责：对数据进行封装
        #    b1  image.shape = 784,     label.shape = 1,     label_onehot.shape = 10,
        #    b2  image.shape = 784,     label.shape = 1,     label_onehot.shape = 10,
        #    b3  image.shape = 784,     label.shape = 1,     label_onehot.shape = 10,
        #    
        # images.shape = 3x784 = np.vstack()    labels.shape = 3x1,  one_hot.shape = 3x10
        # 循环batch_size次。这里不用考虑边界问题。因为使用的是策略2，向下取整
        # batch_data = [
        #   [image1, image2, image3, ..., imagen],
        #   [label1, label2, label3, ..., labeln],
        #   [one_hot1, one_hot2, one_hot3, ..., one_hotn]
        # ]
        
        batch_data = []
        for i in range(self.dataloader.batch_size):
            
            # 拿到图像的索引，这个索引可能是打乱过的
            # 10100个图，batch_size = 1000
            # 一轮需要多少个batch，10个batch
            # batch_cursor -> 0, 1, 2 ... 9
            # batch_cursor = 5
            # for i in range(batch_size):
            #     基于10100的索引 = batch_cursor * batch_size + i
            index = self.indexs[self.batch_cursor * self.dataloader.batch_size + i]
            
            # 从dataset中拿到数据，这个数据可能包含图像也可能包含标签
            # 这里只关心有几个数据，并不关心数据是什么样
            # 比如这里返回的是(image, label, label_onehot)
            data_item = self.dataloader.dataset[index]
            
            if len(batch_data) == 0:
                # 对batch_data做初始化
                batch_data = [[] for _ in data_item]
            
            # 把data_item中的每一项，分门别类的放到batch_data中
            for index, item in enumerate(data_item):
                batch_data[index].append(item)
                
        self.batch_cursor += 1
        
        # 当整个batch的数据准备好过后，可以用np.vstack拼接在一起
        for index in range(len(batch_data)):
            batch_data[index] = np.vstack(batch_data[index])
        return tuple(batch_data)
    

class DataLoader:
    # 职责
    # 实例化的时候需要指定dataset，batch_size，shuffle
    # 数据的封装，打包为一个batch
    # 对数据进行打乱
    # 可以通过迭代器来获取一批一批的数据
    #    好处是：使用一批创建一批
    def __init__(self, dataset, batch_size, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
    
    def __iter__(self):
        # 实例化一个迭代器对象，将自身作为参数传入进去
        return DataLoaderIterator(self)
    
    def __len__(self):
        #用以告诉外界，多少次迭代，就算是完成一轮
        # 这里有2种处理方法
        # 1.向上取整
        # 2.整除，向下取整，多余部分丢弃
        # 这里考虑用2
        return len(self.dataset) // self.batch_size

# 计算的封装
# 对参数的封装
class Parameter:
    # 实例化的时候，传入参数值
    # 封装data、和grad，储存参数值和梯度
    def __init__(self, data):
        self.data = data
        self.grad = np.zeros_like(data)
        
    # 清空参数中储存的梯度
    def zero_grad(self):
        self.grad[...] = 0
    
# 对layer的封装
class Module:
    # 1.可以称之为算子，那么他应该有forward、backward。为了简化代码，可以用__call__实现forward
    # 2.需要实现以个params函数，拿出当前module下的所有【递归，如果有子类里面还有子类包含了参数，也要拿出来】参数实例
    # 3.考虑有些算子，需要感知当前的环境属于训练还是推理，用training储存是否为训练状态提供给特定算子用。通过
    # train方法和eval方法修改training的值
    
    def __init__(self):
        self.training = True
    
    def forward(self, *args):
        # forward输入参数可以是多个
        raise NotImplementedError()
        
    def backward(self, grad):
        # 假设算子输出只有一个，所以对应的梯度也应该只有一个
        raise NotImplementedError()
    
    def __call__(self, *args):
        # 用来简化xxx.forward()的调用，改成xxx()
        # 后期可以在__call__中，forward的前后加入一些代码，实现钩子编程范式
        return self.forward(*args)
    
    def modules(self):
        # 获取自身当前所有是Module的属性，并返回，这里没有递归。仅仅是自身
        ms = []
        for attr in self.__dict__:
            m = self.__dict__[attr]
            if isinstance(m, Module):
                ms.append(m)
        return ms
        
    def params(self):
        # 获取当前模块下的所有参数，注意是递归获取
        # 获取自身下的所有Parameter的属性
        ps = []
        for attr in self.__dict__:
            p = self.__dict__[attr]
            if isinstance(p, Parameter):
                ps.append(p)
                
        # 获取自身下的所有Module，然后调用Module的params获取到参数并合并到ps中
        # 有一个方法，获取所有当前类的module属性。这里不存在递归
        ms = self.modules()
        for m in ms:
            # 这里是递归在调用
            ps.extend(m.params())
        return ps
    
    def train(self):
        # 进入training模式，给到后续需要的算子必要信息
        self.training = True
        for m in self.modules():
            # 这里再递归调用
            m.train()
            
        # 返回self，是为了使用者可以链式调用
        # a.train()
        #    .eval()
        #    .to("cuda")
        return self
    
    def eval(self):
        # 进入评估模式，给到后续需要的算子必要信息
        self.training = False
        for m in self.modules():
            # 这里再递归调用
            m.eval()
        return self
    
# A-
#    - a Parameter
#    - b Parameter
#    - c Module
#       - c1 Parameter
#       - c2 Module
#       - 
#    - d Module
    
    
class Linear(Module):
    # 线性算子，线性层
    # 职责:
    #  包含了参数（parameter），包含了运算过程（forward、backward），对于输入的梯度计算，
    #  和对于参数（parameter）的梯度计算
    def __init__(self, num_input, num_output):
        super().__init__()
        
        # 凯明初始化
        # 1 / np.sqrt(num_input)
        self.weight = Parameter(np.random.normal(0, 1, size=(num_input, num_output)))
        self.bias   = Parameter(np.zeros((1, num_output)))
        
    def forward(self, x):
        # 保存x给到backward时使用
        self.x = x
        
        # 这里bias会发生广播，复制batch_size份bias进行加法操作
        return x @ self.weight.data + self.bias.data
    
    def backward(self, grad):
        # grad.shape = forward(x).shape = batch_size x num_output
        # bias.shape = 1 x num_output
        # AB = C, deltaC = G
        # deltaA = G @ B.T
        # deltaB = A.T @ G
        self.weight.grad += self.x.T @ grad
        self.bias.grad   += np.sum(grad, axis=0, keepdims=True)
        return grad @ self.weight.data.T

    
class Sigmoid(Module):
    # 输入x，输出y。forward做sigmoid，backward求导
    def forward(self, x):
        self.y = 1 / (1 + np.exp(-x))
        return self.y
    
    def backward(self, grad):
        return grad * self.y * (1 - self.y)
    

class Sequencial(Module):
    def __init__(self, *items):
        # 实现一堆模块的聚集，同时又能够被递归遍历到
        super().__init__()
        self.items = items
        
    def modules(self):
        # 覆盖基类的modules方法，直接返回items即可
        # 如果基类直接遍历__dict__属性，并判断类型是否为Module，此时items是tuple，不满足条件。所以得不到items
        # 所以要覆盖基类方法
        return self.items
    
    def forward(self, x):
        # 按照顺序执行items即可
        for m in self.items:
            x = m(x)
        return x
    
    def backward(self, grad):
        # 按照反向顺序，执行items中模块的backward
        for item in self.items[::-1]:
            grad = item.backward(grad)
        return grad
    

class SoftmaxCrossEntropyLoss(Module):
    def __init__(self):
        super().__init__()
        
    def forward(self, x, target):
        ex = np.exp(x)
        
        # 坐标系为1，是在列方向求和（输出的特征维度上求和）
        sumex = ex.sum(axis=1, keepdims=True)
        
        # sumex.shape = batch_size x 1
        # ex.shape   = batch_size x num_classes
        self.probability = ex / sumex
        self.batch_size = x.shape[0]
        self.target = target
        return -np.sum(self.target * np.log(self.probability)) / self.batch_size
        
    def backward(self, grad=1):
        return grad * (self.probability - self.target) / self.batch_size
    

# 对模型的封装
class Network(Module):
    def __init__(self, num_feature, num_hidden, num_classes):
        super().__init__()
        self.layers = Sequencial(
            Linear(num_feature, num_hidden),
            Sigmoid(),
            Linear(num_hidden, num_classes)
        )
        self.loss = SoftmaxCrossEntropyLoss()
        
    def inference(self, x):
        return self.layers(x)
        
    def forward(self, x, target):
        return self.loss(self.inference(x), target)
    
    def backward(self, grad=1):
        grad = self.loss.backward(grad)
        return self.layers.backward(grad)

    
# 对优化器的封装
# class Optimizer:
#     def __init__(self, param_groups):
#         # 给我所有的params，我给你做更新和应用
#         self.param_groups = param_groups
        
#     def zero_grad(self):
#         # 清空所有参数中的梯度
#         # 如果需要累积梯度，可以自行控制
#         for pg in self.param_groups:
#             # pg is dict   {"params": network1.params(), "lr": 1e-3},
#             for param in pg["params"]:
#                 param.zero_grad()
            
# Optimizer([
#     {"params": network1.params(), "lr": 1e-3},
#     {"params": network2.params(), "lr": 1e-2},
# ])
            
    
# 对优化器的封装
class Optimizer:
    def __init__(self, params, lr):
        # 给我所有的params，我给你做更新和应用
        self.params = params
        self.lr = lr
        
    def step(self):
        raise NotImplementedError()
        
    def zero_grad(self):
        # 清空所有参数中的梯度
        # 如果需要累积梯度，可以自行控制
        for param in self.params:
            param.zero_grad()
            
    def set_lr(self, lr):
        self.lr = lr
        
        
class SGD(Optimizer):
    def __init__(self, params, lr=1e-1):
        super().__init__(params, lr)
        
    def step(self):
        for param in self.params:
            param.data -= self.lr * param.grad

In [46]:
batch_size = 256
num_hidden = 100
num_feature = 784
num_epoch = 10

train_dataset = MNISTDataset("../05.06.bp_optimizer/dataset/train-images-idx3-ubyte", "../05.06.bp_optimizer/dataset/train-labels-idx1-ubyte")
train_loader  = DataLoader(train_dataset, batch_size, True)
test_dataset = MNISTDataset("../05.06.bp_optimizer/dataset/t10k-images-idx3-ubyte", "../05.06.bp_optimizer/dataset/t10k-labels-idx1-ubyte")
test_loader  = DataLoader(test_dataset, 512, True)

In [62]:
network = Network(num_feature, num_hidden, train_dataset.num_classes)
optim   = SGD(network.params(), 0.1)

for epoch in range(num_epoch):
    for images, labels, one_hots in train_loader:
        loss = network(images, one_hots)
        
        optim.zero_grad()
        network.backward()
        optim.step()
    print(f"{loss:.3f}")

1.633
1.077
1.035
0.819
0.701
0.534
0.634
0.737
0.563
0.433
