In [1]:
# Change to your dataset directory
import os
os.chdir('D:/学习/9444dataset/dataset_remade')

In [2]:

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torchvision
import torchvision.transforms as transforms
import cv2
import pandas as pd
from sklearn.model_selection import train_test_split
from glob import glob
import fnmatch

In [3]:
# Collect paths to all images
imagePatches = glob('**/*.png', recursive=True)

In [4]:
len(imagePatches)

10384

In [5]:
# Separate image paths based on class
patternZero = '*class0.png'
patternOne = '*class1.png'
classZero = fnmatch.filter(imagePatches, patternZero)
classOne = fnmatch.filter(imagePatches, patternOne)

In [6]:
total_num = 10384
count = 0
y = []
progress_increment = total_num * 0.05  # 5% of total
next_progress = progress_increment

for img in imagePatches:
    if img in classZero:
        y.append(0)
    elif img in classOne:
        y.append(1)

    count += 1

    if count >= next_progress:
        print(f'Progress: {count/total_num*100:.2f}%')  # Prints the progress as a percentage
        next_progress += progress_increment


Progress: 5.01%
Progress: 10.01%
Progress: 15.00%
Progress: 20.00%
Progress: 25.00%
Progress: 30.01%
Progress: 35.01%
Progress: 40.00%
Progress: 45.00%
Progress: 50.00%
Progress: 55.01%
Progress: 60.01%
Progress: 65.00%
Progress: 70.00%
Progress: 75.00%
Progress: 80.01%
Progress: 85.01%
Progress: 90.00%
Progress: 95.00%


In [7]:
# Create dataframe
images_df = pd.DataFrame()
images_df["images"] = imagePatches
images_df["labels"] = y

In [8]:
# Split data into training and validation sets
train, val = train_test_split(images_df, stratify=images_df.labels, test_size=0.2)

In [9]:
# Dataset class
class MyDataset(Dataset):
    def __init__(self, df_data,transform=None):
        super().__init__()
        self.df = df_data.values
        self.transform = transform

    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):
        img_path,label = self.df[index]
        image = cv2.imread(img_path)
        image = cv2.resize(image, (50, 50))  # Resize images to 50x50 to match the CNN architecture
        if self.transform is not None:
            image = self.transform(image)
        return image, label

In [10]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 32, 3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv4 = nn.Conv2d(64, 64, 3, padding=1)
        self.conv5 = nn.Conv2d(64, 128, 3, padding=1)
        self.fc1 = nn.Linear(128 * 50 * 50, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)
        
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = F.dropout(x, 0.3)
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, 2)
        x = F.dropout(x, 0.3)
        x = F.relu(self.conv5(x))
        x = x.view(-1, 128 * 50 * 50)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, 0.3)
        x = F.relu(self.fc2(x))
        x = F.dropout(x, 0.3)
        x = self.fc3(x)
        return x

In [11]:
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.delta = delta
        self.val_loss_min = None

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                return True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        return False
    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.val_loss_min is not None:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), 'checkpoint.pt')  # <-- saves model
        self.val_loss_min = val_loss

In [12]:
# Model parameters
num_epochs = 40
num_classes = 2
batch_size = 64
learning_rate = 0.0001

In [13]:
# Define transforms
trans_train = transforms.Compose([transforms.ToPILImage(),
                                  transforms.RandomHorizontalFlip(), 
                                  transforms.RandomVerticalFlip(),
                                  transforms.RandomRotation(20), 
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.5, 0.5, 0.5],std=[0.5, 0.5, 0.5])])

trans_valid = transforms.Compose([transforms.ToPILImage(),
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.5, 0.5, 0.5],std=[0.5, 0.5, 0.5])])


In [14]:
# Create datasets and dataloaders
dataset_train = MyDataset(df_data=train, transform=trans_train)
dataset_valid = MyDataset(df_data=val,transform=trans_valid)

loader_train = DataLoader(dataset = dataset_train, batch_size=batch_size, shuffle=True, num_workers=0)
loader_valid = DataLoader(dataset = dataset_valid, batch_size=batch_size//2, shuffle=False, num_workers=0)

In [15]:
# Initialize the network
model = Net()

# If a GPU is available, move the model to it
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Initialize the early_stopping object
early_stopping = EarlyStopping(patience=5, delta=0)



In [16]:
# Training loop
for epoch in range(40):  # loop over the dataset multiple times
    running_loss = 0.0
    for i, data in enumerate(loader_train, 0):
        inputs, labels = data[0].to(device), data[1].to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print statistics
    print(f"Epoch {epoch + 1}, Loss: {running_loss / len(trainloader)}")

    # early_stopping needs the validation loss to check if it has decreased, 
    # and if it has, it will make a checkpoint of the current model
    if early_stopping(running_loss, model):
        break

print('Finished Training')

RuntimeError: shape '[-1, 320000]' is invalid for input of size 1179648

In [None]:
# Test the model
model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
confusion_matrix = torch.zeros(2, 2)
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in loader_valid:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        for t, p in zip(labels.view(-1), predicted.view(-1)):
                confusion_matrix[t.long(), p.long()] += 1
                 
    print('Test Accuracy of the model on the test images: {} %'.format(100 * correct / total))


In [None]:
# Save the model checkpoint
torch.save(model.state_dict(), '96_model_10000.ckpt')