In [None]:
import os
import random
import shutil
import torch
from collections import defaultdict
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import functional as TF
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from PIL import Image, ImageFile
# Let Pillow decode truncated image files instead of raising an error on load.
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [None]:
# Root folders of the source flower datasets. The indexing cell treats every
# sub-folder of a root as one class and the files inside it as that class's images.
# NOTE(review): these are absolute, machine-specific Windows paths -- adjust them
# to wherever the datasets live on your machine.
paths = [
    r"C:\Users\User\Downloads\Hackathon Blossom\flower_data\flower_data\train",
    r"C:\Users\User\Downloads\National Flowers\flowerdataset\train",
    r"C:\Users\User\Downloads\🌸  Flowers\flowers",
    r"C:\Users\User\Downloads\Flowers Recognition\flowers",
    r"C:\Users\User\Downloads\Flowers Datasets (dandelion & daisy)\train",
    r"C:\Users\User\Downloads\Flowers Dataset\train",
    r"C:\Users\User\Downloads\Indian_subcontinent_flowers\labeled_flowers"
]

In [None]:
# Index every readable image of the source datasets, grouped by lower-cased
# class-folder name. Files that Pillow cannot verify are deleted from disk.
# NOTE(review): this cell only builds the `all_images` index; nothing visible in
# this notebook copies the files into the combined dataset folder -- confirm
# that step happens elsewhere.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
all_images = defaultdict(list)

for path in paths:
    for folder in os.listdir(path):
        folder_path = os.path.join(path, folder)
        if not os.path.isdir(folder_path):
            continue
        for file in os.listdir(folder_path):
            # Compare case-insensitively so files such as "IMG_001.JPG" are kept.
            if not file.lower().endswith(IMAGE_EXTENSIONS):
                continue
            img_path = os.path.join(folder_path, file)
            try:
                # verify() checks file integrity without decoding the full image.
                with Image.open(img_path) as img:
                    img.verify()
            except (SyntaxError, OSError):
                print(f"Bad file: {img_path}, deleting...")
                try:
                    os.remove(img_path)
                except OSError as delete_error:
                    print(f"Error deleting file {img_path}: {delete_error}")
            else:
                all_images[folder.lower()].append(img_path)

