# 此文档用于动手实现pytorch的一些基础功能
* 包括线性回归，sofatmax回归，MLP

## 1.线性回归的pytorch实现
* 不使用api的原始实现

In [66]:
import random
import torch as T
import matplotlib.pyplot as plt

# 数据生成器
def generate_data(w,b,num):
    X = T.normal(0,1,size=(num,len(w)))
    y = T.matmul(X,w) + b
    y += T.normal(0,0.01,size=(num,1))
    return X,y

# 数据输出器
def data_iter(batch_size,features,labels):
    num = len(features)
    index = [i for i in range(num)]
    random.shuffle(index) # 随机打乱数据
    for i in range(0,num,batch_size):
        batch_index = T.tensor(index[i:min(i + batch_size,num)])
        yield features[batch_index],labels[batch_index] #按batch_size输出

# 定义模型
def line(X,w,b):
    return T.matmul(X,w) + b

# 定义损失函数
def loss(y_hat,y):
    return ((y_hat-y)**2)/2

# 定义优化算法
def sgd (paras,lr,batch_size):
    with T.no_grad():
        for para in paras:
            para -= lr * para.grad/batch_size
            para.grad.zero_() # 用于清零当前参数的梯度，以便在下一次反向传播时重新计算

# 真实数据
true_w = T.tensor([[2],[-3.4]])
true_b = T.tensor(4.2)

features,labels = generate_data(true_w,true_b,1000)
# plt只能画数组型的数据，需要使用.numpy()转化
# plt.scatter(features[:,1].numpy(),labels.numpy(),1)

batch_size = 10 # batch大小
w = T.zeros(size=(2,1),requires_grad=True)
b = T.zeros(1,requires_grad=True)

# 训练
lr = 0.01
epoch = 5

for i in range(epoch):
    for X,y in data_iter(batch_size,features,labels):
        l = loss(line(X,w,b),y)
        l.sum().backward()
        sgd([w,b],lr,batch_size)
    with T.no_grad():
        tran_l = loss(line(features,w,b),labels)
        print("epoch:",i + 1,"\n","loss:",tran_l.mean().numpy())

print("w:",w,"\n","b:",b)

epoch: 1 
 loss: 2.1297317
epoch: 2 
 loss: 0.26636225
epoch: 3 
 loss: 0.03388089
epoch: 4 
 loss: 0.0044209873
epoch: 5 
 loss: 0.0006264636
w: tensor([[ 1.9841],
        [-3.3899]], requires_grad=True) 
 b: tensor([4.1712], requires_grad=True)


* 简洁实现版本

In [14]:
import numpy as np
import torch as T
from torch.utils import data
from torch import nn

# 数据生成器
def generate_data(w,b,num):
    X = T.normal(0,1,size=(num,len(w)))
    y = T.matmul(X,w) + b
    y += T.normal(0,0.01,size=(num,1))
    return X,y

# 读取数据集(这里使用高级api实现)
def load_array(features,labels,batch_size):
    data_set = data.TensorDataset(features,labels)
    return data.DataLoader(data_set,batch_size=batch_size,shuffle=True)

# 真实数据
true_w = T.tensor([[2],[-3.4]])
true_b = T.tensor(4.2)

# 数据生成器定义
features,labels = generate_data(true_w,true_b,1000)
data_iter = load_array(features,labels,10)

# 网络定义
net = nn.Sequential(nn.Linear(2,1))
# 参数初始化
net[0].weight.data.normal_(0,0.01)
net[0].bias.data.fill_(0)

# 定义损失函数
loss = nn.MSELoss()

# 定义优化算法
trainer = T.optim.SGD(net.parameters(),lr=0.03)

# 训练
epochs = 3
for epoch in range(epochs):
    for X,y in data_iter:
        l = loss(net(X),y)
        # 以下三步可以说是封装的非常简洁了
        trainer.zero_grad()
        l.backward()
        trainer.step()
    # with这句话非常关键，不然会Can't call numpy() on Tensor that requires grad.
    # with T.no_grad():
    tran_l = loss(net(features),labels)
    print(net[0].weight.data)
    print("epoch:",epoch + 1,"\n","loss:",tran_l.detach().numpy())
    


tensor([[ 2.0002, -3.3924]])
epoch: 1 
 loss: 0.00023145585
tensor([[ 2.0010, -3.4002]])
epoch: 2 
 loss: 0.00010563044
