In [1]:
import torchvision
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch
import torch.nn.functional as F
import os
from tqdm import tqdm

from torch.optim.lr_scheduler import ReduceLROnPlateau

import email_remind 
from sklearn.metrics import accuracy_score

torch.Size([1, 64, 224, 224])  
torch.Size([1, 128, 112, 112])  
torch.Size([1, 256, 56, 56])  
torch.Size([1, 512, 28, 28])  
torch.Size([1, 512, 14, 14])  

In [21]:
# vgg16 特征提取
class vgg_16_blocks(nn.Module):
    def __init__(self):
        super(vgg_16_blocks, self).__init__()
        # 从预训练模型中拿到5层
        vgg_model = torchvision.models.vgg16(pretrained=True)
        self.conv1 = nn.Sequential()
        self.conv2 = nn.Sequential()
        self.conv3 = nn.Sequential()
        self.conv4 = nn.Sequential()
        self.conv5 = nn.Sequential()

        for i, layer in enumerate(list(vgg_model.features)):
            if i < 3:
                self.conv1.add_module(str(i), layer)
            elif i < 8:
                self.conv2.add_module(str(i), layer)
            elif i < 15:
                self.conv3.add_module(str(i), layer)
            elif i < 22:
                self.conv4.add_module(str(i), layer)
            elif i < 29:
                self.conv5.add_module(str(i), layer)
        
        # 卷积融合特征
        self.conv1_1 = nn.Sequential(
            nn.Conv2d(64, 32, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(32, 16, 3, 1, 1),
        )
        self.conv1_2 = nn.Sequential(
            nn.Conv2d(128, 64, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(64, 32, 3, 1, 1),
            )
        self.conv1_3 = nn.Sequential(
            nn.Conv2d(256, 128, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(128, 64, 3, 1, 1),
            )
        self.conv1_4 = nn.Sequential(
            nn.Conv2d(512, 256, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(256, 128, 3, 1, 1),
            )
        self.conv1_5 = nn.Sequential(
            nn.Conv2d(512, 512, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(512, 512, 3, 1, 1),
            )

    def forward(self, x):
        x1 = self.conv1(x)
        x2 = self.conv2(x1)
        x3 = self.conv3(x2)
        x4 = self.conv4(x3)
        x5 = self.conv5(x4)
        # 上下采样到第三层的尺寸
        x1 = F.interpolate(x1, x3.shape[-2:],mode='bilinear',
                            align_corners=True)
        x2 = F.interpolate(x2, x3.shape[-2:],mode='bilinear',
                            align_corners=True)
        x4 = F.interpolate(x4, x3.shape[-2:],mode='bilinear',
                            align_corners=True)
        x5 = F.interpolate(x5, x3.shape[-2:],mode='bilinear',
                           align_corners=True)
        x1 = self.conv1_1(x1)
        x2 = self.conv1_2(x2)
        x3 = self.conv1_3(x3)
        x4 = self.conv1_4(x4)
        x5 = self.conv1_5(x5)
        l = torch.cat([x1, x2], dim=1)
        h = torch.cat([x3, x4, x5], dim=1)
        return l, h
# 空间注意力


class space_att(nn.Module):
    def __init__(self, c=48):
        super(space_att, self).__init__()
        self.convs = nn.Sequential(
            nn.Conv2d(c, c, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(c, c, 3, 1, 1),
            nn.SiLU(),
            nn.Conv2d(c, 1, 1, 1, 0),
            nn.Tanh(),
            nn.Conv2d(1, 1, 1, 1, 0),
            nn.Sigmoid()
        )

    def forward(self, x):
        x1 = self.convs(x)
        x = x*x1+x
        return x
# 通道注意力


class channel_att(nn.Module):
    def __init__(self, c=1280):
        super(channel_att, self).__init__()
        self.Pool = nn.Sequential(
            nn.BatchNorm2d(c),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(start_dim=-3, end_dim=-1),
            nn.Linear(c, c//4),
            nn.ReLU(),
            nn.Linear(c//4, c//8),
            nn.ReLU(),
            nn.Linear(c//8, c//4),
            nn.ReLU(),
            nn.Linear(c//4, c),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, h, w = x.shape
        y = self.Pool(x).view(b, c, 1, 1)
        return x * y.expand_as(x)+x


class classifier(nn.Module):
    def __init__(self, ch=1280, cl=192, n=6):
        super().__init__()

        self.Pool = nn.AdaptiveMaxPool2d([4,4])
        self.flatten = nn.Flatten(-3,-1)
        self.fc = nn.Sequential(
            nn.Linear((ch+cl)*4*4, 1000),
            nn.SiLU(),
            nn.Linear(1000, 500),
            nn.SiLU(),
            nn.Linear(500, 100),
            nn.SiLU(),
            nn.Linear(100, 20),
            nn.SiLU(),
            nn.Linear(20, n),
            # nn.Softmax(dim=-1)
        )
        self.conv1_1_l = nn.Conv2d(cl, cl, 1, 1)
        self.conv1_1_h = nn.Conv2d(ch, ch, 1, 1)

    def forward(self, l, h):
        l = self.conv1_1_l(l)
        h = self.conv1_1_h(h)
        x = torch.cat([l, h], dim=1)
        x = self.Pool(x)
        x =self.flatten(x)
        x = self.fc(x)
        return x
    
class my_model(nn.Module):
    def __init__(self, ch=704,cl=48,n=6):
        super().__init__()
        self.vgg_16_blocks=vgg_16_blocks()
        self.space_att=space_att(cl)
        self.channel_att=channel_att(ch)
        self.classifier=classifier(ch=ch,cl=cl,n=n)
    def forward(self,x):
        l,h=self.vgg_16_blocks(x)
        l=l.detach()
        h=h.detach()
    
        l=self.space_att(l)
        h=self.channel_att(h)

        
        x=self.classifier(l,h)
        return x

In [3]:
train_transform = transforms.Compose([
        transforms.Resize(250),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

full_dataset =torchvision.datasets.ImageFolder(root='images',transform=train_transform)

# length 数据集总长度
full_data_size = len(full_dataset)
print("总数据集的长度为：{}".format(full_data_size))

train_size = int(0.9 * len(full_dataset))
test_size = len(full_dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, test_size])

总数据集的长度为：1980


In [4]:
torch.save(train_dataset, 'train_dataset1.pkl')
torch.save(test_dataset, 'test_dataset1.pkl')

In [4]:
train_dataset=torch.load('train_dataset1.pkl')
test_dataset=torch.load('test_dataset1.pkl')

train_loader = DataLoader(dataset=train_dataset, batch_size=2,
                          shuffle=True, num_workers=0, drop_last=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=1,
                         shuffle=False, num_workers=0, drop_last=True)

train_data_size = len(train_loader)
test_data_size = len(test_loader)
print(train_data_size, test_data_size)

891 198


In [5]:
Epoch_path = "Epoch.txt"

def get_epoch(path):
    if not os.path.exists(path): 
        Epoch_file=open(path, "w")
        Epoch_file.write("-1")
        Epoch_file.close()
    Epoch_file=open(path, "r")
    s=Epoch_file.readline()
    Epoch_file.close()
    return int(s)+1

def set_epoch(path):
    if not os.path.exists(path): 
        Epoch_file=open(path, "w")
        Epoch_file.write("-1")
        Epoch_file.close()
    Epoch_file=open(path, "r")
    s=Epoch_file.readline()
    Epoch_file.close()
    Epoch_file=open(path, "w")
    Epoch_file.write(str(int(s)+1))
    Epoch_file.close()
# print(get_epoch("Epoch.txt"))
# set_epoch("Epoch.txt")
get_epoch(Epoch_path)

81

In [9]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model=my_model()
model.to(device)
model.train()
model_test = my_model()
model_test.to(device)
model_test.eval()
pass



测试模型

In [8]:
for x in train_loader:
    out= model_test(x[0].to(device))
    print(out.shape)
    break

torch.Size([2, 6])


In [27]:
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, 
                              patience=1, verbose=False, threshold=0.001, 
                              threshold_mode='rel', cooldown=0, min_lr=0, eps=1e-08)
ture_epoch = get_epoch(Epoch_path)
if ture_epoch > 0:
    model.load_state_dict(torch.load(f'weights/{ture_epoch-1}_model.pth'))
    optimizer.load_state_dict(torch.load(
        f'weights/{ture_epoch-1}_optimizer.pth'))
loss_function = nn.CrossEntropyLoss()



Epoch = 10
for epoch in tqdm(range(Epoch), desc="train",unit='epoch'):
    # loss_all=None
    log_loss = open("log.csv", "a")
    ture_epoch = get_epoch(Epoch_path)
    loss_sum = 0
    st = 0
    for x, y in train_loader: 
        optimizer.zero_grad()
        x = x.to(device)
        y = y.type(torch.long).to(device)
        out= model(x)
        loss = loss_function(out, y)
        # if loss_all is None:
        #     loss_all = loss
        # else:
        #     loss_all += loss
        loss.backward()
        optimizer.step()
        st += 1
        loss_sum += loss.to("cpu").detach().numpy()
    
    
    log_loss.write("{}, {}\n".format(ture_epoch, loss_sum/st))

    name = f"weights/{ture_epoch}"
    if (epoch+1) % 1 == 0:
        torch.save(model.state_dict(), name+"_model.pth")
        torch.save(optimizer.state_dict(), name+"_optimizer.pth")
    log_loss.close()
    set_epoch(Epoch_path)  # ture_epoch ++


    if (epoch+1) % 1 == 0:
        # 评估
        ture_epoch = get_epoch(Epoch_path)
        if ture_epoch > 0:
            model_test.load_state_dict(torch.load(
                f'weights/{ture_epoch-1}_model.pth'))
        acc=0
        for x, y in test_loader: 
            out= model_test(x.to(device))
            y_pred=out.to("cpu").detach().numpy()
            y_true=y.to("cpu").detach().numpy()
            ac1=accuracy_score(y_true, y_pred.argmax(axis=-1))
            acc+=ac1
        acc=acc/test_loader.__len__()
        scheduler.step(torch.tensor(acc))
        acc_loss = open("acc.csv", "a")
        acc_loss.write("{}, {}\n".format(get_epoch(Epoch_path), acc))
        acc_loss.close()
        # email_remind.send_email(f"acc={acc}")
# email_remind.send_email(None)

train: 100%|████████████████████████████████████████████████████████████████████████| 10/10 [06:49<00:00, 40.95s/epoch]


In [26]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model_test = my_model()

# ture_epoch = get_epoch(Epoch_path)
ture_epoch=79
if ture_epoch > 0:
    model_test.load_state_dict(torch.load(
        f'weights/{ture_epoch-1}_model.pth'))

model_test.to(device)
model_test.eval()
acc=0
for x, y in tqdm(test_loader): 
# for x, y in tqdm(train_loader): 
    out= model_test(x.to(device))
    y_pred=out.to("cpu").detach().numpy()
    y_true=y.to("cpu").detach().numpy()
    ac1=accuracy_score(y_true, y_pred.argmax(axis=-1))
    acc+=ac1
acc/test_loader.__len__()
# acc/train_loader.__len__()


100%|████████████████████████████████████████████████████████████████████████████████| 198/198 [00:03<00:00, 61.87it/s]


0.5454545454545454

In [None]:
l=("愤怒","厌恶","恐惧","快乐","悲伤","惊喜")