In [None]:
import torch
import os
import d2l
from torch import nn
from torch import optim
from torch.utils import data
from torchvision import transforms
from torch.utils.data.dataset import Dataset
import torch.nn.functional as F
import csv
from d2l import torch as d2l

import pandas as pd
import numpy as np
from PIL import Image

# Runtime setup for the notebook.
# NOTE(review): KMP_DUPLICATE_OK is presumably the usual workaround for
# duplicate OpenMP runtime errors -- confirm it is still required.
os.environ.update({"KMP_DUPLICATE_OK": "TRUE"})
# Switch d2l's figure output to SVG (per the helper's name).
d2l.use_svg_display()
# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
class DatasetFromCSV(Dataset):
    """Image dataset backed by a CSV file holding one flattened image per row.

    Column 0 of every row is kept as the sample's label; the remaining
    columns are the pixel values of a ``height`` x ``width`` greyscale image
    laid out in row-major order.

    Args:
        csv_path: Anything accepted by ``pandas.read_csv``.
        height: Image height in pixels.
        width: Image width in pixels.
        transforms: Optional callable applied to the greyscale PIL image.
            When omitted, the PIL image itself is returned.
    """

    def __init__(self, csv_path, height, width, transforms=None):
        self.data = pd.read_csv(csv_path)
        self.labels = np.asarray(self.data.iloc[:, 0])
        self.height = height
        self.width = width
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)`` for row ``index``.

        Raises:
            ValueError: If the row does not contain ``height * width`` pixel
                values (raised by the reshape).
        """
        single_image_label = self.labels[index]
        # Every column after the label is a pixel; reshape the flat vector
        # with the configured size instead of a hard-coded 28x28.
        flat_pixels = np.asarray(self.data.iloc[index, 1:])
        img_as_np = flat_pixels.reshape(self.height, self.width).astype(float)
        # Convert the numpy array into a greyscale PIL image.
        img_as_img = Image.fromarray(np.uint8(img_as_np)).convert('L')
        # Without a transform there is nothing to convert to a tensor, so
        # hand back the PIL image rather than failing on an unbound name.
        if self.transforms is None:
            return (img_as_img, single_image_label)
        return (self.transforms(img_as_img), single_image_label)

    def __len__(self):
        """Number of rows (samples) in the CSV."""
        return len(self.data.index)

def load_data(batch_size, resize=None, val_fraction=0.0, shuffle=True,
              train_csv='fashion-mnist_train.csv',
              test_csv="fashion-mnist_test.csv"):
    """Build the training, validation and test ``DataLoader`` objects.

    Args:
        batch_size: Batch size of the training and validation loaders.
        resize: Optional size passed to ``transforms.Resize`` before the
            image is converted to a tensor.
        val_fraction: Fraction of the training CSV held out for validation,
            in ``[0, 1)``. With the default ``0.0`` nothing is held out and
            the validation loader iterates over the full training set, so
            its accuracy is not a held-out estimate.
        shuffle: Whether the training loader reshuffles its samples every
            epoch.
        train_csv: Path of the training CSV.
        test_csv: Path of the test CSV.

    Returns:
        ``(train_loader, validation_loader, test_loader)``. The test loader
        yields one sample per batch; the second element of each of its
        batches is whatever column 0 of the test CSV contains.

    Raises:
        ValueError: If ``val_fraction`` is outside ``[0, 1)``.
    """
    if not 0.0 <= val_fraction < 1.0:
        raise ValueError(f'val_fraction must be in [0, 1), got {val_fraction}')

    trans = [transforms.ToTensor()]
    if resize:
        # Resize must run on the PIL image, i.e. before ToTensor.
        trans.insert(0, transforms.Resize(resize))
    transform = transforms.Compose(trans)

    train_data = DatasetFromCSV(train_csv, 28, 28, transform)
    test_data = DatasetFromCSV(test_csv, 28, 28, transform)

    # By default both loaders read the same dataset object; a positive
    # val_fraction carves a random, disjoint validation subset out of it.
    train_part, val_part = train_data, train_data
    num_val = int(len(train_data) * val_fraction)
    if num_val > 0:
        train_part, val_part = torch.utils.data.random_split(
            train_data, [len(train_data) - num_val, num_val])

    return (torch.utils.data.DataLoader(train_part, batch_size=batch_size,
                                        shuffle=shuffle),
            torch.utils.data.DataLoader(val_part, batch_size=batch_size),
            torch.utils.data.DataLoader(test_data, batch_size=1))

def evaluate_accuracy_gpu(net, data_iter, device=None):
    """Compute the classification accuracy of ``net`` over ``data_iter``.

    Args:
        net: The model. When it is a ``torch.nn.Module`` it is switched to
            evaluation mode (and left in that mode on return).
        data_iter: Iterable yielding ``(X, y)`` batches; ``X`` is a tensor or
            a list of tensors.
        device: Device the batches are moved to. When falsy and ``net`` is a
            ``torch.nn.Module``, the device of the model's first parameter
            is used.

    Returns:
        Number of correct predictions divided by the number of labels seen.

    Raises:
        ZeroDivisionError: If ``data_iter`` yields no labels.
    """
    if isinstance(net, torch.nn.Module):
        net.eval()  # switch to evaluation mode
        if not device:
            device = next(iter(net.parameters())).device
    # Accumulates (number of correct predictions, number of predictions).
    metric = d2l.Accumulator(2)
    # Pure inference: without no_grad every forward pass would record an
    # autograd graph that is never used, wasting memory and time.
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(X, list):
                # Required for BERT fine-tuning (to be covered later).
                X = [x.to(device) for x in X]
            else:
                X = X.to(device)
            y = y.to(device)
            metric.add(d2l.accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

#@save
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device,
              reinit=True, target_acc=0.99):
    """Train ``net`` with Adam and cross-entropy, printing metrics per epoch.

    Args:
        net: Model to train; it is moved to ``device``.
        train_iter: Iterable of ``(X, y)`` training batches.
        test_iter: Iterable of ``(X, y)`` batches used for the per-epoch
            accuracy check.
        num_epochs: Maximum number of epochs.
        lr: Learning rate handed to Adam.
        device: Device to train on.
        reinit: When true (the default), Xavier-uniform initialisation is
            applied to the weights of every ``nn.Linear`` and ``nn.Conv2d``
            layer before training, discarding any previous training. Pass
            ``False`` to continue from the current weights, e.g. when
            calling this function again with a smaller learning rate.
        target_acc: Training stops early once the accuracy on ``test_iter``
            exceeds this value.

    Raises:
        ValueError: If ``train_iter`` yields no samples during an epoch.
    """
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv2d:
            nn.init.xavier_uniform_(m.weight)

    if reinit:
        net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr, betas=(0.9, 0.999),
                                 eps=1e-08, weight_decay=0, amsgrad=False)
    loss = nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        # Accumulates (sum of training loss, number of correct predictions,
        # number of samples) over the epoch.
        metric = d2l.Accumulator(3)
        net.train()
        for X, y in train_iter:
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                # The loss is a batch mean, so weight it by the batch size.
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
        if metric[2] == 0:
            raise ValueError('train_iter yielded no samples')
        # Epoch averages are only needed once per epoch, after the last batch.
        train_l = metric[0] / metric[2]
        train_acc = metric[1] / metric[2]
        veri_acc = evaluate_accuracy_gpu(net, test_iter)
        print(f'epoch:{epoch},loss:{train_l:.3f},train acc:{train_acc:.3f},veri acc:{veri_acc:.3f}')
        if veri_acc > target_acc:
            break


In [None]:
class Residual(nn.Module):  #@save
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut.

    Args:
        input_channels: Number of channels of the input.
        num_channels: Number of channels produced by the block.
        use_1x1conv: When true, the shortcut passes through a 1x1
            convolution (``conv3``) instead of being the raw input.
        strides: Stride of the first convolution and of the 1x1 shortcut
            convolution.
    """

    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        # Main path.
        self.conv1 = nn.Conv2d(input_channels, num_channels, kernel_size=3,
                               padding=1, stride=strides)
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3,
                               padding=1)
        # Shortcut path: an optional 1x1 projection of the input.
        self.conv3 = (
            nn.Conv2d(input_channels, num_channels, kernel_size=1,
                      stride=strides)
            if use_1x1conv else None
        )
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)
        # Registered on the module but not called by forward(), which uses
        # the functional F.relu.
        self.relu = nn.ReLU(inplace=True)

    def forward(self, X):
        """Return ``relu(main_path(X) + shortcut(X))``."""
        out = self.conv1(X)
        out = F.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))
        shortcut = X if self.conv3 is None else self.conv3(X)
        out += shortcut
        return F.relu(out)