In [None]:
class AugmentedDataset(datasets.ImageFolder):
    """ImageFolder that oversamples every class up to the size of the largest class.

    The dataset exposes ``max_count * len(classes)`` items. Item ``index`` belongs
    to class ``index % len(classes)`` and cycles through that class's images, so
    smaller classes repeat their images until they also contribute ``max_count``
    items.

    Args:
        root: Directory containing one sub-folder per class.
        transform: Optional callable applied to the PIL image before it is returned.
    """

    def __init__(self, root, transform=None):
        super().__init__(root, transform)
        # Group the sample paths that ImageFolder indexed by class name.
        self.image_classes = defaultdict(list)
        for path, target in self.samples:
            self.image_classes[self.classes[target]].append(path)
        # Derive the largest class size from the indexed samples so it always
        # agrees with the files that can actually be returned.
        self.max_count = max(len(class_paths) for class_paths in self.image_classes.values())

    def __getitem__(self, index):
        """Return ``(image, class_index)`` for the oversampled position ``index``.

        Raises:
            IndexError: If ``index`` is outside ``[-len(self), len(self))``.
        """
        total = len(self)
        if index < 0:
            index += total
        if not 0 <= index < total:
            raise IndexError(f"index out of range for dataset of length {total}")

        num_classes = len(self.classes)
        label = self.classes[index % num_classes]
        images = self.image_classes[label]
        # Use the quotient as the position inside the class. Re-using `index`
        # itself would skip images whenever len(images) and num_classes share
        # a common factor.
        img_path = images[(index // num_classes) % len(images)]

        # Load the image
        with open(img_path, 'rb') as f:
            img = Image.open(f)
            img = img.convert('RGB')

        # Classes smaller than the largest one go through augment_image first.
        if len(images) < self.max_count:
            img = self.augment_image(img)

        if self.transform is not None:
            img = self.transform(img)

        return img, self.class_to_idx[label]

    def __len__(self):
        """Number of items after oversampling: ``max_count`` per class."""
        return self.max_count * len(self.classes)

    def augment_image(self, img):
        """Resize ``img`` to 128x128 with bilinear filtering; returns a PIL image.

        NOTE(review): this is applied only to under-represented classes and does
        not keep the aspect ratio -- confirm that class-dependent preprocessing
        is intended.
        """
        img = img.resize((128, 128), Image.BILINEAR)
        return img

# Define transforms
IMAGE_SIZE = 128  # side length of the square crops for both training and evaluation
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

data_transforms = {
    # Random crop / flip / rotation / colour jitter for training. The crop size
    # matches the evaluation crop so the model sees one input resolution.
    'train': transforms.Compose([
        transforms.RandomResizedCrop(IMAGE_SIZE),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
        transforms.ToTensor(),
        transforms.Normalize(NORMALIZE_MEAN, NORMALIZE_STD)
    ]),
    # Deterministic resize + centre crop for validation and test.
    'val': transforms.Compose([
        transforms.Resize(144),
        transforms.CenterCrop(IMAGE_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(NORMALIZE_MEAN, NORMALIZE_STD)
    ]),
}

# Update the dataset path in the next cell to match your environment.

In [None]:
# Create the augmented dataset
combined_dataset_path = r"C:\Users\User\Downloads\combined_flower_dataset"  # change this to your dataset location
SPLIT_SEED = 42
BATCH_SIZE = 32
NUM_WORKERS = 4

# Two views of the same folder: one with training augmentation, one with the
# deterministic evaluation transform. Both index the same root, so a given
# index refers to the same image in either view.
dataset = AugmentedDataset(combined_dataset_path, transform=data_transforms['train'])
eval_dataset = AugmentedDataset(combined_dataset_path, transform=data_transforms['val'])

# Split dataset into train, validation, and test sets (seeded for reproducibility)
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_split, test_split = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# The subsets returned by random_split all wrap the SAME dataset object, so
# assigning `.dataset.transform` on one of them would also replace the training
# transform. Re-wrap the validation/test indices around the evaluation view.
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)
# NOTE(review): the dataset repeats images of small classes, so copies of one
# image can land in different splits -- consider splitting by file before
# oversampling.

# Create data loaders with more workers
# NOTE(review): worker processes need to import the dataset class; with a class
# defined only inside a notebook this may fail on Windows -- confirm, or use 0.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models


# Residual block: two 3x3 conv + batch-norm stages with a skip connection
class ResBlock(nn.Module):
    """Two-convolution residual unit.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution (and of the shortcut projection).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # The shortcut needs a 1x1 projection only when the main path changes
        # the spatial size or the channel count; otherwise it is an identity.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.downsample = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = self.downsample(x)
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))
        main += shortcut
        return self.relu(main)

# ResNet-style network: stem, four residual stages, three-layer classifier head
class ResNet(nn.Module):
    """Convolutional stem, four stages built from ``block``, and an MLP head.

    Args:
        block: Callable ``block(in_channels, out_channels, stride)`` returning a module.
        num_blocks: Sequence with the number of blocks for each of the four stages.
        num_classes: Number of output logits.
    """

    def __init__(self, block, num_blocks, num_classes=102):
        super().__init__()
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages layer1..layer4 as (width, stride of the first block)
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, first_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, num_blocks[position], stride=first_stride)
            setattr(self, f"layer{position + 1}", stage)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(512, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``, the rest use 1."""
        stage = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage.append(block(self.in_channels, out_channels, block_stride))
            # Later blocks (and the next stage) start from this stage's width.
            self.in_channels = out_channels
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map an image batch to ``(batch, num_classes)`` logits."""
        feature_extractor = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for module in feature_extractor:
            x = module(x)
        flat = x.view(x.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

def ResNet50(num_classes):
    """Build the custom ``ResNet`` from ``ResBlock`` units in a [3, 4, 6, 3] stage layout.

    Args:
        num_classes: Number of output logits of the classifier head.

    Returns:
        A newly constructed ``ResNet`` instance.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(ResBlock, stage_depths, num_classes)

# Size the classifier from the dataset so every label index has an output logit.
NUM_CLASSES = len(dataset.classes)
model = ResNet50(num_classes=NUM_CLASSES)

# Reuse the stem and the four residual stages of the ImageNet-pretrained
# torchvision ResNet-50; the custom fully connected head is kept.
pretrained_model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
BACKBONE_LAYERS = ("conv1", "bn1", "relu", "maxpool", "layer1", "layer2", "layer3", "layer4")
for layer_name in BACKBONE_LAYERS:
    setattr(model, layer_name, getattr(pretrained_model, layer_name))

# The pretrained layer4 emits `pretrained_model.fc.in_features` channels
# (2048 for ResNet-50), whereas the custom head was built for 512 inputs.
# Rebuild the first head layer so the forward pass does not fail on a shape mismatch.
model.fc1 = nn.Linear(pretrained_model.fc.in_features, model.fc1.out_features)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
# The optimizer is created after the layer swap so it tracks the final parameters.
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# Halve the learning rate every 20 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. ``early_stop``
    becomes ``True`` after ``patience`` consecutive calls without an improvement
    of at least ``min_delta`` over the best loss seen so far.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        min_delta: Minimum decrease of the loss that counts as an improvement.
    """

    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0          # consecutive calls without improvement
        self.best_loss = None     # lowest loss accepted so far
        self.early_stop = False

    def __call__(self, val_loss):
        # NaN is the only value that differs from itself. A NaN loss must never
        # count as an improvement, otherwise it would overwrite `best_loss` and
        # every later comparison against it would be False.
        is_number = val_loss == val_loss
        improved = is_number and (
            self.best_loss is None or val_loss <= self.best_loss - self.min_delta
        )
        if improved:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

early_stopping = EarlyStopping(patience=5, min_delta=0.001)

In [5]:
EPOCHS = 25
train_losses, val_losses = [], []
train_accuracies, val_accuracies = [], []


def _run_epoch(loader, training):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Uses the notebook-level ``model``, ``criterion``, ``optimizer`` and ``device``.

    Args:
        loader: Iterable yielding ``(inputs, labels)`` batches.
        training: If true, the model is put in train mode and the optimizer is
            stepped after every batch; otherwise the pass runs in eval mode
            with gradients disabled.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train(training)
    total_loss, total_correct, total_seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            # Move the batch to the same device as the model.
            inputs, labels = inputs.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            _, preds = torch.max(outputs, 1)
            batch_count = inputs.size(0)
            # `loss` is a batch mean; weight it by the batch size so the epoch
            # figure is a per-sample mean even with a smaller final batch.
            total_loss += loss.item() * batch_count
            total_correct += (preds == labels).sum().item()
            total_seen += batch_count

    if total_seen == 0:
        raise ValueError("data loader produced no samples")
    return total_loss / total_seen, total_correct / total_seen


best_val_loss = float('inf')
best_state = None  # CPU copy of the weights with the lowest validation loss

for epoch in range(EPOCHS):
    epoch_loss, epoch_acc = _run_epoch(train_loader, training=True)
    train_losses.append(epoch_loss)
    train_accuracies.append(epoch_acc)

    epoch_val_loss, epoch_val_acc = _run_epoch(val_loader, training=False)
    val_losses.append(epoch_val_loss)
    val_accuracies.append(epoch_val_acc)

    print(f'Epoch {epoch+1}/{EPOCHS}.. '
          f'Train loss: {epoch_loss:.4f}.. '
          f'Train accuracy: {epoch_acc:.4f}.. '
          f'Val loss: {epoch_val_loss:.4f}.. '
          f'Val accuracy: {epoch_val_acc:.4f}')

    # Keep the best weights: by the time early stopping fires, the live model
    # is `patience` epochs past its best validation loss.
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        best_state = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }

    scheduler.step()

    early_stopping(epoch_val_loss)
    if early_stopping.early_stop:
        print("Early stopping")
        break

print('Training complete.')

# Restore the best-validation weights so the saved file (and any later
# evaluation) uses them rather than the last epoch's weights.
if best_state is not None:
    model.load_state_dict(best_state)

# Save the model
torch.save(model.state_dict(), 'resnet50_flower_model.pth')
print('Model saved to resnet50_flower_model.pth')

In [None]:
import matplotlib.pyplot as plt
# Training curves: loss on the left panel, accuracy on the right panel
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

loss_ax.plot(train_losses, label='Train Loss')
loss_ax.plot(val_losses, label='Val Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Loss over Epochs')
loss_ax.legend()

acc_ax.plot(train_accuracies, label='Train Accuracy')
acc_ax.plot(val_accuracies, label='Val Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_title('Accuracy over Epochs')
acc_ax.legend()

fig.tight_layout()
plt.show()

In [None]:
# Score the model on the held-out test split and collect per-sample predictions
model.eval()
all_preds, all_labels = [], []
test_loss = 0.0
test_corrects = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        preds = torch.max(outputs, 1)[1]  # index of the largest logit per sample

        # Weight the batch-mean loss by the batch size for a per-sample average.
        test_loss += loss.item() * inputs.size(0)
        test_corrects += torch.sum(preds == labels.data)

        # Kept for the confusion matrix / classification report cell.
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

num_test_samples = len(test_loader.dataset)
test_loss = test_loss / num_test_samples
test_accuracy = test_corrects.double() / num_test_samples

print(f'Test loss: {test_loss:.4f}.. '
      f'Test accuracy: {test_accuracy:.4f}')

In [None]:
# Confusion matrix
# Pass the full list of class indices explicitly so the matrix always has one
# row/column per entry of `dataset.classes`, even if a class never occurs in
# the test labels or predictions.
class_ids = list(range(len(dataset.classes)))
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues', xticklabels=dataset.classes, yticklabels=dataset.classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

# Classification report
# Without `labels`, the report raises when the number of classes found in the
# data differs from the number of `target_names`.
print(classification_report(all_labels, all_preds, labels=class_ids, target_names=dataset.classes))