In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import torch.nn.functional as F

import torchvision.datasets as dsets
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

In [2]:
# Hyperparameters
image_size = 28   # side length of an input image (images are 28*28)
num_classes = 10  # number of label classes
num_epochs = 100  # total number of training epochs
batch_size = 64   # number of images per mini-batch

# Load the MNIST training split; if it has not been downloaded yet, the files
# are fetched and stored under ./data1.
train_dataset = dsets.MNIST(root='./data1',
                            train=True,
                            transform=transforms.ToTensor(),
                            download=True)
# Load the MNIST test split.
test_dataset = dsets.MNIST(root='./data1',
                           train=False,
                           transform=transforms.ToTensor(),
                           download=True)

# Split the training set by label: samples whose label is listed in A go to
# train_A, all remaining samples go to train_B.
A = [0, 1, 2, 3, 4]
num_images = len(train_dataset)  # derived from the dataset rather than hard-coded
train_A = []
train_B = []
for i in range(num_images):
    # Fetch each sample exactly once: indexing the dataset is not free (the
    # transform is applied on every access), and the original code indexed
    # the same element twice per iteration.
    sample = train_dataset[i]
    if sample[1] in A:
        train_A.append(sample)
    else:
        train_B.append(sample)
print(len(train_A), len(train_B))

30596 29404


In [3]:
# Training data loaders, one per training regime (three in total). They all
# shuffle their data and use the same batch size, so the common options are
# written once.
_train_loader_options = dict(batch_size=batch_size, shuffle=True)

# Loader over the train_A subset.
train_loaderA = torch.utils.data.DataLoader(dataset=train_A, **_train_loader_options)

# Loader over the train_B subset.
train_loaderB = torch.utils.data.DataLoader(dataset=train_B, **_train_loader_options)

# Loader over the complete training set.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, **_train_loader_options)

In [4]:
# Validation and test sets: both are taken from test_dataset by index. The
# first 5000 indices form the validation set, the remainder the test set.
indices = range(len(test_dataset))
indices_val, indices_test = indices[:5000], indices[5000:]

# Samplers that draw only from the given index subset.
sampler_val = torch.utils.data.sampler.SubsetRandomSampler(indices_val)
sampler_test = torch.utils.data.sampler.SubsetRandomSampler(indices_test)

# Loaders. A sampler is supplied, so shuffle stays False for both.
validation_loader = torch.utils.data.DataLoader(test_dataset,
                                                batch_size=batch_size,
                                                shuffle=False,
                                                sampler=sampler_val)

test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False,
                                          sampler=sampler_test)
                   

In [5]:
# Convolutional network definition. 4 and 8 are the hand-picked depths
# (number of feature maps) of the two convolutional layers.
depth = [4, 8]


