In [1]:
import os
from google.colab import drive
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/Colab Notebooks')

Mounted at /content/drive


In [2]:
import copy
import torch
from torch import nn
from PIL import Image
import torch.optim as optim
from torchvision import models
from torchvision import datasets
import  torch.nn.functional as F
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader, Dataset, ConcatDataset
from torchvision.models.resnet import ResNet50_Weights

In [3]:
class MyDataset(Dataset):

  def __init__(self, root_dir, label, label_list, transform):
    self.root_dir = root_dir
    self.label = label
    self.path = os.path.join(self.root_dir, self.label)
    self.img_list = os.listdir(self.path)
    self.transform = transform
    self.label_list = label_list


  def __getitem__(self, index):
    img_path = os.path.join(self.path, self.img_list[index])
    img = Image.open(img_path).convert('RGB')
    transform = self.transform
    img = transform(img)
    label =  self.label_list.index(self.label)
    return img, label


  def __len__(self):
    return len(self.img_list)


def split_dataset(dataset_list, ratio):
  train_dataset_list, test_dataset_list = [], []
  for dataset in dataset_list:
    train_size = int(ratio * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset_unit, test_dataset_unit = torch.utils.data.random_split(dataset, [train_size, test_size])
    train_dataset_unit.dataset = copy.copy(dataset)
    train_dataset_unit.dataset.transform = transform_train
    test_dataset_unit.dataset.transform = transform_test
    train_dataset_list.append(train_dataset_unit)
    test_dataset_list.append(test_dataset_unit)

  train_dataset = ConcatDataset(train_dataset_list)
  test_dataset = ConcatDataset(test_dataset_list)

  return train_dataset, test_dataset

In [4]:
# 设置device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# define the data transforms
transform_train = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomCrop((224, 224)),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, hue=0.5),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# 加载dataset
root_dir = r"./minerals"
label_list = os.listdir(root_dir)
dataset_list = [MyDataset(root_dir, label, label_list, transform_test) for label in label_list]

ratio = 0.8
train_dataset, test_dataset = split_dataset(dataset_list, ratio)

dataset_sizes = {
    "train": train_dataset.__len__(),
    "test": test_dataset.__len__(),
}

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

dataloader = {
    "train": train_loader,
    "test": test_loader
}

In [1]:
# 加载预训练模型
model = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)

# 更改最后一层以适应新数据集的类别数
class_number = len(label_list)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, class_number)

model = model.to(device)

In [6]:
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
lr = 1e-4
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

In [None]:
# 加载事先训练的模型参数
# weights_path = "./ore_weights/weights_0.8402.pth"
# model.load_state_dict(torch.load(weights_path))
# model = model.to(device)

In [2]:
epoch = 10

for i in range(epoch):
  for phase in ("train", "test"):
    if phase == "train":
      model.train()
    else:
      model.eval()
    
    running_loss = 0.0
    running_corrects = 0
    for step, (inputs, labels) in enumerate(dataloader[phase]):
      inputs, labels = inputs.to(device), labels.to(device)
      optimizer.zero_grad()
      with torch.set_grad_enabled(phase == 'train'):
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        if phase == 'train':
          loss.backward()
          optimizer.step()

      running_loss += loss.item()
      # print(preds)
      running_corrects += torch.sum(preds == labels.data)
      if phase == 'train' and step % 10 == 9:
        print(f"[{step + 1}]training loss:{round(running_loss / 10, 4)}")
        running_loss = 0.0

    epoch_acc = running_corrects.double() / dataset_sizes[phase]
    if phase == "test":
      print(f"本轮test acc:{round(epoch_acc.item(), 4)}")
      if i == 0 or epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = model.state_dict()
        torch.save(best_model_wts, "./ore_weights/weights_{}.pth".format(round(best_acc.item(), 4)))