In [None]:
import pandas as pd
from pathlib import Path

In [None]:
# 读取图片文件
import os
# 拷贝模型参数
import copy
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
import torchvision.models as models
from torchvision.transforms import ToPILImage
from tqdm.auto import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts,ExponentialLR
from sklearn.model_selection import train_test_split,KFold
from PIL import Image
import cv2
import albumentations
from albumentations.pytorch.transforms import ToTensorV2
import matplotlib.pyplot as plt
import torchvision.utils as vutils
from mpl_toolkits.axes_grid1 import ImageGrid
# from efficientnet_pytorch import EfficientNet


In [None]:
# !pip install efficientnet_pytorch


In [None]:
# Load the training labels and attach an integer class id to every row.
train_data = pd.read_csv('/kaggle/input/dog-breed-identification/labels.csv')

# Sorted list of distinct breed names; a breed's position in this list is its class id.
labels = sorted(set(train_data['breed']))
label_to_index = {breed: position for position, breed in enumerate(labels)}

labels_num = []
# NOTE(review): `i` stays bound at notebook scope after this loop, exactly as
# before; avoid depending on it in other cells.
for i, breed in enumerate(train_data['breed']):
    labels_num.append(label_to_index[breed])
train_data['number'] = labels_num
train_data.shape


In [None]:
# Build the test DataFrame: one row per test image, keyed by its id
# (the file name without the ".jpg" extension).
test_dir = '/kaggle/input/dog-breed-identification/test'
# Keep only ".jpg" entries (the dataset class re-appends exactly ".jpg" when
# loading) and strip the extension properly instead of slicing off 4 characters.
file_names = [
    os.path.splitext(name)[0]
    for name in sorted(os.listdir(test_dir))
    if name.endswith('.jpg')
]
test_data = pd.DataFrame({'id': file_names})
test_data.head()


In [None]:
# transforms_train = albumentations.Compose(
#     [
#         albumentations.Resize(224, 224),
#         albumentations.HorizontalFlip(p=0.5),
#         albumentations.VerticalFlip(p=0.5),
#         albumentations.Rotate(limit=180, p=0.7),
#         albumentations.RandomBrightnessContrast(),
#         albumentations.ShiftScaleRotate(
#             shift_limit=0.25, scale_limit=0.1, rotate_limit=0
#         ),
#         albumentations.Normalize(
#             [0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
#             max_pixel_value=255.0, always_apply=True
#         ),
#         ToTensorV2(p=1.0),
#     ]
# )
# transforms_test = albumentations.Compose(
#         [
#             albumentations.Resize(224, 224),
#             albumentations.Normalize(
#                 [0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
#                 max_pixel_value=255.0, always_apply=True
#             ),
#             ToTensorV2(p=1.0)
#         ]
#     )
# Per-channel mean / std handed to Normalize in both pipelines.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, random 224-pixel crop, random horizontal flip,
# then tensor conversion and normalisation.
_train_steps = [
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]
transforms_train = transforms.Compose(_train_steps)

# Evaluation pipeline: resize and centre crop (no random augmentation),
# then the same tensor conversion and normalisation.
_test_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]
transforms_test = transforms.Compose(_test_steps)