class ConvNet(nn.Module):
    """Two conv/pool stages followed by two fully connected layers.

    Reads the module-level names ``depth``, ``image_size`` and ``num_classes``
    when an instance is constructed.
    """

    def __init__(self):
        # Runs when an instance is created, i.e. on ``net = ConvNet()``.
        # Initialise the parent class first.
        super(ConvNet, self).__init__()

        # Conv layer 1: 1 input channel -> depth[0] feature maps, 5x5 kernel.
        # padding=2 keeps the spatial size unchanged for a 5x5 kernel.
        self.conv1 = nn.Conv2d(1, depth[0], 5, padding=2)
        # 2x2 max pooling with stride 2; halves each spatial dimension.
        self.pool = nn.MaxPool2d(2, 2)
        # Conv layer 2: depth[0] -> depth[1] feature maps, 5x5 kernel, padding 2.
        self.conv2 = nn.Conv2d(depth[0], depth[1], 5, padding=2)

        # After two 2x2 poolings each side is image_size // 4. The side length
        # is computed on its own because ``a // 4 * a // 4`` parses as
        # ``((a // 4) * a) // 4``, which is only correct by coincidence for
        # some sizes (e.g. 28).
        pooled_side = image_size // 4
        self.flat_features = pooled_side * pooled_side * depth[1]

        # Fully connected layer: flattened feature volume -> 512 hidden units.
        self.fc1 = nn.Linear(self.flat_features, 512)
        # Output layer: 512 hidden units -> one score per class.
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Run the forward pass.

        x is expected to be a batch of single-channel images whose spatial
        size matches ``image_size`` (conv1 takes 1 input channel and fc1 is
        sized from ``image_size``).

        Returns log-probabilities with one row per sample and one column per
        class. Note that the output is already log-softmax normalised;
        feeding it to nn.CrossEntropyLoss (which applies log-softmax itself)
        yields the same loss because log-softmax is idempotent.
        """
        x = F.relu(self.conv1(x))  # conv 1 + ReLU non-linearity
        x = self.pool(x)           # pooling 1: halves the spatial size
        x = F.relu(self.conv2(x))  # conv 2 + ReLU non-linearity
        x = self.pool(x)           # pooling 2: side is now 1/4 of the input side

        # Flatten the feature volume into one vector per sample.
        x = x.view(-1, self.flat_features)

        x = F.relu(self.fc1(x))
        # Dropout (default probability 0.5) against overfitting; only active
        # while the module is in training mode.
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        # Normalise over the class dimension explicitly; relying on the
        # implicit dimension is deprecated.
        x = F.log_softmax(x, dim=1)
        return x
                
     

In [6]:
# Helper used to compute classification accuracy
def rightness(predictions, labels):
    """Count how many predictions in a batch match their labels.

    predictions: per-class scores, one row per sample; the predicted class of
        a row is the index of its maximum along dimension 1.
    labels: the true class index of each sample.

    Returns a pair (number of correct predictions, number of labels).
    """
    scores = Variable(predictions).data
    # torch.max along dim 1 yields (values, indices); only the indices matter.
    _, predicted = torch.max(scores, 1)
    targets = labels.data.view_as(predicted)
    n_correct = predicted.eq(targets).sum()
    return n_correct, len(labels)

In [7]:
results = {}  # (experiment, trial, fraction) -> [records, test accuracy]
times = 10    # number of repeated trials per configuration
fractions = [20, 10, 8, 6, 5, 4, 3, 2, 1]  # each epoch uses len(loader) // fraction batches

num_epoch1 = 50   # phase 1 covers epochs [0, num_epoch1)
num_epoch2 = 100  # phase 2 covers epochs [num_epoch1, num_epoch2)

val_interval = 200  # run validation every val_interval training batches

# Training loader used in (phase 1, phase 2) of every experiment.
_phase_loaders = {
    '0~4/5~9': (train_loaderA, train_loaderB),
    '0~4/0~9': (train_loaderA, train_loader),
    '0~9': (train_loader, train_loader),
}

# Progress-line templates for phase 1 and phase 2 (runtime output, kept verbatim).
_phase_logs = (
    '{}网络：第1阶段,第{}次试验，数据比例{}，第{}周期，第({}/{})个撮，训练误差:{:.2f},校验误差：{:.2f}, 准确率:{:.2f}',
    '{}网络：第2阶段,第{}次试验，数据比例{}，第{}周期，第({}/{})个撮，训练误差:{:.2f},校验误差：{:.2f}, 准确率:{:.2f}',
)


def _evaluate(net, criterion, loader):
    """Evaluate net on every batch of loader with the network in eval mode.

    Returns (mean batch loss, fraction of correctly classified samples).
    """
    net.eval()
    batch_losses = []
    n_correct = 0
    n_seen = 0
    for data, label in loader:
        data, label = Variable(data), Variable(label)
        output = net(data)
        batch_losses.append(criterion(output, label).data.numpy())
        hits, count = rightness(output.data, label)
        # int() accepts both a plain number and a single-element tensor.
        n_correct += int(hits)
        n_seen += count
    return np.mean(batch_losses), 1.0 * n_correct / n_seen


def _train_phase(net, criterion, optimizer, loader, val_loader, epochs,
                 fraction, records, log_template, experiment, trial):
    """Train net on loader for the given epochs, validating periodically.

    Only the first len(loader) // fraction batches of each epoch are used.
    Every val_interval batches the network is validated on val_loader; the
    triple [mean training loss so far in this epoch, mean validation loss,
    validation accuracy] is appended to records and a progress line is printed.
    """
    batches_per_epoch = len(loader) // fraction
    for epoch in epochs:
        losses = []
        for idx, (data, label) in enumerate(loader):
            if idx >= batches_per_epoch:
                break
            # Validation switches the network to eval mode, so training mode
            # is restored before every step.
            net.train()
            optimizer.zero_grad()  # clear gradients from the previous step
            data, label = Variable(data), Variable(label)
            output = net(data)
            loss = criterion(output, label)
            loss.backward()
            optimizer.step()
            losses.append(loss.data.numpy())

            if idx % val_interval == 0:
                train_loss = np.mean(losses)
                val_loss, right_ratio = _evaluate(net, criterion, val_loader)
                records.append([train_loss, val_loss, right_ratio])
                print(log_template.format(experiment, trial, fraction, epoch, idx,
                                          len(loader), train_loss, val_loss, right_ratio))


for experiment in ['0~4/5~9', '0~4/0~9', '0~9']:
    first_loader, second_loader = _phase_loaders[experiment]
    for trial in range(times):
        for fraction in fractions:
            # Every configuration starts from a freshly initialised network;
            # the same optimizer instance is used across both phases.
            net = ConvNet()
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

            # Start training
            records = []
            _train_phase(net, criterion, optimizer, first_loader, validation_loader,
                         range(num_epoch1), fraction, records, _phase_logs[0],
                         experiment, trial)
            _train_phase(net, criterion, optimizer, second_loader, validation_loader,
                         range(num_epoch1, num_epoch2), fraction, records, _phase_logs[1],
                         experiment, trial)

            # Final accuracy on the held-out test loader.
            _, right_ratio = _evaluate(net, criterion, test_loader)
            print(experiment, trial, fraction)

            results[(experiment, trial, fraction)] = [records, right_ratio]
                            



0~4/5~9网络：第1阶段,第0次试验，数据比例20，第0周期，第(0/479)个撮，训练误差:2.30,校验误差：2.31, 准确率:0.12
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第1周期，第(0/479)个撮，训练误差:2.26,校验误差：2.30, 准确率:0.12
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第2周期，第(0/479)个撮，训练误差:2.21,校验误差：2.31, 准确率:0.15
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第3周期，第(0/479)个撮，训练误差:2.11,校验误差：2.31, 准确率:0.20
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第4周期，第(0/479)个撮，训练误差:1.95,校验误差：2.38, 准确率:0.15
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第5周期，第(0/479)个撮，训练误差:1.63,校验误差：2.74, 准确率:0.24
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第6周期，第(0/479)个撮，训练误差:1.58,校验误差：3.38, 准确率:0.30
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第7周期，第(0/479)个撮，训练误差:1.38,校验误差：3.84, 准确率:0.42
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第8周期，第(0/479)个撮，训练误差:1.10,校验误差：4.18, 准确率:0.40
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第9周期，第(0/479)个撮，训练误差:0.79,校验误差：4.61, 准确率:0.41
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第10周期，第(0/479)个撮，训练误差:0.59,校验误差：4.96, 准确率:0.43
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第11周期，第(0/479)个撮，训练误差:0.52,校验误差：5.30, 准确率:0.44
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第12周期，第(0/479)个撮，训练误差:0.48,校验误差：5.45, 准确率:0.47
0~4/5~9网络：第1阶段,第0次试验，数据比例20，第13周期，第

KeyboardInterrupt: 

In [None]:
# Average the recorded accuracy curves and the test accuracies over the
# repeated trials of every (experiment, fraction) configuration.
one_curve1 = {}  # (experiment, fraction) -> accuracy curve averaged over trials
tests1 = {}      # (experiment, fraction) -> test accuracy averaged over trials
# The original cell initialised `one_curve` / `tests` but wrote to the
# undefined `one_curve1` / `tests1`, raising NameError on a fresh kernel. The
# older names are kept as aliases so that code reading either name works.
one_curve = one_curve1
tests = tests1
for experiment in ['0~4/5~9', '0~4/0~9', '0~9']:
    for fraction in fractions:
        one_experiment = []
        test_value = []
        for trial in range(times):
            records, test_accuracy = results[(experiment, trial, fraction)]
            # Third entry of every record is the accuracy measured at that checkpoint.
            one_experiment.append([record[2] for record in records])
            test_value.append(test_accuracy)
        curves = np.array(one_experiment)  # one row per trial
        one_curve1[(experiment, fraction)] = np.mean(curves, 0)
        tests1[(experiment, fraction)] = np.mean(test_value)

In [None]:
# Plot the error curves (1 - averaged accuracy) of the three experiments,
# one figure per data fraction.
for fraction in fractions:
    plt.figure(figsize=(10, 7))
    plt.title('{:.2f} % of the data'.format(100.0 / fraction))
    for experiment in ['0~4/5~9', '0~4/0~9', '0~9']:
        # Label every curve with its own experiment name; previously the
        # '0~4/5~9' curve was labelled '0~9', giving two identical legend entries.
        plt.plot(1 - one_curve1[(experiment, fraction)], label=experiment)
    plt.legend()
    plt.xlabel('Time')
    plt.ylabel('Error')

In [None]:
# Plot the test error rate (1 - averaged test accuracy) against the data
# fraction for the '0~4/5~9' experiment only.
plt.figure(figsize=(10, 7))
for experiment in ['0~4/5~9']:
    # Read from tests1: that is the dictionary the aggregation step fills,
    # whereas `tests` is never populated and lookups on it raise KeyError.
    testss = [1 - tests1[(experiment, fraction)] for fraction in fractions]
    plt.plot(fractions, testss, 'o-', label=experiment)
plt.legend()
plt.xlabel('Fractions')
plt.ylabel('Error Rate')

In [None]:
# Plot the test error rate (1 - averaged test accuracy) against the data
# fraction, one line per experiment.
plt.figure(figsize=(10, 7))
for experiment in ['0~4/5~9', '0~4/0~9', '0~9']:
    testss = [1 - tests1[(experiment, fraction)] for fraction in fractions]
    plt.plot(fractions, testss, 'o-', label=experiment)
plt.legend()
plt.xlabel('Fractions')
plt.ylabel('Error Rate')