https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf  
模型定义用的这篇文章里的，损失函数和优化器及超参数策略是自己按照SVM里面的来的

In [1]:
import numpy as np
import torch
import json
import random
import imageio
from torch import nn
from torch.nn import init
from collections import OrderedDict
import cv2
import time

In [2]:
label_to_idx = {
    'p1':0, 'p12':1, 'p14':2, 'p17':3, 'p19':4, 
    'p22':5, 'p25':6, 'p27':7, 'p3':8, 'p6':9, 
    'p9':10
}

idx_to_label = [
    'p1', 'p12', 'p14', 'p17', 'p19', 
    'p22', 'p25', 'p27', 'p3', 'p6', 
    'p9'
]

# 初始化训练数据
def init_train_data():
    start = time.time()
    with open("train.json", "r") as f:
        dic = json.loads(f.read())
    names = list(dic)
    labels = list(dic.values())
    num_examples = len(names)
    features = []
    idx_labels = []
    for i in range(num_examples):
        name = names[i]
        label = labels[i]
        path = "Train\\" + label + "\\" + name
        img = imageio.imread(path)
        features.append(np.array(img[:, :, 0 : 3]))
        idx_labels.append(label_to_idx[label])
    print("time %.2f sce" % (time.time() - start))
    return features, idx_labels
# features 是一个 list，元素为np.array，形状为[宽 * 高 * 3（维度）]

features, labels = init_train_data()

time 0.04 sce


In [3]:
# 标准尺寸
width, height = 105, 105

# 设备
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class SiameseNet(nn.Module):
    def __init__(self):
        super(SiameseNet, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size = 10), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(64, 128, kernel_size = 7), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(128, 128, kernel_size = 4), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(128, 256, kernel_size = 4), 
            nn.ReLU()
        )
        self.fc1 = nn.Sequential( 
            nn.Linear(256 * 6 * 6, 4096), 
            nn.Sigmoid()
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size = 10), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(64, 128, kernel_size = 7), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(128, 128, kernel_size = 4), 
            nn.ReLU(), 
            nn.MaxPool2d(kernel_size = 2), 
            nn.Conv2d(128, 256, kernel_size = 4), 
            nn.ReLU()
        )
        self.fc2 = nn.Sequential( 
            nn.Linear(256 * 6 * 6, 4096), 
            nn.Sigmoid()
        )
        self.fc = nn.Sequential( 
            nn.Linear(4096, 1), 
            nn.Sigmoid()
        )
        
    def forward(self, img1, img2):
        feature11 = self.conv1(img1)
        feature12 = self.fc1(feature11.view(img1.shape[0], -1))
        feature21 = self.conv2(img2)
        feature22 = self.fc2(feature21.view(img2.shape[0], -1))
        feature3 = abs(feature12 - feature22)
        output = self.fc(feature3)
        return output.view((output.shape[0]))

In [4]:
# 重要！！！
# 此处定义net，保证每次重新训练均重新初始化net
net = SiameseNet()
if torch.cuda.is_available():
    net.cuda()


# 损失函数
loss = nn.CrossEntropyLoss()

# 优化器
lr = 0.01
optimizer = torch.optim.SGD(net.parameters(), lr = lr, momentum = 0.5)

# 类别数
minibatch = 11

# 获取数据，X为tensor，形状为[bath_size * 3(dim) * width * hight]
images = []
for image in features:
    image = cv2.resize(image, (width, height), interpolation = cv2.INTER_AREA) # 用cv2转换大小
    image = torch.tensor(image, dtype = torch.float, device = device)
    images.append(image)
X = torch.stack(images)
X = X.view((minibatch, 3, width, height))

y = torch.tensor(range(minibatch), device = device)

start = time.time()

# 迭代
epoch_num = 200

#计数器，用于提前终止
fitcount = 0

for epoch in range(epoch_num):
    # 前向运算
    y_hat = []
    for i in range(minibatch):
        X0 = X[i, :, :, :].view((1, 3, width, height))
        y_hati = net(X0, X)
        y_hat.append(y_hati)
    y_hat = torch.stack(y_hat)
    y_hat.device
    
    # 带范数惩罚的损失函数
    norm = 0
    for param in net.parameters():
        norm = norm + param.norm()
    l = loss(y_hat, y).sum() + norm * 0.05

    # 梯度清零
    optimizer.zero_grad()

    # 后向梯度
    l.backward()
    optimizer.step()

    # 显示分类结果
    if (epoch + 1) % 10 == 0:
        print('epoch %d, time %.2f sec' % (epoch + 1, time.time() - start))
        print(torch.argmax(y_hat, dim = 1))
        
    # 检验是否匹配
    if sum(torch.argmax(y_hat, dim = 1) - torch.tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10], device = device)) == 0:
        fitcount = fitcount + 1
    else:
        fitcount = 0
    
    # 调整学习率
    lr = 0.99 * lr
    optimizer = torch.optim.SGD(net.parameters(), lr = lr, momentum = 0.5)
    
    # 提前终止
    if fitcount == 20:
        print('epoch %d, time %.2f sec' % (epoch + 1, time.time() - start))
        print(torch.argmax(y_hat, dim = 1))
        print('lr = ', lr)
        break

