In [2]:
from matplotlib import pyplot as plt
import torch
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from sklearn.datasets import load_iris
import warnings
from typing import Union
import time
import seaborn
from torch import nn
from typing import Optional
from torchvision import transforms

### 导入数据集

In [13]:
from torchvision import datasets

# MNIST training split, read from ./DataSets/ (download=False, so the files must already be there).
# ToTensor() converts each image to a tensor before it is batched.
train_data = datasets.MNIST(root="./DataSets/",train=True, transform=transforms.ToTensor(),download=False)
# Training batches: 64 samples each, reshuffled every epoch.
train_loader = DataLoader(train_data,batch_size=64, shuffle=True)
# MNIST test split, loaded the same way as the training split.
test_data = datasets.MNIST(root="./DataSets/",train=False, transform=transforms.ToTensor(),download=False)
# No batch_size is given, so the DataLoader default (one sample per batch) applies;
# the accuracy cell further down compares one prediction with one label per iteration.
test_loader = DataLoader(test_data)


### 创建一个简单的CNN网络

#### 使用tensor手动实现

In [62]:
class MyConv2d(object):
    """Hand-written 2-D convolution (cross-correlation) layer built from plain tensor ops.

    The kernel bank ``weight`` has shape (out_channels, in_channels, k_h, k_w) and is
    drawn with ``torch.rand``; ``bias`` is a single scalar shared by every output channel.
    ``padding`` and ``stride`` may each be an int (same value for height and width) or an
    (h, w) pair; ``padding`` may also be ``None`` to disable padding.
    """

    def __init__(self,
        in_channels:int, 
        out_channels:int,
        kernel_size:torch.Size=(3,3),
        padding:Union[int,Optional[torch.Size],None]=1,
        stride:Union[int,Optional[torch.Size]]=1) -> None:
        
        self.in_channels = in_channels # number of input channels
        self.out_channels = out_channels # number of output channels
        self.padding = padding # zero-padding size
        self.stride = stride # step between neighbouring windows
        self.kernel_size = kernel_size # (height, width) of each kernel
        # In a convolution layer the weights are the kernels themselves.
        self.weight = torch.rand((self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1]))
        self.bias = torch.zeros(1)

    @staticmethod
    def _pair(value):
        """Expand an int to an (h, w) pair; pass an existing pair through."""
        if isinstance(value, int):
            return value, value
        return value[0], value[1]

    def zero_padding(self,X:torch.Tensor)->torch.Tensor:
        """Surround every feature map with zeros according to ``self.padding``.

        Args:
            X (torch.Tensor): (batch_size, channel, height, width)

        Returns:
            torch.Tensor: (batch_size, channel, height + 2*pad_h, width + 2*pad_w).
            When no padding is configured, ``X`` itself is returned.
        """
        if self.padding is None:
            return X
        pad_h, pad_w = self._pair(self.padding)
        if pad_h == 0 and pad_w == 0:
            return X

        n, c, h, w = X.shape
        # Allocate with X's dtype/device so padded data stays where the input lives.
        padded = X.new_zeros((n, c, h + 2 * pad_h, w + 2 * pad_w))
        # Explicit end offsets (instead of negative slices) also handle a zero pad on one axis.
        padded[:, :, pad_h:pad_h + h, pad_w:pad_w + w] = X
        return padded

    def corr2d(self, X:torch.Tensor, kernel:torch.Tensor)->torch.Tensor:
        """Cross-correlate one 2-D channel with one 2-D kernel.

        Args:
            X (torch.Tensor): a single channel, shape (height, width)
            kernel (torch.Tensor): 2-D kernel, shape (k_h, k_w)

        Returns:
            torch.Tensor: result of shape
            ((height - k_h) // stride_h + 1, (width - k_w) // stride_w + 1)
        """
        k_h, k_w = kernel.shape
        s_h, s_w = self._pair(self.stride)
        out_h = max((X.shape[0] - k_h) // s_h + 1, 0)
        out_w = max((X.shape[1] - k_w) // s_w + 1, 0)
        Y = torch.zeros((out_h, out_w), dtype=torch.result_type(X, kernel), device=X.device)
        for i in range(out_h):
            # Output row i reads the window that starts at input row i * stride.
            top = i * s_h
            for j in range(out_w):
                left = j * s_w
                Y[i, j] = (X[top:top + k_h, left:left + k_w] * kernel).sum()
        return Y

    def corr2d_multi_in(self, X:torch.Tensor, kernels:torch.Tensor)->torch.Tensor:
        """Cross-correlate a multi-channel input with one multi-channel kernel.

        Args:
            X (torch.Tensor): input with shape (channel, height, width)
            kernels (torch.Tensor): one 2-D kernel per input channel, (channel, k_h, k_w)

        Returns:
            torch.Tensor: sum over channels of the per-channel results
        """
        return sum(self.corr2d(x, k) for x,k in zip(X, kernels))

    def corr2d_multi_in_out(self,X:torch.Tensor, Kernels: torch.Tensor)->torch.Tensor:
        """Apply every output kernel in ``Kernels`` to ``X`` and stack the results.

        Args:
            X (torch.Tensor): input with shape (channel, height, width)
            Kernels (torch.Tensor): (out_channels, channel, k_h, k_w)

        Returns:
            torch.Tensor: (out_channels, res_h, res_w)
        """
        return torch.stack([self.corr2d_multi_in(X,kernels) for kernels in Kernels], dim=0)

    def forward(self, X:torch.Tensor)->torch.Tensor:
        """Forward pass.

        Args:
            X (torch.Tensor): with size (batch_size, channel, height, width)

        Returns:
            torch.Tensor: with size (batch_size, out_channels, res_h, res_w)
        """
        # Pad first, then convolve each sample independently.
        X = self.zero_padding(X)

        res = [self.corr2d_multi_in_out(x, self.weight) + self.bias for x in X]
        # stack (not cat) so the list of per-sample results gains the batch dimension.
        return torch.stack(res, dim=0)

class MyMaxPooling2d(object):
    """Hand-written 2-D max pooling without padding.

    ``kernel_size`` is an (h, w) pair; ``stride`` is an int (same step on both axes)
    or an (h, w) pair.
    """

    def __init__(self, kernel_size:Optional[list]=(2,2), stride:Union[int,Optional[list]]=2) -> None:
        # Padding could be supported like in a convolution layer, but is not implemented here.
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, X:torch.Tensor)->torch.Tensor:
        """Forward pass.

        Args:
            X (torch.Tensor): with size (batch_size, channel, height, width)

        Returns:
            torch.Tensor: with size (batch_size, channel,
            (height - k_h) // stride_h + 1, (width - k_w) // stride_w + 1)
        """
        k_h, k_w = self.kernel_size
        if isinstance(self.stride, int):
            s_h = s_w = self.stride
        else:
            s_h, s_w = self.stride[0], self.stride[1]

        out_h = max((X.shape[2] - k_h) // s_h + 1, 0)
        out_w = max((X.shape[3] - k_w) // s_w + 1, 0)
        # Same dtype/device as the input so the layer also works off-CPU.
        Y = X.new_zeros((X.shape[0], X.shape[1], out_h, out_w))
        for k in range(out_h):
            # Output row k pools the window that starts at input row k * stride.
            top = k * s_h
            for m in range(out_w):
                left = m * s_w
                # One reduction handles every sample and channel for this window.
                Y[:, :, k, m] = X[:, :, top:top + k_h, left:left + k_w].amax(dim=(2, 3))
        return Y

class MyCNN(object):
    """Small CNN assembled from the hand-written layers plus ``nn.ReLU`` / ``nn.Linear``.

    Layout: conv(4) -> ReLU -> 2x2 max-pool -> conv(6) -> ReLU -> flatten -> linear.
    The linear layer is sized ``6*14*14``, i.e. it expects 28x28 inputs: both 3x3
    convolutions use MyConv2d's default padding of 1 and stride of 1, and the pooling
    layer halves each spatial side once.
    """

    def __init__(self, in_channels:int, n_classes:int) -> None:
        self.l0 = MyConv2d(in_channels=in_channels, out_channels=4, kernel_size=(3,3))
        self.act = nn.ReLU()
        self.l1 = MyMaxPooling2d(kernel_size=(2,2),stride=(2,2))
        self.l2 = MyConv2d(in_channels=4,out_channels=6,kernel_size=(3,3))
        self.l3 = nn.Linear(6*14*14,out_features=n_classes)
        # The hand-written conv layers hold plain tensors; turn on gradient tracking
        # so autograd can train them together with the linear layer.
        for param in self.parameters():
            param.requires_grad_(True)

    def parameters(self) -> list:
        """Return every trainable tensor of the network as a list."""
        return [self.l0.weight, self.l0.bias,
                self.l2.weight, self.l2.bias,
                *self.l3.parameters()]

    def forward(self,X:torch.Tensor)->torch.Tensor:
        """Compute class logits.

        Args:
            X (torch.Tensor): with size (batch_size, in_channels, height, width)

        Returns:
            torch.Tensor: logits with size (batch_size, n_classes)
        """
        x = self.l0.forward(X)
        x = self.act(x)
        x = self.l1.forward(x)
        x = self.l2.forward(x)
        x = self.act(x)
        # Flatten everything except the batch dimension.
        x = x.reshape(x.shape[0], -1)
        
        x = self.l3(x)
        return x
    
    def loss(self, y:torch.Tensor, y_hat:torch.Tensor)->torch.Tensor:
        """Cross-entropy between predicted logits and true labels.

        Args:
            y (torch.Tensor): ground-truth class indices, size (batch_size,)
            y_hat (torch.Tensor): predicted logits, size (batch_size, n_classes)

        Returns:
            torch.Tensor: scalar loss
        """
        # cross_entropy takes (input=logits, target=labels), in that order.
        return nn.functional.cross_entropy(y_hat, y)

    def configure_optimizer(self, lr:float)->Optional[torch.optim.SGD]:
        """Build an SGD optimizer over all trainable tensors of the network."""
        return torch.optim.SGD(self.parameters(), lr=lr)

    def fit(self, loader:DataLoader, epoches=5, lr=0.001):
        """Train with mini-batch SGD.

        Args:
            loader (DataLoader): yields (X, y) batches
            epoches (int): number of passes over ``loader``
            lr (float): learning rate
        """
        optimizer = self.configure_optimizer(lr)
        for epoch in range(epoches):
            for X, y in loader:
                optimizer.zero_grad()
                l = self.loss(y, self.forward(X))
                l.backward()
                optimizer.step()


In [40]:
# Random stand-in batch: 64 samples, 1 channel, 28x28 pixels.
X = torch.rand((64,1,28,28))
# Smoke-test the hand-written convolution with tuple-valued padding and stride.
test = MyConv2d(in_channels=1, out_channels=10 , kernel_size=(3,3), padding=(1,1), stride=(1,1))
temp = test.forward(X)

In [63]:
# The forward pass works; back-propagation is expected to work through autograd.
# It is slow because everything is written with Python for-loops; C++ might be a better fit.
test1 = MyCNN(in_channels=1,n_classes=10)
# Reuses the random batch X defined in the previous cell.
test1.forward(X).shape

torch.Size([64, 10])

#### 使用Pytorch高级API实现

In [10]:
class CNN(nn.Module):
    """Two-convolution classifier built from PyTorch's high-level layers.

    Layout: conv(8) -> ReLU -> 2x2 max-pool -> conv(16) -> flatten -> linear.
    The linear layer is sized ``16*14*14``, so with these padded 3x3 convolutions
    and a single 2x2 pooling step it expects 28x28 inputs.
    """

    def __init__(self, in_channels:int, n_classes:int) -> None:
        super().__init__()
        layers = [
            nn.Conv2d(in_channels=in_channels, out_channels=8,
                      kernel_size=(3,3), stride=(1,1), padding=(1,1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2), stride=(2,2)),
            nn.Conv2d(in_channels=8, out_channels=16,
                      kernel_size=(3,3), stride=(1,1), padding=(1,1)),
            nn.Flatten(),
            nn.Linear(16*14*14, out_features=n_classes),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self,X:torch.Tensor)->torch.Tensor:
        """Return the class logits produced by the sequential stack for ``X``."""
        logits = self.net(X)
        return logits

    def configure_optimizer(self,lr):
        """Create an Adam optimizer over the network's parameters with learning rate ``lr``."""
        return torch.optim.Adam(params=self.net.parameters(), lr=lr)

    def loss(self, y, y_hat):
        """Cross-entropy of logits ``y_hat`` against target labels ``y``."""
        return nn.functional.cross_entropy(input=y_hat, target=y)

    def fit(self,device, loader:DataLoader, lr=0.001, epochs=10,):
        """Train for ``epochs`` passes over ``loader``, moving each batch to ``device``."""
        optimizer = self.configure_optimizer(lr)
        for _ in range(epochs):
            for batch_X, batch_y in loader:
                # Move the batch to the device the computation runs on.
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                optimizer.zero_grad()
                batch_loss = self.loss(batch_y, self.net(batch_X))
                batch_loss.backward()
                optimizer.step()


In [11]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# CNN for 1-channel inputs and 10 classes, with its parameters moved to `device`.
model = CNN(1, 10).to(device=device)

# Train on train_loader with fit's defaults (lr=0.001, epochs=10).
model.fit(device, train_loader)



In [12]:
# torch.max with dim=1 returns a (values, indices) pair with one entry per row;
# the accuracy cell below uses the indices as predicted class labels.
torch.max(torch.rand((3,9)), dim=1)

torch.return_types.max(
values=tensor([0.9801, 0.9700, 0.9438]),
indices=tensor([3, 7, 2]))

In [17]:
# Test-set accuracy.
count = 0  # number of correctly classified samples
total = 0  # number of samples seen
# Evaluation needs no gradients; skipping the autograd graph saves time and memory.
with torch.no_grad():
    for X, y in test_loader:
        X = X.to(device)
        y = y.to(device)
        _, prediction = torch.max(model(X), dim=1)
        # Count hits per sample so the loop is correct for any batch size,
        # not only for a loader that yields one sample at a time.
        count += (prediction == y).sum().item()
        total += y.numel()
print("accuracy: ", count/total)

accuracy:  0.9831


#### 建立Batch Normalization Layer

In [13]:
def try_gpu():
    """Return the device string to compute on: 'cuda' when CUDA is available, else 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'
class MyBatchNorm(nn.Module):
    """Batch normalization whose feature count is inferred from the first input.

    Works after a convolution (4-D input, statistics per channel) or after a fully
    connected layer (2-D input, statistics per feature).

    ``gamma`` / ``beta`` left as ``None`` are created on the first forward pass as
    learnable ``nn.Parameter`` tensors of shape (1, C, 1, 1) or (1, F), initialised to
    ones and zeros. Because they only exist after that first call, run one forward pass
    before handing ``parameters()`` to an optimizer. Values passed explicitly are used
    as given.

    ``moving_avg`` / ``moving_var`` are registered as buffers, so they follow
    ``.to(device)`` and are included in ``state_dict()``.
    """

    def __init__(self, 
                gamma:Union[float, torch.Tensor]=None, 
                beta:Union[float,torch.Tensor]=None,
                moving_avg:torch.Tensor=None,
                moving_var:torch.Tensor=None,
                eps:float=1e-5) -> None:
        super(MyBatchNorm, self).__init__()
        self.gamma = gamma
        self.beta = beta
        self.eps = eps
        # Buffers may start as None; they are filled in on the first forward pass.
        self.register_buffer('moving_avg', moving_avg)
        self.register_buffer('moving_var', moving_var)

    def batch_norm(self, X:torch.Tensor, moving_avg:torch.Tensor, moving_var:torch.Tensor, monetum:float, eps:float, mode:str='conv'):
        """Normalize ``X`` and return the updated moving statistics.

        Args:
            X (torch.Tensor): (batch, channel, height, width) when ``mode == 'conv'``,
                otherwise (batch, n_features)
            moving_avg (torch.Tensor): running mean from previous steps
            moving_var (torch.Tensor): running variance from previous steps
            monetum (float): weight kept from the old moving statistics
            eps (float): added to the variance before the square root
            mode (str): 'conv' reduces over (batch, height, width); any other value
                reduces over the batch dimension only

        Returns:
            tuple: (Y, moving_avg, moving_var)
        """
        # Gradient tracking switched off is treated as prediction mode.
        if not torch.is_grad_enabled():
            # Prediction: normalize with the accumulated moving statistics.
            X_hat = (X - moving_avg)/torch.sqrt(moving_var + eps)
        else:
            # Training: normalize with the statistics of the current batch.
            reduce_dims = (0, 2, 3) if mode == 'conv' else (0,)
            avg = X.mean(dim=reduce_dims, keepdim=True)
            var = ((X - avg)**2).mean(dim=reduce_dims, keepdim=True)
            X_hat = (X - avg)/torch.sqrt(var + eps)
            # detach(): the running statistics are bookkeeping only and must not
            # keep every batch's autograd graph alive.
            moving_avg = moving_avg * monetum + (1-monetum) * avg.detach()
            moving_var = moving_var * monetum + (1-monetum) * var.detach()
        Y = self.gamma * X_hat + self.beta
        return Y, moving_avg, moving_var

    def forward(self,X:torch.Tensor)->torch.Tensor:
        """Forward pass.

        Args:
            X (torch.Tensor): data to normalize, either
                (batch, channel, height, width) or (batch, n_features)

        Returns:
            torch.Tensor: normalized, scaled and shifted data with the shape of ``X``
        """
        # The rank tells whether the layer follows a convolution or a linear layer.
        assert len(X.shape) in (2,4), "MyBatchNorm expects a 2-D or 4-D input"
        n_features = X.shape[1]
        if X.dim()==4:
            mode = 'conv'
            stat_shape = (1, n_features, 1, 1)
        else:
            mode = 'linear'
            stat_shape = (1, n_features)

        # Lazily create the moving statistics next to the first batch.
        if self.moving_avg is None:
            self.moving_avg = torch.zeros(stat_shape, dtype=X.dtype, device=X.device)
        if self.moving_var is None:
            self.moving_var = torch.ones(stat_shape, dtype=X.dtype, device=X.device)
        # If the module lives on another device than the data, bring the data to the module.
        if X.device != self.moving_avg.device:
            X = X.to(self.moving_avg.device)

        # One learnable scale/shift per feature (not per element of the batch),
        # so any batch size broadcasts correctly.
        if self.gamma is None:
            self.gamma = nn.Parameter(torch.ones(stat_shape, dtype=X.dtype, device=X.device))
        if self.beta is None:
            self.beta = nn.Parameter(torch.zeros(stat_shape, dtype=X.dtype, device=X.device))

        Y, self.moving_avg, self.moving_var = self.batch_norm(X, monetum=0.9, moving_avg=self.moving_avg, 
                                                            moving_var=self.moving_var,eps=self.eps, mode=mode)
        return Y


# Smoke test: push a random (64, 3, 28, 28) batch through the layer and inspect the output shape.
# NOTE(review): this rebinds `model`, which earlier in the notebook referred to the trained CNN.
model = MyBatchNorm().to(try_gpu())
model(torch.rand(64,3,28,28)).shape

torch.Size([64, 3, 28, 28])

##### 在LeNet网络中添加自己的Batch Normalization层

In [15]:
# LeNet-style network with a MyBatchNorm layer inserted before every Sigmoid.
# The first Linear layer takes 16*4*4 features, which fits a 28x28 input:
# 28 -> conv5 -> 24 -> pool -> 12 -> conv5 -> 8 -> pool -> 4 (with 16 channels).
net = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=5), MyBatchNorm(), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),
    nn.Conv2d(6, 16, kernel_size=5), MyBatchNorm(), nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2), nn.Flatten(),
    nn.Linear(16*4*4, 120), MyBatchNorm(), nn.Sigmoid(),
    nn.Linear(120, 84), MyBatchNorm(), nn.Sigmoid(),
    nn.Linear(84, 10))
# Shape check with a single random 1x28x28 sample.
net(torch.rand(1,1,28,28)).shape

torch.Size([1, 10])