In [None]:
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """Return a list of ``num_residuals`` ``Residual`` blocks for one stage.

    Unless ``first_block`` is true, the first block maps ``input_channels``
    to ``num_channels`` with a 1x1 shortcut convolution and stride 2. Every
    other block maps ``num_channels`` to ``num_channels`` with the default
    ``Residual`` settings.
    """
    def build(position):
        # Only the leading block of a non-first stage changes the shape.
        if position == 0 and not first_block:
            return Residual(input_channels, num_channels,
                            use_1x1conv=True, strides=2)
        return Residual(num_channels, num_channels)

    return [build(position) for position in range(num_residuals)]


# Stem: 7x7 convolution, batch norm, ReLU and a 3x3 max-pool on a
# single-channel input.
b1 = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
)
# Four residual stages of two blocks each; only the first stage keeps the
# channel count and resolution of its input.
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3, b4, b5 = (
    nn.Sequential(*resnet_block(c_in, c_out, 2))
    for c_in, c_out in ((64, 128), (128, 256), (256, 512))
)
# Head: global average pooling followed by a 10-way linear classifier.
net = nn.Sequential(
    b1, b2, b3, b4, b5,
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten(),
    nn.Linear(512, 10),
)

In [None]:
# Mini-batch size handed to load_data for the training/validation loaders.
batch_size = 256
# resize=224 is forwarded to transforms.Resize inside load_data.
# NOTE(review): veri_loader is built from the training CSV unless load_data
# is told to hold out a validation split.
train_loader, veri_loader, test_loader = load_data(
    batch_size,
    resize=224,
)

