In [1]:
import os
file_path = 'self_made_signs'
signs_class = [cla for cla in os.listdir(file_path)]
print(signs_class)

['bowl', 'dark.mp4', 'dog', 'feel', 'get', 'I', 'know', 'like', 'must', 'sick', 'you']


In [None]:
# 创建 训练集train文件夹，并由类名在其目录下创建5个子目录
def mkfile(file):
    if not os.path.exists(file):
        os.makedirs(file)

mkfile('signs/train')

for cla in signs_class:
    mkfile('signs/train/' + cla)

mkfile('signs/val')
for cla in signs_class:
    mkfile('signs/val/' + cla)

In [None]:
#划分train和val,并构建seperate的train和val集

from shutil import copy
import random
 
# 划分比例，训练集 : 验证集 
split_rate = 1/18
 
# 遍历所有类别的全部图像并按比例分成训练集和验证集
for cla in signs_class: #每一个class
    cla_path = file_path + '/' + cla + '/'  # 某一类别的子目录
    images = os.listdir(cla_path)  # iamges 列表存储了该目录下所有图像的名称
    num = len(images)
    eval_index = random.sample(images, k=int(num*split_rate)) #从images中，随机抽取k个
    #print(eval_index) 随机划分训练和验证集
    
    for index, image in enumerate(images): #如图像a(1).jpg,那么index是1,image是a(1).jpg
        # every image is processed, and is stored to a new folder according to its attribute of train or val

        # some old images are val images, they need to store into new folder of val
        if image in eval_index:
            image_path = cla_path + image #original image path
            new_path = 'signs/val/' + cla #new path(val folder)
            copy(image_path, new_path)  # old image to new folder
 
        # other old images store into new folder of train
        else:
            image_path = cla_path + image #original image path
            new_path = 'signs/train/' + cla #new path(train folder)
            copy(image_path, new_path) # old image to new folder

        print("\r[{}] processing [{}/{}]".format(cla, index + 1, num), end="")  # processing schedule

        #\r:回到行首
    
    print() #\n
 
print("processing done!")

In [2]:
# resnet-18 

import torch
import torch.nn as nn
import torch.nn.functional as F

#定义残差块ResBlock

class ResBlock(nn.Module):
    def __init__(self, inchannel, outchannel, identity_downsample=None, stride=1):
        super(ResBlock, self).__init__()
        
        #这里定义了残差块内连续的2个卷积层
        self.conv1 = nn.Conv2d(inchannel,outchannel,kernel_size=3,stride=stride,padding=1)
        self.bn1 = nn.BatchNorm2d(outchannel)
        self.conv2 = nn.Conv2d(outchannel,outchannel,kernel_size=3,stride=1,padding=1)
        self.bn2 = nn.BatchNorm2d(outchannel)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
            
    def forward(self, x):
        identity = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        
        # if identity_downsample is not None as default, then:
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)
        
        x = x + identity
        x = self.relu(x)
        
        return x

In [3]:
class ResNet_18(nn.Module):
    
    def __init__(self, image_channels, num_classes):
        
        super(ResNet_18, self).__init__()
        # self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        #resnet layers
        self.layer1 = self.__make_layer(64, 64, stride=1)
        self.layer2 = self.__make_layer(64, 128, stride=2)
        self.layer3 = self.__make_layer(128, 256, stride=2)
        self.layer4 = self.__make_layer(256, 512, stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)
        
    def identity_downsample(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1), 
            nn.BatchNorm2d(out_channels)
        )  
    
    def __make_layer(self, in_channels, out_channels, stride):
        
        identity_downsample = None #默认是none,即identity-free shortcut
        if stride != 1:
            identity_downsample = self.identity_downsample(in_channels, out_channels)
        #对于64-128.128-256.256-512的第一个block,有stride=2,且outchannel=2*inchannel；
        #其他的block,64-64的全部2个,64-128的第2个，128-256的第2个，256-512的第2个，都是outchannel=inchannel
            
        return nn.Sequential(
            ResBlock(in_channels, out_channels, identity_downsample=identity_downsample, stride=stride), 
            ResBlock(out_channels, out_channels)
        )
    
    def forward(self, x):
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = x.view(x.shape[0], -1)
        x = self.fc(x)
        return x 
    

In [4]:
model = ResNet_18(3,10)

device = torch.device("cuda:0" if torch.cuda.is_available () else "cpu")

model = model.to(device) # or MyNet().to(device)

In [5]:
from torchvision import transforms # first, we need to preprocess(i.e.transform) the train/val sets
# then we need to load all the train / val sets
from torchvision.datasets import ImageFolder 
from torch.utils.data import DataLoader

ROOT_TRAIN = r'signs/train'
ROOT_TEST = r'signs/val' # test is val in our case

