In [1]:
from datareader import *
# Pick the compute device once for the whole notebook: the first CUDA GPU when
# PyTorch can see one, otherwise the CPU.
# NOTE(review): no explicit `import torch` is visible, so `torch` presumably
# arrives through `from datareader import *` — confirm, or import it directly.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print('device: ', device)

# 支持函数
def softmax(X):
    """Softmax along dimension 1 of a tensor of scores.

    Args:
        X: tensor with at least two dimensions; dimension 1 is normalised.

    Returns:
        Tensor of the same shape whose entries along dimension 1 are
        non-negative and sum to 1.
    """
    # Subtracting the per-row maximum leaves the result mathematically
    # unchanged but keeps exp() from overflowing to inf (which would turn the
    # ratio below into nan) when the scores are large.
    shifted = X - X.max(dim=1, keepdim=True).values
    X_exp = torch.exp(shifted)
    partition = X_exp.sum(1, keepdim=True)
    return X_exp / partition

def cross_entrophy(yhat, y, label_smooth=0.1, weight_decay=0):
    """Label-smoothed cross-entropy computed from predicted probabilities.

    Args:
        yhat: (batch, classes) tensor of probabilities (not logits).
        y: 1-D tensor of integer class labels, one per row of ``yhat``.
        label_smooth: probability mass spread uniformly over all classes; the
            true class additionally receives ``1 - label_smooth``.
        weight_decay: accepted for interface compatibility; not used here.

    Returns:
        (batch, 1) tensor of per-sample losses, each already divided by the
        batch size, so summing the result gives the batch-mean loss.
    """
    num_classes = yhat.shape[1]
    # Build the smoothed targets on yhat's own device and dtype instead of a
    # module-level device, so the function also works for tensors that live
    # somewhere else (e.g. CPU tensors while the notebook runs on the GPU).
    real_y = torch.full_like(yhat, label_smooth / num_classes)
    real_y[range(len(yhat)), y] += 1 - label_smooth
    # Clamp before the log: an exact 0 probability would give log(0) = -inf,
    # and 0 * -inf is nan when label_smooth == 0.
    log_prob = torch.log(yhat.clamp_min(torch.finfo(yhat.dtype).tiny))
    loss = -torch.mul(real_y, log_prob)
    # Divide by the batch size.
    return loss.sum(1, keepdim=True) / len(y)


def l2_penalty(w):
    """Return half the sum of the squared entries of ``w``.

    This is the L2 penalty term; add it to the loss to apply it.
    """
    squared = w.pow(2)
    return squared.sum() / 2

def dropout_layer_fun(X, p):
    """Inverted dropout that works on batched tensors.

    Args:
        X: input tensor of any shape.
        p: drop probability in [0, 1].

    Returns:
        ``X`` unchanged when ``p == 0``, all zeros when ``p == 1``; otherwise
        ``X`` with each element zeroed with probability ``p`` and the
        survivors scaled by ``1 / (1 - p)``.
    """
    assert 0 <= p <= 1
    if p == 1:
        return torch.zeros_like(X)
    if p == 0:
        return X
    # Draw the mask on X's own device rather than a module-level device so the
    # multiplication below never mixes devices.
    mask = (torch.rand(X.shape, device=X.device) > p).float()
    return mask * X / (1.0 - p)

#X = torch.arange(16, dtype=torch.float32).reshape(2, 8)
#print(X)
#print(dropout_layer_fun(X, 0.0))
#print(dropout_layer_fun(X, 0.2))
#print(dropout_layer_fun(X, 1.0))

device:  cuda:0
device:  cuda:0


In [2]:
# 更新参数
def sgd(params, lr):
    """Apply one in-place gradient-descent step and clear the gradients.

    Args:
        params: iterable of tensors that require grad.
        lr: learning rate multiplied with each gradient.

    Parameters whose ``.grad`` is ``None`` (they took no part in the last
    backward pass) are skipped instead of raising.
    """
    # The update itself must not be recorded by autograd.
    with torch.no_grad():
        for param in params:
            if param.grad is None:
                continue
            param -= lr * param.grad
            param.grad.zero_()



In [3]:
# 评估模型的支持函数
def accuracy(yhat, y):
    """Return the NUMBER (not the fraction) of correct predictions as a float.

    ``yhat`` may hold per-class scores of shape (batch, classes), in which
    case the arg-max over dimension 1 is taken, or already-decided labels.
    """
    predicted = yhat
    if yhat.dim() > 1 and yhat.shape[1] > 1:
        predicted = yhat.argmax(dim=1)
    hits = predicted.to(y.dtype) == y
    return float(hits.to(y.dtype).sum())