tensor([[ 2.0006, -3.4007]])
epoch: 3 
 loss: 0.000105609295


## 2.softmax回归的pytorch实现
* 原始实现
（交叉熵是度量两个概率分布差异的很好的度量，而平方差损失度量的是两个样本之间的差异）

In [50]:
import torch as T
import torchvision as tv
from torch.utils import data
from torchvision import transforms

# 定义数据生成器
def load_data(batch_size):
    # 将图像从 PIL 格式或 NumPy 数组转换为 PyTorch 的张量（tensor）
    trans = [transforms.ToTensor()]
    # 是一个用于将多个转换操作组合在一起的工具
    trans = transforms.Compose(trans)
    # 加载mnist数据集
    train_data = tv.datasets.MNIST(root='./data',train=True,download=True,transform=trans)
    test_data = tv.datasets.MNIST(root='./data',train=False,download=True,transform=trans)
    # 定义数据加载器
    train_iter = data.DataLoader(train_data,batch_size=batch_size,shuffle=True)
    test_iter = data.DataLoader(test_data,batch_size=batch_size,shuffle=False)
    return train_iter,test_iter

# 定义softmax函数
def softmax(X):
    X_exp = T.exp(X)
    partition = X_exp.sum(dim=1,keepdim=True)
    return X_exp / partition # 广播机制，很好用

 # 定义模型
def net(X):
    return softmax(T.matmul(X.reshape(-1,num_inputs),w) + b)

# 定义损失函数
def cross_entropy(y_hat,y):
    return -T.log(y_hat[T.arange(len(y_hat)),y])

# 定义优化算法
def sgd(params,lr,batch_size):
    with T.no_grad():
        for param in params:
            param -= lr * param.grad/batch_size
            param.grad.zero_()

# 超参数
epoch = 10
lr = 0.1
batch_size = 256
train_iter,test_iter = load_data(batch_size)
# 确定输入和输出
num_inputs = 784
num_outputs = 10
# 参数初始化
w = T.randn(num_inputs,num_outputs,requires_grad=True)
b = T.zeros(num_outputs,requires_grad=True)
# 进行训练
for i in range(epoch):
    for X,y in train_iter:
        y_hat = net(X)
        loss = cross_entropy(y_hat,y)
        loss.sum().backward()
        sgd([w,b],lr,batch_size)
    with T.no_grad():
        # 计算损失
        tran_l = cross_entropy(net(X),y)
        # 在测试集上计算分类正确率
        correct = 0
        total = 0
        for X, y in test_iter:
            y_hat = net(X)
            predicted = T.argmax(y_hat, dim=1)  # 获取预测标签
            correct += (predicted == y).sum().item()  # 统计正确的预测数量
            total += y.size(0)  # 统计总样本数量
        accuracy = correct / total
        print("epoch:",i + 1,"\n","loss:",tran_l.mean().numpy(),"\n",
              "test accuracy:", accuracy)

epoch: 1 
 loss: 2.3242693 
 test accuracy: 0.6406
epoch: 2 
 loss: 1.3307658 
 test accuracy: 0.7429
epoch: 3 
 loss: 1.0492113 
 test accuracy: 0.7832
epoch: 4 
 loss: 0.97587156 
 test accuracy: 0.8077
epoch: 5 
 loss: 1.168331 
 test accuracy: 0.821
epoch: 6 
 loss: 0.8169982 
 test accuracy: 0.8329
epoch: 7 
 loss: 0.94888455 
 test accuracy: 0.842
epoch: 8 
 loss: 0.5684145 
 test accuracy: 0.8463
epoch: 9 
 loss: 0.57723504 
 test accuracy: 0.8509
epoch: 10 
 loss: 0.5201943 
 test accuracy: 0.8552


* 索引关系，tensor和ndarray的索引关系是一致的,都可以使用tensor,ndarrdy,list和range型进行索引

In [39]:
import torch as T

a = np.arange(10)
b = T.arange(10)
c = [i for i in range(10)]
d = T.tensor([1,2])
e = np.array([1,2,3])
f = range(3)

print(b[f])
print(a[d])

tensor([0, 1, 2])
[1 2]


* 简洁实现版本

In [2]:
import torch as T
import torchvision as tv
import torch.nn as nn
import torch.nn.init as init

from torch.utils import data
from torchvision import transforms