In [None]:
# Custom dataset (subclasses Dataset); images are loaded and transformed on access.
class Dog_Breed(Dataset):
    """Dataset of dog images addressed by the ``id`` column of a DataFrame.

    Args:
        train_csv: DataFrame with an ``id`` column (image file name without
            ".jpg"); when ``test`` is False it must also have a ``number``
            column holding the integer class id.
        transform: optional callable applied to the RGB PIL image.
        test: when True, items are returned without a label.
        image_dir: optional directory containing the ``<id>.jpg`` files.
            Defaults to ``TEST_DIR`` when ``test`` is True, else ``TRAIN_DIR``.
    """

    TRAIN_DIR = "/kaggle/input/dog-breed-identification/train"
    TEST_DIR = "/kaggle/input/dog-breed-identification/test"

    def __init__(self, train_csv, transform = None, test = False, image_dir = None):
        super().__init__()
        self.train_csv = train_csv
        # Image ids, copied to a plain list so lookups are positional
        # regardless of the DataFrame's index.
        self.image_path = list(self.train_csv['id'])
        self.test = test
        if not self.test:
            # Integer class id of every image, in the same order as image_path.
            self.label_nums = list(self.train_csv['number'])
        self.transform = transform
        if image_dir is None:
            image_dir = self.TEST_DIR if self.test else self.TRAIN_DIR
        self.image_dir = image_dir

    def _image_file(self, idx):
        """Return the full path of the JPEG file for item ``idx``."""
        return os.path.join(self.image_dir, self.image_path[idx] + ".jpg")

    def __getitem__(self, idx):
        """Load item ``idx``.

        Returns:
            ``(image, label)`` in training/validation mode, or just ``image``
            when the dataset was created with ``test=True``.
        """
        # convert() produces a fully loaded copy, so the underlying file can
        # be closed right away instead of waiting for garbage collection.
        with Image.open(self._image_file(idx)) as handle:
            image = handle.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)
        if self.test:
            return image
        return image, self.label_nums[idx]

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_path)