# now begin to preprocess(transform)

normalize = transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
train_transform = transforms.Compose([
    transforms.Resize((224,224)), # 裁剪为224*224
    transforms.RandomVerticalFlip(), # 随机垂直旋转
    transforms.ToTensor(), # 将0-255范围内的像素转为0-1范围内的tensor
    normalize])
 
val_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    normalize])

# trans are finished, now load the sets(train/val)
# two steps to load the sets, 1.load the sets from the path, pure load 2. add important parameters, like batch_size=?, whether to shuffle?
# after 2, the packaged sets are ready to input into our self-designed net
train_dataset = ImageFolder(ROOT_TRAIN, transform=train_transform)
val_dataset = ImageFolder(ROOT_TEST, transform=val_transform)
 
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True, num_workers=0)

In [6]:
for batch_num,(x,label) in enumerate(train_dataloader):
    print(label)

tensor([4, 2, 8, 7, 7, 2, 0, 7, 3, 8, 9, 2, 5, 9, 0, 8])
tensor([1, 7, 0, 8, 8, 3, 7, 6, 7, 5, 3, 8, 4, 2, 6, 4])
tensor([9, 9, 8, 9, 9, 9, 5, 0, 5, 3, 1, 9, 8, 3, 7, 9])
tensor([6, 3, 4, 8, 2, 3, 2, 6, 7, 6, 7, 3, 9, 8, 4, 4])
tensor([9, 9, 3, 8, 5, 1, 9, 2, 7, 3, 2, 2, 3, 0, 8, 9])
tensor([3, 6, 8, 8, 6, 2, 4, 1, 7, 8, 1, 9, 2, 5, 6, 3])
tensor([9, 9, 1, 7, 4, 8, 1, 0, 2, 3, 0, 7, 3, 5, 7, 9])
tensor([5, 8, 6, 5, 5, 5, 3, 9, 3, 2, 4, 3, 8, 9, 6, 6])
tensor([2, 8, 9, 6, 9, 9, 8, 8, 0, 8, 9, 5, 9, 9, 6, 6])
tensor([7, 3, 1, 8, 6, 4, 0, 2, 3, 3, 0, 5, 3, 9, 6, 4])
tensor([0, 3, 7, 9, 5, 9, 2, 5, 7, 8, 4, 5, 1, 7, 7, 4])
tensor([8, 9, 5, 6, 5, 3, 8, 0, 1, 3, 2, 7, 0, 0, 8, 8])
tensor([7, 9, 3, 5, 0, 8, 3, 2, 0, 8, 0, 0, 2, 5, 1, 5])
tensor([2, 2, 1, 2, 1, 1, 1, 3, 0, 3, 2, 0, 4, 6, 7, 3])
tensor([2, 4, 3, 6, 1, 8, 5, 4, 5, 7, 7, 5, 3, 1, 5, 7])
tensor([8, 2, 9, 0, 5, 0, 1, 2, 1, 0, 4, 4, 7, 2, 7, 3])
tensor([2, 8, 4, 4, 5, 6, 3, 5, 9, 3, 4, 7, 1, 1, 4, 9])
tensor([4, 3, 5, 7, 2, 3, 6, 4,

In [7]:
# hyperparameters of the net:
# batch_size has been declared in loading the sets in step 2, the rest are loss_function, optimizer, and learning rate
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [8]:
# 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
    loss, current, n = 0.0, 0.0, 0
    for batch_num, (x, label) in enumerate(dataloader):
 
        # 前向传播
        image, label = x.to(device), label.to(device)
        output = model(image)
        cur_loss = loss_fn(output, label)
        _, pred = torch.max(output, axis=1);# print(pred)
        cur_acc = torch.sum(label == pred)/output.shape[0]
 
        # 反向传播
        optimizer.zero_grad()
        cur_loss.backward()
        optimizer.step()
        loss += cur_loss.item()
        current += cur_acc.item()
        n = n+1 #n:batch_size数量
 
    # loss和acc是两个不同性质的变量
    #loss:损失本身,是一个通过特殊的交叉熵损失函数,得到的连续值
    #acc:准确率,是纯粹的正确百分比,例如头三个batch,正确情况为(10/16,5/16,7/16),那么acc=(10+5+7)/(16+16+16),或者acc=(10/16+5/16+7/16)/n (n=3)
    train_loss = loss/n
    train_acc = current/n
    print('train_loss:' + str(train_loss))
    print('train_acc:' + str(train_acc))
    return train_loss, train_acc

In [9]:
# 开始训练
loss_train = []
acc_train = []
 
epoch = 10
max_acc = 0 # the best ever model

for t in range(epoch):
    print(f"epoch{t+1}\n--------------")
    train_loss, train_acc = train(train_dataloader, model, loss_fn, optimizer)
    
    print("train_loss: ",train_loss)
    print("train_acc: ",train_acc)
    
    if train_acc > max_acc:
        print(f"save best model, 第{t+1}轮")
        max_acc = train_acc
        torch.save(model,"best_sign_model.pth")

epoch1
--------------
train_loss:1.782211630432694
train_acc:0.37962962962962965
train_loss:  1.782211630432694
train_acc:  0.37962962962962965
save best model, 第1轮
epoch2
--------------
train_loss:1.0322398532319952
train_acc:0.625
train_loss:  1.0322398532319952
train_acc:  0.625
save best model, 第2轮
epoch3
--------------
train_loss:0.7311035952082386
train_acc:0.7592592592592593
train_loss:  0.7311035952082386
train_acc:  0.7592592592592593
save best model, 第3轮
epoch4
--------------
train_loss:0.5632679978454554
train_acc:0.8206018518518519
train_loss:  0.5632679978454554
train_acc:  0.8206018518518519
save best model, 第4轮
epoch5
--------------
train_loss:0.3715661434387719
train_acc:0.8796296296296297
train_loss:  0.3715661434387719
train_acc:  0.8796296296296297
save best model, 第5轮
epoch6
--------------
train_loss:0.3140564936178702
train_acc:0.9039351851851852
train_loss:  0.3140564936178702
train_acc:  0.9039351851851852
save best model, 第6轮
epoch7
--------------
train_loss:0.3

In [10]:
# 定义训练函数
# val部分不需要损失函数loss_fn和优化器optimizer,只需要求出对不对就行了
def test(dataloader, model):
    current,n = 0.0,0
    for batch_num, (x, label) in enumerate(dataloader):
        # 前向传播
        image, label = x.to(device), label.to(device)
        output = model(image)
        _, pred = torch.max(output, axis=1);print("label: ",label);print("pred: ",pred)
        cur_acc = torch.sum(label == pred)/output.shape[0]
        
        current += cur_acc.item()
        n=n+1

    test_acc = current/n
    print('test_acc:' + str(test_acc))
    return test_acc

In [12]:
#model = ResNet_18(3,10)
#model.load_state_sict(torch.load('best_sign_model.pth'))
device = torch.device("cuda:0" if torch.cuda.is_available () else "cpu") ; model = model.to(device) # deploy model on device
model.eval() # ready for the test

test_acc = test(val_dataloader, model)

label:  tensor([3, 0, 6, 1, 7, 1, 1, 3, 5, 6, 2, 5, 7, 8, 8, 3])
pred:  tensor([3, 0, 6, 1, 7, 1, 1, 3, 5, 6, 2, 5, 7, 8, 8, 3])
label:  tensor([6, 0, 0, 9, 3, 7, 8, 0, 6, 5, 9, 9, 9, 7, 4, 1])
pred:  tensor([3, 0, 0, 9, 3, 7, 8, 0, 6, 5, 9, 9, 9, 7, 4, 1])
label:  tensor([8, 9, 5, 1, 4, 8, 2, 2, 2, 4, 7, 6, 4, 3, 5, 2])
pred:  tensor([8, 9, 5, 1, 4, 8, 2, 2, 2, 4, 7, 6, 4, 3, 5, 3])
label:  tensor([4, 0])
pred:  tensor([4, 0])
test_acc:0.96875


In [13]:
# 测试一张图像
import cv2
for file in os.listdir("signs_test"):
    img=cv2.imread(file)
    
root_test = r'signs_test' # test is val in our case

val_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    normalize])

test_dataset = ImageFolder(root_test, transform=val_transform)

test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True, num_workers=0)

In [14]:
for batch_num,(img,label) in enumerate(test_dataloader):
    print(label)

tensor([1, 2, 4, 5, 8, 3, 7, 0, 9, 6])


In [17]:
#model = ResNet_18(3,5)
#model.load_state_sict(torch.load('best_sign_model.pth'))
device = torch.device("cuda:0" if torch.cuda.is_available () else "cpu") ; model = model.to(device) # deploy model on device
model.eval() # ready for the test

test_acc = test(test_dataloader, model)

label:  tensor([1, 2, 6, 8, 9, 7, 5, 3, 0, 4])
pred:  tensor([3, 2, 3, 3, 9, 3, 3, 3, 0, 3])
test_acc:0.4000000059604645


In [None]:
# 背景替换
import random
import os

dir_path = "self_made_signs/bowl/"

percentage_background=0.2

sample_num = int(90*percentage_background)

nums=[]
files= os.listdir(dir_path) 
for file in files:
    file = file[:-4]
    file=int(file)
    nums.append(file)
    
random.sample(nums,sample_num)