# In [7]:
import os
import numpy as np
import torch
import pandas as pd
import cv2
import time
from torch import optim
from torchvision import transforms
from torch import nn
from torch.utils.data import DataLoader, Dataset

def Readfile(path, flag) :
    """Load every image in a directory, resized to 128x128, into one array.

    Files are processed in sorted file-name order, so row ``i`` of the result
    corresponds to the ``i``-th name in ``sorted(os.listdir(path))``.

    Args:
        path: Directory containing the image files.
        flag: When true, a class label is parsed from each file name (the
            integer before the first ``_``) and returned alongside the images.

    Returns:
        ``x``, an ``np.uint8`` array of shape ``(n, 128, 128, 3)``, when
        ``flag`` is false; the tuple ``(x, y)`` when ``flag`` is true, where
        ``y`` is an ``np.uint8`` array of ``n`` labels. An empty directory
        yields empty arrays.

    Raises:
        ValueError: If a file cannot be decoded as an image.
    """
    image_dir = sorted(os.listdir(path))
    x = np.zeros((len(image_dir), 128, 128, 3), dtype = np.uint8)
    y = np.zeros(len(image_dir), dtype = np.uint8)

    for i, file in enumerate(image_dir) :
        full_path = os.path.join(path, file)
        img = cv2.imread(full_path, cv2.IMREAD_COLOR)
        # cv2.imread signals an unreadable file by returning None instead of
        # raising, so fail here with a message that names the file.
        if img is None :
            raise ValueError('cannot read image: {}'.format(full_path))
        x[i] = cv2.resize(img, (128, 128))

        if flag :
            y[i] = int(file.split('_')[0])

    # Return only after the whole directory has been read; returning from
    # inside the loop would leave every row after the first as zeros.
    if flag :
        return x, y
    return x
        
# Root directory of the dataset; the three splits are read from its
# "training", "validation" and "testing" sub-directories.
workspace_dir = './food-11'
# Training and validation are read with flag=True so that labels are returned
# together with the images.
train_x, train_y = Readfile(os.path.join(workspace_dir, "training"), True)
val_x, val_y = Readfile(os.path.join(workspace_dir, "validation"), True)
# The testing split is read with flag=False: images only, no labels.
test_x = Readfile(os.path.join(workspace_dir, "testing"), False)

# Training-time pipeline: convert the sample to a PIL image, apply random
# augmentation, then convert it to a tensor.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(), # randomly flip the image horizontally
    transforms.RandomRotation(15),    # randomly rotate the image by up to 15 degrees
    transforms.ToTensor(),
])
# Evaluation-time pipeline: the same conversion steps without any random
# augmentation.
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
])

class ImgDataset(Dataset) :
    """Dataset over an indexable collection of samples with optional labels.

    Args:
        x: Indexable collection of samples; its length is the dataset length.
        y: Optional labels. When given they are stored as a
            ``torch.LongTensor``; when omitted the dataset yields samples only.
        transform: Optional callable applied to each sample on access.
    """

    def __init__(self, x, y = None, transform = None) :
        self.x = x
        self.y = torch.LongTensor(y) if y is not None else y
        self.transform = transform

    def __len__(self) :
        return len(self.x)

    def __getitem__(self, item) :
        """Return ``(sample, label)`` when labels exist, otherwise ``sample``."""
        has_labels = self.y is not None
        sample = self.x[item]
        # The label is looked up before the transform runs, mirroring the
        # access order of the sample/label pair.
        target = self.y[item] if has_labels else None
        if self.transform is not None :
            sample = self.transform(sample)
        return (sample, target) if has_labels else sample

# Number of samples per mini-batch for both loaders below.
batch_size = 8
# The validation images and labels are appended to the training arrays, so
# the model is fitted on both splits.
# NOTE(review): val_set below is still built from val_x / val_y, which are now
# also part of train_x / train_y. Any validation metric computed on val_loader
# is therefore measured on samples seen during training -- confirm whether the
# merge or the validation pass is the intended one.
train_x = np.concatenate((train_x, val_x), axis = 0)
train_y = np.concatenate((train_y, val_y), axis = 0)
# The training set uses the augmenting transform; the validation set uses the
# plain one.
train_set = ImgDataset(train_x, train_y, train_transform)
val_set = ImgDataset(val_x, val_y, test_transform)
# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

