In [1]:
import torch
import numpy as np
import os
from torch import optim
import torch.nn.functional as F
import random
from PIL import Image

# 数据预处理
root_dir = 'D:\\数据库\\小样本数据库\\data\\omniglot2\\raw'

import torchvision.transforms as transforms
from PIL import Image

# 生成数据集的x，y

## 1.把每个样本存为三元组格式

In [2]:
#返回一个img_items列表，包含所有图片样本，每个样本都是一个三元组（图片名词，语言/字母类别，图片样本所在的目录）
def find_classes(root_dir):
    img_items = []
    for (root, dirs, files) in os.walk(root_dir):
        for file in files:  #遍历某种语言某类字母的20张图片
            if (file.endswith("png")):
                r = root.split('\\')
                img_items.append((file, r[-2] + "/" + r[-1], root))
    #print("== Found %d items " % len(img_items))   #32460个样本
    return img_items

img_items = find_classes(root_dir)
print(img_items[0])
# print(img_items[0])  
# ('0709_01.png', 
#  'Alphabet_of_the_Magi/character01', 
#  'D:/数据库/小样本数据库/data/omniglot2/raw\\images_background\\Alphabet_of_the_Magi\\character01')

('0709_01.png', 'Alphabet_of_the_Magi/character01', 'D:\\数据库\\小样本数据库\\data\\omniglot2\\raw\\images_background\\Alphabet_of_the_Magi\\character01')


## 2.构造类别映射字典

In [3]:
## 构建一个词典{class:idx}  把所有类（具体到所有语言的所有字母类别）构成一个映射，索引按顺序排列
def index_classes(items):
    class_idx = {}
    count = 0
    for item in items:
        if item[1] not in class_idx:
            class_idx[item[1]] = count
            count += 1
    #print('== Found {} classes'.format(len(class_idx)))   #1623类
    return class_idx

class_idx = index_classes(img_items)
#print(class_idx)  #{'Alphabet_of_the_Magi/character01': 0, 'Alphabet_of_the_Magi/character02': 1;,,,,1622}

## 3.构造字典，按类别存放样本

In [4]:
temp = dict()  #把所有样本生成一个字典按类别保存，(类别索引：[图片1，图片2，，，图片20])
for imgname, classes, dirs in img_items:
    img = '{}/{}'.format(dirs, imgname)  #某张图片的地址
    label = class_idx[classes]  #某张图片的类别对应的索引
    transform = transforms.Compose([lambda img: Image.open(img).convert('L'),  #转成灰度图
                                    lambda img: img.resize((28, 28)),  #统一大小
                                    lambda img: np.reshape(img, (28, 28, 1)),
                                    lambda img: np.transpose(img, [2, 0, 1]),   #通道在前
                                    lambda img: img / 255.  #归一化
                                    ])
    img = transform(img)  #图像处理
    if label in temp.keys():
        temp[label].append(img)
    else:
        temp[label] = [img]
        
#print(len(temp))  #1623

## 4.把字典转换成数组，按类划分小数组

In [5]:
#存放所有样本图片[[img1,,,,20张图片],[],[],[],,,,,1623个[]]
img_list = []   
for label, imgs in temp.items():
    img_list.append(np.array(imgs))
img_list = np.array(img_list).astype(np.float)
#print('data shape:{}'.format(img_list.shape))  # (1623, 20, 1, 28, 28)

## 5.分割训练集/验证集/测试集

In [6]:
num_classes = img_list.shape[0]  #训练集+测试集一共有1623类
x_train = img_list[:1100]  #前1100类作为训练集
x_val = img_list[1100:1300]  #后200类作为验证集
x_test=img_list[1300:]  #后323类作为测试集


## 6.把数据集分割成x和y

In [7]:
def split_x_y(subset):
    imgs=[]  #数据集所有图片
    labels=[] #imgs对应的标签
    for i in range(len(subset)):
        for j in range(20):
            img=subset[i][j]
            imgs.append(img)
            labels.append(i)
    return imgs,labels

train_imgs,train_labels=split_x_y(x_train)
val_imgs,val_labels=split_x_y(x_val)
test_imgs,test_labels=split_x_y(x_test)

# 定义dataset类和dataloader类

In [8]:
import torch.utils.data as data

