In [25]:
import torch
import cv2
import csv
import random
import os
import numpy as np
import pandas as pd
from PIL import Image
import torch.utils.data as Data
from torchvision import transforms, datasets
from torch.utils.data import Dataset, DataLoader
from pandas import read_csv
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [26]:
class mydataset(Dataset):    
    def __init__(self, imgname, label,transform=None):
        self.transform = transform        
        self.images =  imgname        
        self.labels = label 
    
    def __len__(self):        
        # 返回数据集的数据数量        
        return len(self.images)  

    def __getitem__(self, index):        
        img_path = self.images[index]        
        label = self.labels[index]        
        img = cv2.imread(img_path)       
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)        
        img = Image.fromarray(img)        
        if self.transform:            
            img = self.transform(img)        
        return img,label
#获取图片路径    
def getImgpath(path, name):    
        return path+"/"+str(name)+".jpg"
#数据预处
data_transfrom = transforms.Compose([
    transforms.Resize((200,200)),
    transforms.ToTensor(),
    
])
    
#将数据划分为训练集和测试集
traindata_path = "/kaggle/input/trainphotos/train/"
testdata_path = "/kaggle/input/testphotos/test/"

train_csv= '/kaggle/input/traincsv/train.csv'
test_csv= '/kaggle/input/testcsv/test.csv'

train_data=pd.read_csv(train_csv)
test_data=pd.read_csv(test_csv)

train_img_id = train_data.iloc[:,0].values    
test_img_id = test_data.iloc[:,0].values

train_label = train_data.iloc[:,1].values
test_label = test_data.iloc[:,1].values
    
train_img_path = [getImgpath(name=name,path=traindata_path) for name in train_img_id]
test_img_path = [getImgpath(name=name,path=testdata_path) for name in test_img_id]
# 通过GetLoader将数据进行加载，返回Dataset对象，包含data和labels
train_data = mydataset(train_img_path, train_label, data_transfrom)
test_data = mydataset(test_img_path, test_label, data_transfrom)
#使用DataLoader加载并设置batchsize
train_loader = DataLoader(train_data, batch_size=64, drop_last=True,shuffle = True)
test_loader = DataLoader(test_data, batch_size=64, drop_last=True)

In [27]:
import torch.nn as nn
import torch.nn.functional as F


class Residual(nn.Module):
    def __init__(self, in_channels, out_channels, use_1x1conv=False, stride=1): 
        super(Residual, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride)  # 定义第一个卷积块
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1)  # 定义第二个卷积块

        # 定义1x1卷积块
        if use_1x1conv:
            self.conv3 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride)
        else:
            self.conv3 = None

        # Batch归一化
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
    # 定义前向传播路径
    def forward(self, x):
        y = nn.functional.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        if self.conv3:
            x = self.conv3(x)

        return nn.functional.relu(y + x)

def resnet_block(in_channels, out_channels, num_residuals, first_block=False):
    if first_block:
        assert in_channels == out_channels  # 第一个模块的通道数同输入通道数一致
    blk = []
    for i in range(num_residuals):
        if i == 0 and not first_block:
            blk.append(Residual(in_channels, out_channels, use_1x1conv=True, stride=2))
        else:
            blk.append(Residual(out_channels, out_channels))
    return nn.Sequential(*blk)


# 定义一个全局平均池化层
class GlobalAvgPool2d(nn.Module):
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()

    def forward(self, x):
        return nn.functional.avg_pool2d(x, kernel_size=x.size()[2:])  # 池化窗口形状等于输入图像的形状

# 定义ResNet网络结构
def ResNet():
    net = nn.Sequential(
        nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

    net.add_module("resnet_block1", resnet_block(64, 64, 2, first_block=True))
    net.add_module("resnet_block2", resnet_block(64, 128, 2))
    net.add_module("resnet_block3", resnet_block(128, 256, 2))
    net.add_module("resnet_block4", resnet_block(256, 512, 2))
    net.add_module("global_avg_pool", GlobalAvgPool2d())  # GlobalAvgPool2d的输出: (Batch, 512, 1, 1)
    net.add_module("fc", nn.Sequential(nn.Flatten(), nn.Linear(512, 10)))

    return net
device = torch.device("cuda")
model = ResNet().to(device)
print(model)



Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU()
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (resnet_block1): Sequential(
    (0): Residual(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): Residual(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, a

In [28]:
## Specify loss and optimization functions

# specify loss function
criterion = nn.CrossEntropyLoss()

# specify optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

In [29]:
# number of epochs to train the model
n_epochs = 10  # suggest training between 20-50 epochs

model.train() # prep model for training
device = torch.device("cuda")
for epoch in range(n_epochs):
    # monitor training loss
    train_loss = 0.0
    for step, (b_x, b_y) in enumerate(train_loader): 
        #----------------------------#
        #   计算loss并修正权值
        #----------------------------#   
        b_x = b_x.to(device)
        b_y = b_y.to(device)
        output = model(b_x)
        loss = criterion(output, b_y) 
        optimizer.zero_grad() 
        loss.backward() 
        optimizer.step() 
        #----------------------------#
        #   打印
        #----------------------------#   
        if (step+1) % 100 == 0:
            print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, n_epochs, loss.item()))

Epoch [1/10], Loss: 0.3910
Epoch [1/10], Loss: 0.4404
Epoch [2/10], Loss: 0.2753
Epoch [2/10], Loss: 0.3835
Epoch [3/10], Loss: 0.2359
Epoch [3/10], Loss: 0.2031
Epoch [4/10], Loss: 0.1573
Epoch [4/10], Loss: 0.1380
Epoch [5/10], Loss: 0.0910
Epoch [5/10], Loss: 0.0406
Epoch [6/10], Loss: 0.1189
Epoch [6/10], Loss: 0.0997
Epoch [7/10], Loss: 0.1692
Epoch [7/10], Loss: 0.0822
Epoch [8/10], Loss: 0.1138
Epoch [8/10], Loss: 0.2654
Epoch [9/10], Loss: 0.0348
Epoch [9/10], Loss: 0.0452
Epoch [10/10], Loss: 0.0167
Epoch [10/10], Loss: 0.1022


In [30]:
model.eval()      #测试模式，关闭正则化
correct = 0
total = 0
test_loss = 0
for data, target in test_loader:
    data, target = data.to(device),target.to(device)
    outputs = model(data)
    _, predicted = torch.max(outputs, 1)   #返回值和索引
    total += target.size(0)
    correct += (predicted == target).sum().item()
    loss = criterion(outputs, target) 
    test_loss += loss.item()*data.size(0)

print('测试准确率: {:.4f}'.format(100.0*correct/total))

测试准确率: 84.4758