In [None]:
def get_device():
    """Return 'cuda' when a CUDA device is available, otherwise 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'


# Device name used by the rest of the notebook; echoed as the cell output.
device = get_device()
device


In [None]:
# Hold-out split of train_data: 80% training / 20% validation.
# The seed is an explicit constant so the split is reproducible on a fresh
# kernel and does not depend on whatever value a loop variable from another
# cell happens to hold.
train, valid = train_test_split(train_data, test_size=0.2, random_state=2021)
# Training images get random augmentation; validation images use the
# deterministic evaluation pipeline.
trainset = Dog_Breed(train, transform = transforms_train)
validset = Dog_Breed(valid, transform = transforms_test)


In [None]:
# Batch loaders for the hold-out split; only the training loader shuffles.
_loader_options = dict(batch_size=32, drop_last=False)
train_loader = DataLoader(trainset, shuffle=True, **_loader_options)
valid_loader = DataLoader(validset, shuffle=False, **_loader_options)


In [None]:
# Preview up to four images from one training batch with their breed names.
dataiter = iter(train_loader)
images, label = next(dataiter)

# Statistics used by Normalize in the transform pipelines; needed to undo it.
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

fig, axes = plt.subplots(nrows=1, ncols=4, figsize=(10, 4))
for ax in axes:
    # Hide the frame of every panel, including any left empty by a short batch.
    ax.axis('off')

for ax, image_tensor, class_idx in zip(axes, images, label):
    # (C, H, W) tensor -> (H, W, C) array for matplotlib.
    image = image_tensor.numpy().transpose((1, 2, 0))
    # Undo the normalisation and clip so imshow receives values in [0, 1].
    image = np.clip(std * image + mean, 0.0, 1.0)
    ax.imshow(image)
    # Title with the breed of *this* image (its class id indexes `labels`),
    # not the panel position.
    ax.set_title(f"Label: {labels[int(class_idx)]}")

plt.tight_layout()
plt.show()


In [None]:
# Model definition
class MyResNet50(nn.Module):
    """Pretrained ResNet-50 with frozen weights and a new linear output layer.

    Args:
        num_classes: number of outputs of the replacement ``fc`` layer.
    """

    def __init__(self, num_classes=120):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # Freeze every pretrained parameter; the layer created below is built
        # afterwards and therefore keeps requires_grad=True.
        for weight in backbone.parameters():
            weight.requires_grad = False
        # Swap the final fully connected layer for one with num_classes outputs.
        backbone.fc = nn.Linear(backbone.fc.in_features, num_classes)
        self.net = backbone

    def forward(self, x):
        """Run the wrapped network on ``x`` and return its output."""
        return self.net(x)


In [None]:
# EfficientNet model definition
class EfficientNetCustom(nn.Module):
    """Pretrained EfficientNet with frozen weights and a new linear output layer.

    NOTE(review): `EfficientNet` is not imported in the visible imports cell
    (the `efficientnet_pytorch` import there is commented out), so that import
    has to be enabled before this class can be instantiated.

    Args:
        num_classes: number of outputs of the replacement ``_fc`` layer.
    """

    def __init__(self, num_classes=120):
        super().__init__()
        # Other EfficientNet variants can be selected by changing the name.
        backbone = EfficientNet.from_pretrained('efficientnet-b0')
        # Freeze the pretrained parameters; the layer created below is built
        # afterwards and therefore keeps requires_grad=True.
        for weight in backbone.parameters():
            weight.requires_grad = False
        backbone._fc = nn.Linear(backbone._fc.in_features, num_classes)
        self.net = backbone

    def forward(self, x):
        """Run the wrapped network on ``x`` and return its output."""
        return self.net(x)


In [None]:
def train_model(model,train_loader, valid_loader,loss,optimizer,epoch,device = torch.device("cuda:0"),test_loader = None):
    """Train ``model`` with early stopping, then predict on the test set.

    Each epoch runs one pass over ``train_loader`` followed by one pass over
    ``valid_loader``. The weights of the epoch with the most correct
    validation predictions are kept and restored before inference. Training
    stops early once 3 epochs pass without a new best.

    Args:
        model: network to train; moved to ``device``.
        train_loader: loader yielding ``(images, labels)`` training batches.
        valid_loader: loader yielding ``(images, labels)`` validation batches.
        loss: callable ``loss(outputs, labels)`` returning a scalar tensor.
        optimizer: optimizer over the parameters to update.
        epoch: maximum number of epochs.
        device: device used for training and inference.
        test_loader: optional loader yielding image-only batches. When None,
            one is built from the notebook-level ``test_data`` and
            ``transforms_test`` (batch size 64, no shuffling).

    Returns:
        CPU tensor of raw model outputs (no softmax applied), one row per
        test image in loader order.
    """
    net = model.to(device)
    early_stopping_round = 3
    best_epoch = 0
    # -1 so the first evaluated epoch always becomes the initial best and
    # there is a saved state to restore even if validation accuracy is 0.
    best_score = -1
    best_model_state = None

    # Exponential decay: the learning rate is multiplied by gamma after every epoch.
    # NOTE(review): `verbose` is reported as deprecated in newer PyTorch
    # releases; confirm against the installed version.
    scheduler = ExponentialLR(optimizer, gamma=0.9,verbose=True)
    for i in range(epoch):
        net.train()
        loss_sum = 0.0
        train_correct = 0
        train_seen = 0
        for x, y in tqdm(train_loader):
            optimizer.zero_grad()
            x = x.to(device)
            y = y.to(device)
            y_hat = net(x)
            loss_temp = loss(y_hat, y)
            loss_temp.backward()
            optimizer.step()
            # .item() detaches the scalars so no autograd history accumulates
            # across the epoch.
            loss_sum += loss_temp.item()
            train_correct += (y_hat.argmax(dim=1) == y).sum().item()
            train_seen += y.size(0)
        # The learning rate is updated once per epoch, not once per batch.
        scheduler.step()
        # Average over batches for the loss and over the samples actually
        # seen for accuracy (the last batch may be smaller than batch_size).
        avg_loss = loss_sum / max(len(train_loader), 1)
        train_acc = train_correct / max(train_seen, 1)
        print( "epoch: ", i, "loss=", avg_loss, "训练集准确度=",train_acc,end="")

        # Validate once per epoch, without building autograd graphs.
        valid_correct = 0
        valid_seen = 0
        net.eval()
        with torch.no_grad():
            for x, y in tqdm(valid_loader):
                x = x.to(device)
                y = y.to(device)
                y_hat = net(x)
                valid_correct += (y_hat.argmax(dim=1) == y).sum().item()
                valid_seen += y.size(0)
        print("验证集准确度", valid_correct / max(valid_seen, 1))
        if valid_correct > best_score:
            # Snapshot the best weights seen so far.
            best_model_state = copy.deepcopy(net.state_dict())
            best_score = valid_correct
            best_epoch = i
            print('best epoch save!')
        if i - best_epoch >= early_stopping_round:
            break

    # best_model_state stays None only when no epoch ran (epoch <= 0).
    if best_model_state is not None:
        net.load_state_dict(best_model_state)

    if test_loader is None:
        testset = Dog_Breed(test_data, transform = transforms_test,test = True)
        test_loader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False, drop_last=False)

    # Make sure dropout / batch-norm layers are in inference mode even when
    # the training loop above did not run.
    net.eval()
    batches = []
    with torch.no_grad():
        for x in tqdm(test_loader):
            x = x.to(device)
            # Each batch of outputs has shape [batch, num_classes].
            batches.append(net(x).cpu())
    if not batches:
        # Empty test set: keep the historical (0, 120) result shape.
        return torch.empty((0, 120))
    return torch.cat(batches, dim=0)


In [None]:
# Training hyperparameters.
learn_rate = 1e-3  # learning rate handed to the optimizer
momentum = 0.9     # SGD momentum; not used by the Adam optimizer
epoch = 10         # maximum number of training epochs


In [None]:
# 5-fold cross-validation.
kfold = KFold(n_splits=5, shuffle=True, random_state=2021)
fold_predictions = []
for train_index,val_index in kfold.split(train_data):
    train, valid = train_data.iloc[train_index], train_data.iloc[val_index]
    trainset = Dog_Breed(train, transform = transforms_train)
    validset = Dog_Breed(valid, transform = transforms_test)
    train_loader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True, drop_last=False)
    valid_loader = torch.utils.data.DataLoader(validset, batch_size=32, shuffle=False, drop_last=False)
    # Build a fresh model for every fold. Reusing one model would let later
    # folds start from weights already trained on their validation images,
    # which inflates validation accuracy and defeats early stopping.
    model = MyResNet50()
    # model = EfficientNetCustom(num_classes=120)
    # Loss and optimizer; CrossEntropyLoss uses its default mean reduction.
    # Only the replacement output layer is optimised.
    loss = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.net.fc.parameters(), lr=learn_rate,weight_decay=1e-5)
    # optimizer = optim.SGD(model.parameters(), lr=learn_rate,momentum=momentum,weight_decay=1e-5)
    fold_predictions.append(train_model(model,train_loader, valid_loader,loss,optimizer,epoch,device))

# Ensemble the folds by averaging their raw outputs, then write the file once
# instead of overwriting it after every fold (which kept only the last fold).
# The values written are still un-normalised scores, not probabilities.
prediction = torch.stack(fold_predictions).mean(dim=0)
# Convert explicitly to a NumPy array so pandas builds plain float columns.
result = pd.DataFrame(prediction.numpy(),columns=labels)
result = pd.concat([test_data,result],axis=1)
result.to_csv('dog_breed.csv',index=False)


In [None]:
# TODO: the model output is not converted to probabilities, so the saved file
# contains many negative numbers. Apply softmax per row and rewrite the file.
# Read and write use the same relative path that the prediction file was
# saved under.
submission_path = 'dog_breed.csv'
df = pd.read_csv(submission_path)
data = df.iloc[:,1:].values

# Guard against re-running this cell: if every row is already a probability
# distribution, applying softmax again would flatten the predictions.
already_probabilities = bool(
    data.size
    and (data >= 0).all()
    and np.allclose(data.sum(axis=1), 1.0, atol=1e-4)
)
if not already_probabilities:
    tensor_data = torch.tensor(data,dtype=torch.float32)
    probability = F.softmax(tensor_data,dim=1)
    df.iloc[:,1:] = probability.numpy()
    df.to_csv(submission_path,index=False)