def evaluate_accuracy(net, data_iter, lossfun):
    """Evaluate ``net`` over ``data_iter`` without tracking gradients.

    Args:
        net: callable mapping a batch ``X`` to predictions; if it is a
            ``torch.nn.Module`` its ``eval()`` is called first (and it is left
            in that mode afterwards).
        data_iter: iterable yielding ``(X, y)`` batches.
        lossfun: callable invoked as ``lossfun(yhat=..., y=...)``; its result
            is summed per batch.

    Returns:
        ``(correct / samples, summed_loss / samples)``.
    """
    with torch.no_grad():
        if isinstance(net, torch.nn.Module):
            net.eval()
        n_correct = 0.0
        n_seen = 0.0
        loss_sum = 0
        for X, y in data_iter:
            yhat = net(X)
            n_correct += accuracy(yhat, y)
            loss_sum += lossfun(yhat=yhat, y=y).sum()
            n_seen += y.shape[0]
        # Accuracy and average loss.
        return n_correct / n_seen, loss_sum / n_seen


In [4]:
# 训练函数
def train_epoch_ch3(net, train_iter, loss, updater, dropout=None, dropout_p=0):
    """Train ``net`` for one pass over ``train_iter``.

    Args:
        net: callable mapping a batch ``X`` to predictions; if it is a
            ``torch.nn.Module`` its ``train()`` is called first.
        train_iter: iterable yielding ``(X, y)`` batches.
        loss: callable invoked as ``loss(yhat=..., y=...)``.
        updater: either a ``torch.optim.Optimizer`` or a zero-argument
            callable that applies the parameter update (and is expected to
            clear the gradients itself).
        dropout, dropout_p: accepted for interface compatibility; not used in
            this function.

    Returns:
        ``(correct / samples, accumulated_loss / samples)``.
    """
    if isinstance(net, torch.nn.Module):
        net.train()
    # [number correct, number of samples, accumulated loss]
    metric = [0.0, 0.0, 0.0]
    for X, y in train_iter:
        yhat = net(X)
        l = loss(yhat=yhat, y=y)
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            # Fixed typo: this was `l.backword()`, which raised AttributeError.
            l.backward()
            updater.step()
            metric[2] += float(l) * len(y)
        else:
            # Custom updater path: the loss holds one entry per sample.
            l.sum().backward()
            updater()
            metric[2] += float(l.sum())
        metric[0] += accuracy(yhat, y)
        metric[1] += y.shape[0]
    # Accuracy and average loss.
    return metric[0]/metric[1], metric[2]/metric[1]


In [5]:
# 训练主模块

# Network layer widths.
inputlayer = 784
hiddenlayer = hiddenlayer1 = 256   # `hiddenlayer` belongs to the two-layer variant
hiddenlayer2 = 128
outputlayer = 10

# Optimisation hyperparameters.
lr = 0.02
num_epoch = 75
batch_size = 400
dropout_p = 0.45

# 初始化参数
# Omega1 = torch.normal(0, 0.01, (inputlayer, hiddenlayer), requires_grad=True, device=device)
# Theta1 = torch.zeros((1, hiddenlayer), requires_grad=True, device=device)
# Omega2 = torch.normal(0, 0.01, (hiddenlayer, outputlayer), requires_grad=True, device=device)
# Theta2 = torch.zeros((1, outputlayer), requires_grad=True, device=device)

#print(Omega2)
def myUpdater():
    """Run one ``sgd`` step over the two-layer model's parameters.

    NOTE(review): the initialisation of Omega1/Theta1/Omega2/Theta2 is
    commented out in the visible notebook, so calling this raises NameError
    unless they are defined elsewhere — confirm before use.
    """
    two_layer_params = [Omega1, Theta1, Omega2, Theta2]
    return sgd(two_layer_params, lr)

# 定义模型
def FC2Layer_model(x, dropout=None, p=0):
    """Forward pass of the two-layer fully connected model.

    Computes ``relu(x @ Omega1 - Theta1)``, optionally applies
    ``dropout(hidden, p)``, then ``sigmoid(hidden @ Omega2 - Theta2)`` followed
    by the notebook's own ``softmax``.

    NOTE(review): the sigmoid squashes the scores into (0, 1) before softmax,
    which limits how peaked the output distribution can be — confirm this is
    intended.
    """
    F = torch.nn.functional
    hidden = F.relu(torch.matmul(x, Omega1) - Theta1)
    if dropout is not None:
        hidden = dropout(hidden, p)
    scores = F.sigmoid(torch.matmul(hidden, Omega2) - Theta2)
    # Use the hand-written softmax rather than the library one.
    return softmax(scores)