class Classifier(nn.Module) :
    """Five-block CNN followed by a three-layer MLP head with 11 outputs.

    Each convolutional block is Conv2d(3x3, stride 1, padding 1) ->
    BatchNorm2d -> ReLU -> MaxPool2d(2). The padded convolution keeps the
    spatial size and each pooling step halves it, so a ``[3, 128, 128]`` input
    becomes ``[512, 4, 4]`` after the fifth block, matching the first
    ``Linear`` layer's ``512 * 4 * 4`` input features.

    Every hidden layer is followed by a ReLU. Without the activations the
    second conv block would be purely linear up to its pooling step, and the
    three stacked ``Linear`` layers would collapse into a single affine map.

    Note: the activations are parameter-free, but they shift the positional
    indices inside the ``nn.Sequential`` containers, so a ``state_dict`` saved
    from a layout without them will not match by key.
    """

    def __init__(self) :
        super(Classifier, self).__init__()
        # torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # torch.nn.MaxPool2d(kernel_size, stride, padding)
        # Input shape: [3, 128, 128]
        self.CNN = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),      # [64, 128, 128]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # [64, 64, 64]

            nn.Conv2d(64, 128, 3, 1, 1),    # [128, 64, 64]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # [128, 32, 32]

            nn.Conv2d(128, 256, 3, 1, 1),   # [256, 32, 32]
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # [256, 16, 16]

            nn.Conv2d(256, 512, 3, 1, 1),   # [512, 16, 16]
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # [512, 8, 8]

            nn.Conv2d(512, 512, 3, 1, 1),   # [512, 8, 8]
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),          # [512, 4, 4]
        )
        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 11),             # one logit per class
        )

    def forward(self, x) :
        """Return class logits of shape ``[batch, 11]`` for an image batch."""
        out = self.CNN(x)
        # Keep the batch dimension and flatten the feature maps per sample.
        out = out.view(out.size(0), -1)
        return self.fc(out)

# The device is pinned to CPU; the automatic CUDA selection is left commented
# out at the end of the line.
device = "cpu"#torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = Classifier().to(device)
# Cross-entropy loss, applied to the model outputs and the integer labels.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = 0.001)
# Number of full passes over the training loader.
max_epoch = 30

# Train for max_epoch epochs; after each epoch evaluate on val_loader and
# print one summary line.
for epoch in range(max_epoch) :
    epoch_start_time = time.time()
    # Loss values are summed over batches; accuracy values are per-batch
    # accuracies that are averaged over the number of batches when printed.
    train_loss = 0.0
    train_acc = 0.0
    val_loss = 0.0
    val_acc = 0.0

    model.train()
    for x, label in train_loader :
        x, label = x.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        _, pred = output.max(1)
        num_correct = (pred == label).sum().item()
        acc = num_correct / x.shape[0]
        train_acc += acc

    model.eval()
    with torch.no_grad() :
        for x, label in val_loader :
            # Move the batch to the same device as the model instead of
            # forcing CUDA, which fails on a CPU-only configuration.
            x, label = x.to(device), label.to(device)
            # Run the model on the validation batch; without this the loss
            # and accuracy would be computed from the last training output.
            output = model(x)
            loss = criterion(output, label)
            val_loss += loss.item()
            _, pred = output.max(1)
            num_correct = (pred == label).sum().item()
            acc = num_correct / x.shape[0]
            val_acc += acc

    # Report the elapsed wall-clock time of the epoch, not its start timestamp.
    print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' %
          (epoch + 1, max_epoch, time.time() - epoch_start_time,
           train_acc / len(train_loader), train_loss,
           val_acc / len(val_loader), val_loss))
        
# Unlabelled test split, iterated in a fixed order so that the position of
# each prediction matches the position of its image in test_x.
test_set = ImgDataset(test_x, transform = test_transform)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
# Collected predictions, one entry per test image.
predict = []
# Switch the model to evaluation mode before inference. This must be the
# method call model.eval(); "model,eval()" would call the builtin eval().
model.eval()

# Run inference over the test loader and collect the predicted class index of
# every image as a plain Python int.
with torch.no_grad() :
    for x in test_loader :
        # Tensor.to returns the moved tensor rather than modifying x in place,
        # so the result has to be assigned back.
        x = x.to(device)
        test_pred = model(x)
        # max(1) returns (values, indices); the indices are the class ids.
        _, label = test_pred.max(1)
        # Store ints rather than 0-d tensors so the CSV rows read "3" and not
        # "tensor(3)".
        predict.extend(label.cpu().tolist())

# Write one "Id,Category" row per test image, in loader order.
with open('predict.csv', 'w') as file :
    file.write('Id,Category\n')
    for i, label in enumerate(predict) :
        file.write('{},{}\n'.format(i,label))

# KeyboardInterrupt:  (leftover cell output, not code)