# 定义数据生成器
def load_data(batch_size):
    # 将图像从 PIL 格式或 NumPy 数组转换为 PyTorch 的张量（tensor）
    trans = [transforms.ToTensor()]
    # 是一个用于将多个转换操作组合在一起的工具
    trans = transforms.Compose(trans)
    # 加载mnist数据集
    train_data = tv.datasets.MNIST(root='./data',train=True,download=True,transform=trans)
    test_data = tv.datasets.MNIST(root='./data',train=False,download=True,transform=trans)
    # 定义数据加载器
    train_iter = data.DataLoader(train_data,batch_size=batch_size,shuffle=True)
    test_iter = data.DataLoader(test_data,batch_size=batch_size,shuffle=False)
    return train_iter,test_iter

# 定义网络
net = nn.Sequential(nn.Flatten(),
                    nn.Linear(28*28,10))
# 初始化函数
def initialize_weights(model):
    for layer in model:
        if isinstance(layer, nn.Linear):
            init.kaiming_uniform_(layer.weight, nonlinearity='relu')  # 使用 He 初始化
            if layer.bias is not None:
                init.zeros_(layer.bias)  # 偏置初始化为 0
# 损失函数
loss = nn.CrossEntropyLoss()

# 调用初始化函数
initialize_weights(net)

# 优化器
sgd = T.optim.SGD(net.parameters(),lr=0.1)

# 训练
epoch = 10
batch_size = 256
train_iter,test_iter = load_data(batch_size)
for i in range(epoch):
    for X,y in train_iter:
        sgd.zero_grad()
        y_hat = net(X)
        l = loss(y_hat,y)
        l.backward()
        sgd.step()
    with T.no_grad():
        # 计算损失
        tran_l = loss(net(X),y)
        # 在测试集上计算分类正确率
        correct = 0
        total = 0
        for X, y in test_iter:
            y_hat = net(X)
            predicted = T.argmax(y_hat, dim=1)  # 获取预测标签
            correct += (predicted == y).sum().item()  # 统计正确的预测数量
            total += y.size(0)  # 统计总样本数量
        accuracy = correct / total
        print("epoch:",i + 1,"\n","loss:",tran_l.mean().numpy(),"\n",
              "test accuracy:", accuracy)

epoch: 1 
 loss: 0.40087435 
 test accuracy: 0.8838
epoch: 2 
 loss: 0.2801941 
 test accuracy: 0.8963
epoch: 3 
 loss: 0.36306193 
 test accuracy: 0.9052
epoch: 4 
 loss: 0.39751872 
 test accuracy: 0.9072
epoch: 5 
 loss: 0.31081054 
 test accuracy: 0.9109
epoch: 6 
 loss: 0.19952668 
 test accuracy: 0.9128
epoch: 7 
 loss: 0.3792193 
 test accuracy: 0.9159
epoch: 8 
 loss: 0.20895837 
 test accuracy: 0.9154
epoch: 9 
 loss: 0.27070248 
 test accuracy: 0.9163
epoch: 10 
 loss: 0.3738365 
 test accuracy: 0.9178


## 3.MLP的pytorch实现
* 这里直接调用api实现，可以看到在添加了隐藏层后的accuracy大大提升了

In [29]:
import torch as T
import torchvision as tv
import torch.nn as nn
import torch.nn.init as init

from torch.utils import data
from torchvision import transforms

# 定义数据生成器
def load_data(batch_size):
    # 将图像从 PIL 格式或 NumPy 数组转换为 PyTorch 的张量（tensor）
    trans = [transforms.ToTensor()]
    # 是一个用于将多个转换操作组合在一起的工具
    trans = transforms.Compose(trans)
    # 加载mnist数据集
    train_data = tv.datasets.MNIST(root='./data',train=True,download=True,transform=trans)
    test_data = tv.datasets.MNIST(root='./data',train=False,download=True,transform=trans)
    # 定义数据加载器
    train_iter = data.DataLoader(train_data,batch_size=batch_size,shuffle=True)
    test_iter = data.DataLoader(test_data,batch_size=batch_size,shuffle=False)
    return train_iter,test_iter

# 定义网络
net = nn.Sequential(nn.Flatten(),
                    nn.Linear(28*28,256),
                    nn.ReLU(),
                    nn.Linear(256,10)
                    )

