In [1]:
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

import os
import torchvision
from torchvision import transforms
from PIL import Image
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler

In [None]:
# Open one sample image and display it, as a quick check that the image
# directory is readable from the notebook's working directory.
img = Image.open("./images/0.jpg")
plt.imshow(img)

In [None]:
class LeavesImageDatasets(Dataset):
    """Dataset over a CSV listing of leaf images, with optional class labels.

    The first CSV column holds the image path. A CSV with more than one
    column is treated as a labelled (training) listing and must contain a
    ``label`` column; every distinct label is mapped to an integer id in order
    of first appearance. A single-column CSV is treated as an unlabelled
    (test) listing.

    Args:
        img_labels: Path of the CSV file to read.
        transform: Optional callable applied to each loaded RGB PIL image.

    Attributes:
        labels_df: The parsed CSV; for labelled data its ``label`` column is
            replaced by the integer class ids.
        is_train: ``True`` when the CSV carries labels.
        classes: Label values indexed by class id (empty when unlabelled).
        labels: Tensor of class ids, one per row (labelled data only).
    """
    def __init__(self, img_labels, transform = None):
        # torch's Dataset base class defines no __init__ of its own, so there
        # is nothing to forward to via super().__init__().
        self.transform = transform

        self.labels_df = pd.read_csv(img_labels)
        self.is_train = self.labels_df.shape[1] > 1
        self.classes = []

        if self.is_train:
            # Keep the id -> name table so predictions can be decoded later.
            self.classes = self.labels_df["label"].unique().tolist()
            label_mapping = {label: i for i, label in enumerate(self.classes)}

            self.labels_df["label"] = self.labels_df["label"].map(label_mapping)
            # Go through NumPy so the conversion does not depend on the
            # Series index labels.
            self.labels = torch.tensor(self.labels_df["label"].to_numpy())

    def __len__(self):
        """Return the number of rows (images) listed in the CSV."""
        return self.labels_df.shape[0]

    def __getitem__(self, index):
        """Load the image at ``index``.

        Returns:
            ``(image, label)`` for labelled data, otherwise just ``image``.
            ``image`` is the transformed image when a transform was given,
            else an RGB PIL image.
        """
        # Force three channels: grayscale or RGBA files would otherwise break
        # the 3-channel normalisation and the ResNet input layer. The context
        # manager closes the underlying file once the pixels are copied.
        with Image.open(self.labels_df.iloc[index, 0]) as raw:
            image = raw.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        if self.is_train:
            return image, self.labels[index]
        return image

In [None]:
batch_size = 256
valid_fraction = 0.2  # share of the labelled data held out for validation
split_seed = 42       # fixed so the train/validation split is identical on every run

# Convert images to tensors and normalise each channel with the standard
# ImageNet mean/std used by torchvision's pretrained models.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

leavesImageTrainDatasets = LeavesImageDatasets(img_labels="./train.csv", transform=transform)
num_samples = len(leavesImageTrainDatasets)
valid_size = int(valid_fraction * num_samples)

# Build the index lists for the training and validation splits from one
# seeded permutation, so the two splits are disjoint and reproducible.
indices = np.random.default_rng(split_seed).permutation(num_samples).tolist()
valid_indices = indices[:valid_size]
train_indices = indices[valid_size:]

# Both loaders read the same labelled dataset; the samplers restrict each one
# to its own subset of row indices.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)
train_iter = DataLoader(leavesImageTrainDatasets, batch_size=batch_size, num_workers=4, sampler=train_sampler)
valid_iter = DataLoader(leavesImageTrainDatasets, batch_size=batch_size, num_workers=4, sampler=valid_sampler)

# The unlabelled test listing keeps its file order so predictions line up
# with the rows of test.csv.
leavesImageTestDatasets = LeavesImageDatasets(img_labels="./test.csv", transform=transform)
test_iter = DataLoader(leavesImageTestDatasets, batch_size=batch_size, shuffle=False, num_workers=4)
num_samples

In [None]:
# Start from the ImageNet-pretrained ResNet-18 and replace its classification
# head with one sized for this task. The class count is derived from the
# training labels (ids are 0..K-1) instead of being hard-coded.
num_classes = int(leavesImageTrainDatasets.labels.max()) + 1
net = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1)
net.fc = nn.Linear(net.fc.in_features, num_classes)

In [2]:
# Show the resnet18 docstring to check how pretrained weights are selected
# (the keyword-only `weights` argument).
help(torchvision.models.resnet18)

Help on function resnet18 in module torchvision.models.resnet:

resnet18(*, weights: Optional[torchvision.models.resnet.ResNet18_Weights] = None, progress: bool = True, **kwargs: Any) -> torchvision.models.resnet.ResNet
    ResNet-18 from `Deep Residual Learning for Image Recognition <https://arxiv.org/pdf/1512.03385.pdf>`__.
    
    Args:
        weights (:class:`~torchvision.models.ResNet18_Weights`, optional): The
            pretrained weights to use. See
            :class:`~torchvision.models.ResNet18_Weights` below for
            more details, and possible values. By default, no pre-trained
            weights are used.
        progress (bool, optional): If True, displays a progress bar of the
            download to stderr. Default is True.
        **kwargs: parameters passed to the ``torchvision.models.resnet.ResNet``
            base class. Please refer to the `source code
            <https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py>`_
            f

In [5]:
# Print the weights class itself; the output shows that it is an enum.
print(torchvision.models.ResNet18_Weights)

<enum 'ResNet18_Weights'>