In [None]:
# Stage 1: up to 10 epochs at learning rate 0.01.
lr = 0.01
num_epochs = 10
train_ch6(
    net, train_loader, veri_loader,
    num_epochs, lr, d2l.try_gpu(),
)
# No checkpoint is written after this stage.


In [None]:
# Stage 2: up to 20 epochs at learning rate 0.001.
# NOTE(review): as called here, train_ch6 applies Xavier initialisation to
# `net` on entry, so this stage starts from fresh weights rather than
# continuing from the previous cell -- confirm that is intended.
lr = 0.001
num_epochs = 20
train_ch6(
    net, train_loader, veri_loader,
    num_epochs, lr, d2l.try_gpu(),
)
# No checkpoint is written after this stage.


In [None]:
# Stage 3: up to 10 epochs at learning rate 0.0001.
# NOTE(review): as called here, train_ch6 applies Xavier initialisation to
# `net` on entry, so this stage starts from fresh weights rather than
# continuing from the previous cell -- confirm that is intended.
lr = 0.0001
num_epochs = 10
train_ch6(
    net, train_loader, veri_loader,
    num_epochs, lr, d2l.try_gpu(),
)
# No checkpoint is written after this stage.


In [None]:
# Write one "Id,Category" row per test sample to submission.csv.
net.eval()
# Feed the batches to whichever device the model's parameters live on.
net_device = next(net.parameters()).device
# The context manager closes the file even if inference raises.
with open('submission.csv', 'w', newline='', encoding='utf-8') as f:
    csv_write = csv.writer(f)
    csv_write.writerow(['Id', 'Category'])
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.float().to(net_device)
            # Index of the highest-scoring class for every sample in the batch.
            predicted = net(images).argmax(dim=1).cpu()
            # Write plain Python scalars; numpy arrays would be serialised
            # with brackets (e.g. "[5]") and corrupt the CSV. Iterating per
            # sample also keeps this correct for any test batch size.
            for sample_id, category in zip(labels.tolist(), predicted.tolist()):
                csv_write.writerow([sample_id, category])