# 初始化函数
def initialize_weights(model):
    for layer in model:
        if isinstance(layer, nn.Linear):
            init.kaiming_uniform_(layer.weight, nonlinearity='relu')  # 使用 He 初始化
            if layer.bias is not None:
                init.zeros_(layer.bias)  # 偏置初始化为 0
# 损失函数
loss = nn.CrossEntropyLoss()

# 调用初始化函数
initialize_weights(net)

# 优化器
sgd = T.optim.SGD(net.parameters(),lr=0.1)

# 训练
epoch = 10
batch_size = 256
train_iter,test_iter = load_data(batch_size)
for i in range(epoch):
    for X,y in train_iter:
        sgd.zero_grad()
        y_hat = net(X)
        l = loss(y_hat,y)
        l.backward()
        sgd.step()
    with T.no_grad():
        # 计算损失
        tran_l = loss(net(X),y)
        # 在测试集上计算分类正确率
        correct = 0
        total = 0
        for X, y in test_iter:
            y_hat = net(X)
            predicted = T.argmax(y_hat, dim=1)  # 获取预测标签
            correct += (predicted == y).sum().item()  # 统计正确的预测数量
            total += y.size(0)  # 统计总样本数量
        accuracy = correct / total
        print("epoch:",i + 1,"\n","loss:",tran_l.mean().numpy(),"\n",
              "test accuracy:", accuracy)

epoch: 1 
 loss: 0.32476017 
 test accuracy: 0.9032
epoch: 2 
 loss: 0.17184621 
 test accuracy: 0.9243
epoch: 3 
 loss: 0.18020672 
 test accuracy: 0.9329
epoch: 4 
 loss: 0.08596462 
 test accuracy: 0.939
epoch: 5 
 loss: 0.17072274 
 test accuracy: 0.9445
epoch: 6 
 loss: 0.18984102 
 test accuracy: 0.9454
epoch: 7 
 loss: 0.08353443 
 test accuracy: 0.9511
epoch: 8 
 loss: 0.09649221 
 test accuracy: 0.9521
epoch: 9 
 loss: 0.20344277 
 test accuracy: 0.9562
epoch: 10 
 loss: 0.21012758 
 test accuracy: 0.9605


* 暂退法（dropout）

In [None]:
import torch as T
from torch import nn

def dropout(X,drop):
    assert 0 <= drop <= 1
    if drop == 0:
        return X
    elif drop == 1:
        return T.zeros_like(X)
    else:
        mask = T.rand_like(X) > drop # 随机丢弃
        return mask * X / (1 - drop) # 保持期望值不变

X = T.arange(16).reshape((2, 8)).float()
print(X)
print(dropout(X, 0.5))


tensor([[ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.],
        [ 8.,  9., 10., 11., 12., 13., 14., 15.]])
tensor([[ 0.,  2.,  4.,  0.,  0.,  0.,  0.,  0.],
        [16., 18., 20.,  0., 24.,  0.,  0., 30.]])


* 自定义块以实现逻辑控制

In [None]:
import torch as T
from torch import nn
from torch.nn import functional as F

# 继承自nn.Module，只需要实例化forward即可使用
class MyBlock(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 10)
    def forward(self, X):
        X = self.linear(X)
        X = F.relu(X)
        if X.abs().sum() > 100:
            return X * 0.5
        else:
            return X

* 参数管理和访问

In [None]:
import torch as T
from torch import nn

net = nn.Sequential(nn.Linear(4,8), nn.ReLU(),nn.Linear(8,1))
X = T.rand(2, 4)

print(net[2].state_dict())
print(net[2].weight.data)
print(net[2].bias.data)

OrderedDict([('weight', tensor([[ 0.3237, -0.1566, -0.1963, -0.0686, -0.2525,  0.3526, -0.3255,  0.1239]])), ('bias', tensor([-0.0416]))])
tensor([[ 0.3237, -0.1566, -0.1963, -0.0686, -0.2525,  0.3526, -0.3255,  0.1239]])
tensor([-0.0416])


* 参数初始化

In [None]:
def init_Xavier(model):
    for layer in model:
        if isinstance(layer, nn.Linear):
            nn.init.xavier_uniform_(layer.weight)
            nn.init.zeros_(layer.bias)

init_Xavier(net)
print(net[2].weight.data)
print(net[2].bias.data)