In [None]:
# Sanity-check the output shape on a dummy batch. The pass runs in eval mode
# and without autograd so the random input neither updates the BatchNorm
# running statistics of the pretrained network nor builds a gradient graph.
X = torch.rand((1, 3, 224, 224))
was_training = net.training
net.eval()
with torch.no_grad():
    print(net(X).shape)
net.train(was_training)  # restore whichever mode the model was in

# List every parameter tensor and accumulate the total element count.
total_params = 0
for name, param in net.named_parameters():
    print(f"Layer: {name} | Size: {param.size()} | Count: {param.nelement()} | {param.data.dtype}")
    total_params += param.nelement()

print(f"Total parameters count: {total_params}")

In [None]:
def train_ch6(net, train_iter, test_iter, num_epochs, lr, device):
    """Fine-tune ``net`` and report validation accuracy after every epoch.

    Adapted from the d2l ``train_ch6`` helper. Only the classification head
    ``net.fc`` is freshly initialised; all other layers keep the weights they
    arrive with, so a pretrained backbone is fine-tuned rather than retrained
    from scratch. The head is optimised with ten times the base learning rate.

    Args:
        net: Model exposing its output layer as ``net.fc`` (an ``nn.Linear``).
        train_iter: Iterable of ``(X, y)`` training batches.
        test_iter: Iterable passed to ``d2l.evaluate_accuracy_gpu`` after each
            epoch.
        num_epochs: Number of passes over ``train_iter``.
        lr: Base learning rate for every parameter except the head.
        device: Device the model and batches are moved to.
    """
    # Re-initialising every Conv2d/Linear layer would throw away the
    # pretrained weights, so only the replaced output layer is reset.
    nn.init.xavier_uniform_(net.fc.weight)
    print('training on', device)
    net.to(device)

    head_param_names = {"fc.weight", "fc.bias"}
    backbone_params = [
        param for name, param in net.named_parameters()
        if name not in head_param_names
    ]
    optimizer = torch.optim.Adam([
        { "params" : backbone_params },
        {
            "params" : net.fc.parameters(),
            "lr" : lr * 10
        }
    ], lr=lr, weight_decay=0.001)

    loss = nn.CrossEntropyLoss()

    timer = d2l.Timer()
    samples_last_epoch = 0.0
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()
        for X, y in train_iter:
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            batch_loss = loss(y_hat, y)
            batch_loss.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(batch_loss * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()

        # Epoch averages are computed once per epoch; an empty loader yields
        # NaN instead of a ZeroDivisionError / NameError.
        samples_last_epoch = metric[2]
        if samples_last_epoch > 0:
            train_l = metric[0] / samples_last_epoch
            train_acc = metric[1] / samples_last_epoch
        else:
            train_l = train_acc = float("nan")

        test_acc = d2l.evaluate_accuracy_gpu(net, test_iter)
        print(f'epoch:{epoch} loss {train_l:.3f}, train acc {train_acc:.3f}, test acc {test_acc:.3f}')

    elapsed = timer.sum()
    if elapsed > 0:
        print(f'{samples_last_epoch * num_epochs / elapsed:.1f} examples/sec '
              f'on {str(device)}')

In [None]:
# Fine-tuning hyper-parameters: a few epochs with a small base learning rate.
num_epochs = 5
lr = 5e-5

# Train on the training split and evaluate on the held-out validation split.
train_ch6(net, train_iter, valid_iter, num_epochs=num_epochs, lr=lr, device=d2l.try_gpu())

In [None]:
# Number of test images, i.e. how many predictions will be produced.
len(leavesImageTestDatasets)

In [None]:
# Text labels in order of first appearance in train.csv. This is the same
# enumeration order LeavesImageDatasets uses to assign integer class ids, so
# position i holds the name of class id i.
labels_df = pd.read_csv("train.csv")
text_labels = list(labels_df["label"].unique())

def get_leaves_labels(labels):
    """Translate an iterable of class ids into their text labels."""
    names = []
    for class_id in labels:
        names.append(text_labels[int(class_id)])
    return names

In [None]:
def predict(net, leavesImageTestDatasets, device="cuda",
            test_csv="test.csv", output_path='submission.csv'):
    """Predict a text label for every test image and write the submission CSV.

    Each item of the dataset is run through ``net`` as a batch of one, the
    arg-max class id is decoded with ``get_leaves_labels``, and the resulting
    ``label`` column is appended to the rows of ``test_csv``.

    Args:
        net: Trained classification model.
        leavesImageTestDatasets: Unlabelled dataset yielding one image tensor
            per item, in the same order as the rows of ``test_csv``.
        device: Device used for inference.
        test_csv: CSV whose rows the predictions are attached to.
        output_path: Destination of the submission file.
    """
    # Moving the model is loop-invariant, so do it once up front.
    net = net.to(device)
    net.eval()

    preds = []
    # Inference needs no gradients; skipping autograd saves memory and time.
    with torch.no_grad():
        for X in leavesImageTestDatasets:
            batch = X.unsqueeze(0).to(device)  # add the batch dimension
            class_ids = net(batch).argmax(dim=1).to("cpu")
            preds.extend(get_leaves_labels(class_ids))

    idx_df = pd.read_csv(test_csv)
    submission = pd.concat((idx_df, pd.DataFrame({"label": preds})), axis=1)

    submission.to_csv(output_path, index=False)

In [None]:
# Use the same device selection as training (GPU when available, else CPU)
# instead of hard-coding "cuda", which fails on CPU-only machines.
predict(net, leavesImageTestDatasets, device=d2l.try_gpu())