class SiamsDataset(data.Dataset):
    def __init__(self,imgs,labels):
        self.imgs=imgs
        self.labels=labels
        self.classes, self.counts = np.unique(self.labels, return_counts=True)
        self.classes = torch.LongTensor(self.classes)
        
    def __getitem__(self, idx):
        img0_tuple=torch.randperm(len(self.classes))[0]   #第一个样本随机选择一类
        should_get_same_class = random.randint(0,1)  #随机决定是否要获取同类样本
        if should_get_same_class:  
            img1_tuple = img0_tuple
        else:
            while True:    
                #直到找到非同一类别
                img1_tuple = torch.randperm(len(self.classes))[0]   
                if img0_tuple !=img1_tuple:
                    break
            
        img0_idx = torch.randperm(int(self.counts[img0_tuple]))[0] #取出图片一
        img1_idx = torch.randperm(int(self.counts[img1_tuple]))[0] #取出图片二
        
        idx0=img0_tuple*20+img0_idx
        idx1=img1_tuple*20+img1_idx

        return self.imgs[idx0], self.imgs[idx1],should_get_same_class
    def __len__(self):
        return len(self.imgs)

In [9]:
def dataloader(imgs,labels,train_batch_size):
    dataset=SiamsDataset(imgs,labels)
    return torch.utils.data.DataLoader(dataset, shuffle=True,batch_size=train_batch_size)

train_batch_size=32
train_loader=dataloader(train_imgs,train_labels,train_batch_size)
val_loader=dataloader(val_imgs,val_labels,train_batch_size)
test_loader=dataloader(test_imgs,test_labels,train_batch_size)
next(iter(train_loader))[0].shape  #([32, 1, 28, 28])


torch.Size([32, 1, 28, 28])

# 构造孪生网络模型

In [10]:
import torch.nn as nn

class SiamsNet(nn.Module):
    def __init__(self):
        super(SiamsNet, self).__init__()
        self.cnn1 = nn.Sequential(
            nn.ReflectionPad2d(1),   #padding
            nn.Conv2d(1, 4, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(4, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),
        )

        self.fc1 = nn.Sequential(
            nn.Linear(8*28*28, 500),
            nn.ReLU(inplace=True),
            
            nn.Linear(500, 500),
            nn.ReLU(inplace=True),
            
            nn.Linear(500, 5))

    def forward_once(self, x):
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)  #拉直
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2
    
#自定义损失函数
class ContrastiveLoss(torch.nn.Module):

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2, keepdim = True)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))

        return loss_contrastive

In [14]:
model = SiamsNet().cuda() #定义模型且移至GPU
criterion = ContrastiveLoss() #定义损失函数
optimizer = optim.Adam(model.parameters(), lr = 0.0005) #定义优化器

# 训练模型

In [36]:
counter = []
loss_history = [] 
iteration_number = 0


#开始训练
for epoch in range(0, 2):
    for i, data in enumerate(train_loader, 0):
        img0, img1 , label = data
        #img0维度为torch.Size([32, 1, 100, 100])，32是batch，label为torch.Size([32, 1])
        img0, img1, label=img0.type(torch.FloatTensor),img1.type(torch.FloatTensor),label.type(torch.FloatTensor),
        img0, img1 , label = img0.cuda(), img1.cuda(), label.cuda() #数据移至GPU
        optimizer.zero_grad()
        output1,output2 = model(img0, img1)
        loss_contrastive = criterion(output1, output2, label)
        loss_contrastive.backward()
        optimizer.step()
        if i % 10 == 0 :
            iteration_number +=10
            counter.append(iteration_number)
            loss_history.append(loss_contrastive.item())
    print("Epoch number: {} , Current loss: {:.4f}\n".format(epoch,loss_contrastive.item()))
    

Epoch number: 0 , Current loss: 1.1699

Epoch number: 1 , Current loss: 1.0408



In [20]:
from tqdm import tqdm
import random
    
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

train_loss = [] #记录每个eposide，每个任务更新的结果

val_loss = []

best_loss = 0
best_model_path='C:/Users/user/Desktop/最近/best_protonet/best_model.pth'
last_model_path='C:/Users/user/Desktop/最近/best_protonet/last_model.pth'

#遍历epoch训练
for epoch in range(5):
    print('=== Epoch: {} ==='.format(epoch))
    #取一个batch的训练数据
    tr_iter = iter(train_loader)
    model.train()
    for eposide in tqdm(tr_iter):  #遍历每个eposide，一共遍历iterations次，#len(next(iter(dataloader))) #[75个图片矩阵，75个标签]
        
        x1, x2, y = eposide   #x.shape=([75, 1, 28, 28])
        #print(x1.shape)
        x1, x2, y=x1.type(torch.FloatTensor),x2.type(torch.FloatTensor),y.type(torch.FloatTensor),
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)
        optimizer.zero_grad()  #梯度清零
        
        output1,output2= model(x1,x2)  #前传
        loss= criterion(output1,output2, y)  #计算loss
 
        loss.backward()  #反传
        optimizer.step()  #参数更新
        
        train_loss.append(loss.item())   #加入这个batch的计算loss
        
    avg_loss = np.mean(train_loss[-100:])  #计算本次epoch的所有batch迭代的平均损失
    print('Avg Train Loss: {}'.format(avg_loss))
    
    #每训练一个batch=iterations个任务后，验证一次模型
    val_iter = iter(val_loader)
    model.eval()
    for eposide in val_iter:
        x1, x2, y = eposide   #x.shape=([75, 1, 28, 28])
        x1, x2, y=x1.type(torch.FloatTensor),x2.type(torch.FloatTensor),y.type(torch.FloatTensor),
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)
        
        output1,output2= model(x1,x2)
        loss= criterion(output1,output2, y)  #计算loss
        
        val_loss.append(loss.item())

    avg_loss = np.mean(val_loss[-100:])

    #如果当前模型的验证效果比先前好，则把当前模型记为最佳模型
    postfix = ' (Best)' if avg_loss <= best_loss else ' (Best: {})'.format(best_loss)
    print('Avg Val Loss: {},{}'.format(  #输出本次验证的平均损失和准确率
            avg_loss, postfix))

    #保存某个epoch内所有batch在验证集上得到的最佳模型、准确率
    if avg_loss <= best_loss:
        torch.save(model.state_dict(), best_model_path)
        best_loss = avg_loss
        best_state = model.state_dict()