tensor([[ 0.0810,  0.6046, -0.3657,  0.6022,  0.0129, -0.1204,  0.1115, -0.6400]])
tensor([0.])


* GPU运算

In [None]:
import torch as T
from torch import nn

T.cuda.device_count()

1

* 使用GPU训练MLP

In [30]:
import torch as T
import torchvision as tv
import torch.nn as nn
import torch.nn.init as init

from torch.utils import data
from torchvision import transforms

# 检查是否有可用的 GPU
device = T.device('cuda' if T.cuda.is_available() else 'cpu')

# 定义数据生成器
def load_data(batch_size):
    # 将图像从 PIL 格式或 NumPy 数组转换为 PyTorch 的张量（tensor）
    trans = [transforms.ToTensor()]
    # 是一个用于将多个转换操作组合在一起的工具
    trans = transforms.Compose(trans)
    # 加载mnist数据集
    train_data = tv.datasets.MNIST(root='./data', train=True, download=True, transform=trans)
    test_data = tv.datasets.MNIST(root='./data', train=False, download=True, transform=trans)
    # 定义数据加载器
    train_iter = data.DataLoader(train_data, batch_size=batch_size, shuffle=True)
    test_iter = data.DataLoader(test_data, batch_size=batch_size, shuffle=False)
    return train_iter, test_iter

# 定义网络并移动到 GPU
net = nn.Sequential(nn.Flatten(),
                    nn.Linear(28 * 28, 256),
                    nn.ReLU(),
                    nn.Linear(256, 10)
                    ).to(device)  # 将模型移动到 GPU

# 初始化函数
def initialize_weights(model):
    for layer in model:
        if isinstance(layer, nn.Linear):
            init.kaiming_uniform_(layer.weight, nonlinearity='relu')  # 使用 He 初始化
            if layer.bias is not None:
                init.zeros_(layer.bias)  # 偏置初始化为 0

# 损失函数
loss = nn.CrossEntropyLoss().to(device)  # 将损失函数移动到 GPU

# 调用初始化函数
initialize_weights(net)

# 优化器
sgd = T.optim.SGD(net.parameters(), lr=0.1)

# 训练
epoch = 10
batch_size = 256
train_iter, test_iter = load_data(batch_size)
for i in range(epoch):
    for X, y in train_iter:
        X, y = X.to(device), y.to(device)  # 将数据移动到 GPU
        sgd.zero_grad()
        y_hat = net(X)
        l = loss(y_hat, y)
        l.backward()
        sgd.step()
    
    with T.no_grad():
        # 计算损失
        tran_l = loss(net(X), y)
        # 在测试集上计算分类正确率
        correct = 0
        total = 0
        for X, y in test_iter:
            X, y = X.to(device), y.to(device)  # 将数据移动到 GPU
            y_hat = net(X)
            predicted = T.argmax(y_hat, dim=1)  # 获取预测标签
            correct += (predicted == y).sum().item()  # 统计正确的预测数量
            total += y.size(0)  # 统计总样本数量
        accuracy = correct / total
        print("epoch:", i + 1, "\n", "loss:", tran_l.item(), "\n", "test accuracy:", accuracy)


epoch: 1 
 loss: 0.3116208612918854 
 test accuracy: 0.9121
epoch: 2 
 loss: 0.1257956475019455 
 test accuracy: 0.9236
epoch: 3 
 loss: 0.1763961762189865 
 test accuracy: 0.9335
epoch: 4 
 loss: 0.0909632071852684 
 test accuracy: 0.94
epoch: 5 
 loss: 0.1892259269952774 
 test accuracy: 0.9418
epoch: 6 
 loss: 0.17866913974285126 
 test accuracy: 0.9466
epoch: 7 
 loss: 0.11648309230804443 
 test accuracy: 0.9485
epoch: 8 
 loss: 0.0661228820681572 
 test accuracy: 0.9538
epoch: 9 
 loss: 0.11011847853660583 
 test accuracy: 0.9556
epoch: 10 
 loss: 0.09546028822660446 
 test accuracy: 0.9587


>由上述结果可以看出运行结果确实快了不少
>需要将网络，数据，标签都移动到GPU上
```python
# 定义设备
device = T.device('cuda' if T.cuda.is_available() else 'cpu')
# 移动网络到GPU
net.to(device)
# 移动数据到GPU
X = X.to(device)
y = y.to(device)
```