class FC3Layer_model(torch.nn.Module):
    """Three-layer fully connected classifier with optional custom dropout.

    The forward pass is Linear -> ReLU -> dropout -> Linear -> ReLU -> dropout
    -> Linear -> sigmoid -> softmax. Dropout is only applied while the model
    is in training mode and a dropout function was supplied.
    """

    # Drop probabilities used after the first and the second hidden layer.
    dropout1 = 0.25
    dropout2 = 0.5

    def __init__(self, inputlayer, hiddenlayer1, hiddenlayer2, outputlayer, is_training=True, dropoutF=None):
        """Create the three linear layers.

        Args:
            inputlayer: number of input features (inputs are reshaped to
                ``(-1, inputlayer)`` in ``forward``).
            hiddenlayer1: width of the first hidden layer.
            hiddenlayer2: width of the second hidden layer.
            outputlayer: number of output classes.
            is_training: initial value of the training flag.
            dropoutF: callable ``dropoutF(tensor, p)`` or ``None`` to disable
                dropout.
        """
        super(FC3Layer_model, self).__init__()
        self.inputlayer = inputlayer
        self.outputlayer = outputlayer
        self.is_training = is_training
        self.dropout = dropoutF
        # NOTE(review): `dropout_p` is not a constructor argument; it is read
        # from module scope, and this attribute is not used by the class
        # itself (forward uses dropout1 / dropout2) — confirm it is needed.
        self.dropout_p = dropout_p
        self.lin1 = torch.nn.Linear(inputlayer, hiddenlayer1)
        self.lin2 = torch.nn.Linear(hiddenlayer1, hiddenlayer2)
        self.lin3 = torch.nn.Linear(hiddenlayer2, outputlayer)

    def _maybe_dropout(self, H, p):
        """Apply the configured dropout function only in training mode."""
        if self.is_training and self.dropout is not None:
            return self.dropout(H, p)
        return H

    def forward(self, H1):
        """Return class probabilities of shape ``(batch, outputlayer)``."""
        H1 = torch.nn.functional.relu(self.lin1(H1.reshape(-1, self.inputlayer)))
        H1 = self._maybe_dropout(H1, self.dropout1)
        H1 = torch.nn.functional.relu(self.lin2(H1))
        H1 = self._maybe_dropout(H1, self.dropout2)
        # NOTE(review): sigmoid before softmax bounds the scores to (0, 1),
        # limiting how confident the output can become — confirm intended.
        H1 = torch.sigmoid(self.lin3(H1))
        return softmax(H1)

    def Updater(self):
        """Run one ``sgd`` step (module-level ``lr``) over all parameters."""
        return sgd(list(self.parameters()), lr)

    # Calling the model goes straight to forward (kept for compatibility with
    # the plain-function models); note this bypasses nn.Module hooks.
    __call__ = forward

    def train(self, mode=True):
        """Set training mode, keeping ``is_training`` and ``training`` in sync.

        Accepts ``mode`` and returns ``self`` like ``torch.nn.Module.train``,
        so ``model.train(False)`` and call chaining work.
        """
        super(FC3Layer_model, self).train(mode)
        self.is_training = mode
        return self

    def eval(self):
        """Switch to evaluation mode (dropout disabled); returns ``self``."""
        return self.train(False)


def UseXavier(m):
    """Initialiser for ``Module.apply``: Xavier-normal weights, zero bias.

    Only modules whose type is exactly ``torch.nn.Linear`` are touched; every
    other module is left as it is.
    """
    if type(m) is torch.nn.Linear:
        torch.nn.init.xavier_normal_(m.weight)
        # Linear(..., bias=False) has bias set to None; nothing to reset then.
        if m.bias is not None:
            torch.nn.init.constant_(m.bias, 0)
            
   



# t = FC3Layer_model(inputlayer, hiddenlayer1, hiddenlayer2, outputlayer, True, dropout_layer_fun)
#print(*[i.weight for i in  t.children()])
# Load the data.
# NOTE(review): going by the variable names, the arguments 1 and 2 presumably
# select the training and the test split — confirm against MNISTReader.

train_reader = MNISTReader(1)
test_reader = MNISTReader(2)



In [6]:
# Build the three-layer network with the custom dropout function, apply the
# Xavier initialiser to its layers and move the model to the selected device.
layer3 = FC3Layer_model(
    inputlayer, hiddenlayer1, hiddenlayer2, outputlayer,
    is_training=True, dropoutF=dropout_layer_fun,
)
layer3.apply(UseXavier)
layer3.to(device)