epoch 10, time 6.22 sec
tensor([ 3,  8,  3,  3,  4,  7,  6,  7,  8,  9, 10], device='cuda:0')
epoch 20, time 11.49 sec
tensor([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 7], device='cuda:0')
epoch 30, time 16.74 sec
tensor([2, 1, 2, 3, 4, 5, 6, 7, 8, 9, 2], device='cuda:0')
epoch 40, time 21.89 sec
tensor([2, 1, 2, 3, 4, 4, 6, 5, 8, 9, 2], device='cuda:0')
epoch 50, time 27.02 sec
tensor([8, 1, 9, 9, 4, 4, 6, 4, 6, 8, 0], device='cuda:0')
epoch 60, time 32.15 sec
tensor([0, 1, 2, 3, 4, 4, 6, 4, 8, 9, 0], device='cuda:0')
epoch 70, time 37.29 sec
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10], device='cuda:0')
epoch 80, time 42.42 sec
tensor([ 0,  1,  2,  3,  4,  4,  6,  7,  1,  9, 10], device='cuda:0')
epoch 90, time 47.56 sec
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10], device='cuda:0')
epoch 100, time 52.71 sec
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10], device='cuda:0')
epoch 110, time 57.87 sec
tensor([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10], device='cuda:0')
epoch 120

最先采用SGD，0.1的学习率，发现始终无法收敛  
通过打印值发现存在震荡，于是想到可能是学习率过大  
进一步分析，是因为训练集过小，参数对结果的影响相对变大  
因此学习率一旦较大就会开始震荡，无法收敛  
（可以写到报告里）

In [5]:
# 用模型生成test文件，生成的文件名默认为test1.json
def init_test_file(net, outname = "test1.json"):
    with open("test.json", "r") as f:
        dic = json.loads(f.read())
    names = list(dic)
    num_examples = len(names)
    for name in names:
        path = "Test\\" + name
        img = imageio.imread(path)
        image = np.array(img[:, :, 0 : 3])
        image = cv2.resize(image, (width, height), interpolation = cv2.INTER_AREA)
        image = torch.tensor(image, dtype = torch.float, device = device)
        image = image.view(1, 3, width, height)
        y = net(image, X)
        label = labels[torch.argmax(y)]
        dic[name] = idx_to_label[label]
    json_str = json.dumps(dic)
    with open(outname, "w") as f:
        f.write(json_str)
        
init_test_file(net)
print("Done!")

Done!


从test.py里面照搬的准确率验证程序  
执行需要相同文件目录下有两个文件：  
pred.json 正确的标注，在这里是我按照规律手工标注的  
test.json 训练得到的标注，相对于原代码修改成了test1.json，尽量不去修改原有的文件  

In [6]:
import json

pred = json.load(open('pred.json', 'r'))
label = json.load(open('test1.json', 'r'))

classes = []
correct = {}
total = {}
for cls in label.values():
    if cls not in classes:
        classes.append(cls)
        correct[cls] = 0
        total[cls] = 0
classes.sort()

miss = 0
cor = 0
for imgname in label.keys():
    try:
        correct[label[imgname]] += (pred[imgname] == label[imgname])
    except:
        miss += 1
    total[label[imgname]] += 1
acc_str = '%d imgs missed\n'%miss
for cls in classes:
    acc_str += 'class:%s\trecall:%f\n'%(cls, correct[cls]/total[cls])
    cor += correct[cls]
acc_str += 'Accuracy: %f'%(cor/len(label))
print(acc_str)

0 imgs missed
class:p1	recall:0.187500
class:p12	recall:0.071429
class:p14	recall:0.222222
class:p17	recall:0.464286
class:p19	recall:0.157895
class:p25	recall:0.800000
class:p27	recall:0.275862
class:p3	recall:0.333333
class:p6	recall:0.156250
class:p9	recall:0.500000
Accuracy: 0.268182