#print(len(train_loss))  #iterations
#print(len(val_loss)) #iterations

#保存最终所有epoch得到的最佳模型
torch.save(model.state_dict(), last_model_path)
print('最后的结果： best_loss=%d, train_loss=%d, val_loss=%d' % (best_loss, train_loss[-1] ,val_loss[-1]))


  0%|                                                                                          | 0/688 [00:00<?, ?it/s]

=== Epoch: 0 ===


100%|████████████████████████████████████████████████████████████████████████████████| 688/688 [00:36<00:00, 18.77it/s]


Avg Train Loss: 1.1107262504100799


  0%|▏                                                                                 | 2/688 [00:00<00:36, 18.70it/s]

Avg Val Loss: 1.1642585253715516, (Best: 0)
=== Epoch: 1 ===


100%|████████████████████████████████████████████████████████████████████████████████| 688/688 [00:36<00:00, 19.02it/s]


Avg Train Loss: 1.0972810328006743


  0%|▏                                                                                 | 2/688 [00:00<00:37, 18.53it/s]

Avg Val Loss: 1.116766031384468, (Best: 0)
=== Epoch: 2 ===


100%|████████████████████████████████████████████████████████████████████████████████| 688/688 [00:35<00:00, 19.30it/s]


Avg Train Loss: 1.0843740338087082


  0%|▏                                                                                 | 2/688 [00:00<00:38, 18.03it/s]

Avg Val Loss: 1.137569010257721, (Best: 0)
=== Epoch: 3 ===


100%|████████████████████████████████████████████████████████████████████████████████| 688/688 [00:35<00:00, 19.26it/s]


Avg Train Loss: 1.1067077189683914


  0%|▏                                                                                 | 2/688 [00:00<00:39, 17.55it/s]

Avg Val Loss: 1.1296838533878326, (Best: 0)
=== Epoch: 4 ===


100%|████████████████████████████████████████████████████████████████████████████████| 688/688 [00:36<00:00, 18.77it/s]


Avg Train Loss: 1.0917650824785232
Avg Val Loss: 1.103089679479599, (Best: 0)
最后的结果： best_loss=0, train_loss=1, val_loss=1


# 测试模型

In [22]:
test_loss = list()
for epoch in range(10):
    test_iter = iter(test_loader)
    for eposide in test_iter:
        x1, x2, y = eposide   #x.shape=([75, 1, 28, 28])
        x1, x2, y=x1.type(torch.FloatTensor),x2.type(torch.FloatTensor),y.type(torch.FloatTensor),
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)
        
        output1,output2= model(x1,x2)
        loss= criterion(output1,output2, y)  #计算loss
        
        test_loss.append(loss.item())
        
avg_loss = np.mean(test_loss)
print('Test loss: {}'.format(avg_loss))

Test Acc: 1.1037377886842974


In [28]:
model=None
#加载训练最好的模型参数
model = SiamsNet().to(device)
model.load_state_dict(torch.load(os.path.join('C:\\Users\\user\\Desktop\\最近\\best_siamsnet','last_model.pth')))
#model.load_state_dict(best_state)
print('Testing with best model..')

#测试模型
test_loss = list()
for epoch in range(10):
    test_iter = iter(test_loader)
    for eposide in test_iter:
        x1, x2, y = eposide   #x.shape=([75, 1, 28, 28])
        x1, x2, y=x1.type(torch.FloatTensor),x2.type(torch.FloatTensor),y.type(torch.FloatTensor),
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)
        
        output1,output2= model(x1,x2)
        loss= criterion(output1,output2, y)  #计算loss
        
        test_loss.append(loss.item())
        
avg_loss = np.mean(test_loss)
print('Test loss: {}'.format(avg_loss))

Testing with best model..
Test loss: 1.1022309557931258