# One (train_accu, train_loss, test_accu, test_loss) tuple per finished epoch.
data_l = []
for epoch in range(num_epoch):
    # Disabled experiment: manual learning-rate decay late in training.
    #if epoch == 70:
    #   lr /= 1.8
    train_accu, train_loss = train_epoch_ch3(
        layer3,
        train_reader.dataIter(batch_size),
        cross_entrophy,
        layer3.Updater,
        dropout=dropout_layer_fun,
        dropout_p=dropout_p,
    )

    # Evaluate on the test reader after every epoch.
    test_accu, teat_loss = evaluate_accuracy(
        layer3, test_reader.dataIter(batch_size), cross_entrophy
    )
    data_l.append((train_accu, train_loss, test_accu, teat_loss))
    print(f'epoch: {epoch+1}; accuracy: train = {train_accu}, test = {test_accu}; loss: train = {train_loss}, test = {teat_loss}')


epoch: 1; accuracy: train = 0.28036666666666665, test = 0.6247; loss: train = 0.005369657146930695, test = 0.004947320558130741


KeyboardInterrupt: 

In [None]:

# Plot the accuracy and loss curves recorded in data_l.
import matplotlib.pyplot as plt

# Use the number of epochs actually recorded rather than num_epoch: when
# training is interrupted data_l is shorter, and plotting it against
# range(1, num_epoch + 1) fails with a length mismatch.
epoch_l = list(range(1, len(data_l) + 1))
y1 = [x[0] for x in data_l]          # train accuracy
y2 = [x[2] for x in data_l]          # test accuracy

y3 = [x[1] for x in data_l]          # train loss
y4 = [float(x[3]) for x in data_l]   # test loss, converted to a plain float

plt.figure(figsize=(18, 6), dpi=100)
# NOTE(review): this font is presumably chosen so the Chinese y-axis label
# renders; it may not be installed outside Windows — confirm.
plt.rc("font", family='MicroSoft YaHei', weight="bold")

plt.subplot(1, 2, 1)
plt.xlabel('epoch')
plt.ylabel('正确率')
plt.plot(epoch_l, y1, color='red', linestyle=':', label='train set')
plt.plot(epoch_l, y2, color='green', linestyle='-.', label='test set')
# The line labels are only displayed once a legend is requested.
plt.legend()

plt.subplot(1, 2, 2)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.plot(epoch_l, y3, color='red', linestyle=':', label='train set')
plt.plot(epoch_l, y4, color='green', linestyle='-.', label='test set')
plt.legend()

plt.show()




In [None]:
# Per-digit accuracy on the test reader, plus each digit's most frequent
# wrong prediction.
if isinstance(layer3, torch.nn.Module):
    layer3.eval()
# metric[t][p] counts samples whose true label is t and predicted label is p.
metric = [[0 for _ in range(10)] for _ in range(10)]
false_label = []  # not used below; kept so the name still exists for other cells
# Inference only: no_grad avoids building an autograd graph for every sample.
with torch.no_grad():
    for X, y in test_reader.dataIter(1):
        yhat = layer3(X)
        yhat = yhat.argmax(axis=1)
        metric[int(y)][int(yhat)] += 1
for i in range(10):
    total = sum(metric[i])
    # A digit that never occurs would otherwise cause a ZeroDivisionError.
    rate = metric[i][i] / total * 100 if total else float('nan')
    print(f'数字{i}的正确率{round(rate, 2)}%', end='\t')
    # Zero the diagonal so the row maximum below is the most common mistake.
    metric[i][i] = 0
    j = metric[i].index(max(metric[i]))
    print(f'最大误报数字为{j}, 次数为{metric[i][j]}')
    #print(' '.join([f'数字{j}误报次数{metric[i][j]}; ' if (metric[i][j] > 0 and i != j) else '' for j in range(10)]))

In [None]:
# Interactive demo: show each test image together with the model's prediction.
import os
import matplotlib.pyplot as plt
from os import system

# Make sure dropout is disabled even if this cell runs right after training.
if isinstance(layer3, torch.nn.Module):
    layer3.eval()
# 'cls' only exists on Windows; use 'clear' on other platforms.
clear_cmd = 'cls' if os.name == 'nt' else 'clear'
for imdata, label in test_reader.dataIter(1):
    system(clear_cmd)
    # Inference only, so no autograd graph is needed.
    with torch.no_grad():
        predict_re = layer3(imdata)
    # NOTE(review): presumably undoes an input normalisation to roughly
    # [-1, 1] so the image displays in 0..255 — confirm against the reader.
    imdata = imdata *128 + 128
    imdata = imdata.reshape((28, 28))
    fig = plt.figure()
    plt.imshow(imdata.cpu())
    plt.show()
    # Close the figure so the loop does not accumulate open figures.
    plt.close(fig)
    print(predict_re)
    predict_num = torch.argmax(predict_re)
    print(f'模型预测数字为: {predict_num}，正确答案为: {int(label)}，', end='')
    input('输入任意值继续: ')