In [None]:
import os
import time
import random
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.models as models
from albumentations import (
    Compose, RandomCrop, HorizontalFlip, CenterCrop, Normalize,
)
from albumentations.pytorch.transforms import ToTensorV2
from sklearn.model_selection import train_test_split
import kagglehub

# Set style for plots
sns.set_style('darkgrid')
plt.style.use('fivethirtyeight')

# Seed every random number generator this notebook uses so that the data
# split, the augmentations and the classifier initialisation are repeatable.
SEED = 42
random.seed(SEED)
# NOTE: Python reads PYTHONHASHSEED at interpreter start-up, so setting it here
# only influences processes launched afterwards (e.g. worker processes).
os.environ["PYTHONHASHSEED"] = str(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)  # also covers additional GPUs, if any
torch.backends.cudnn.deterministic = True
# The cuDNN auto-tuner may pick different kernels from run to run; switch it
# off so that `deterministic = True` above is not undermined.
torch.backends.cudnn.benchmark = False

# Fetch the Stanford Dogs dataset through kagglehub; the returned value is used
# below as the root folder of the download.
dataset_path = kagglehub.dataset_download("jessicali9530/stanford-dogs-dataset")

# Image and annotation trees sit in separate sub-folders of the download.
images_dir = os.path.join(dataset_path, 'images/Images')
annotations_dir = os.path.join(dataset_path, 'annotations/Annotation')

# Sanity check: both trees should contain the same number of entries.
n_annotation_entries = len(os.listdir(annotations_dir))
n_image_entries = len(os.listdir(images_dir))
counts_match = n_annotation_entries == n_image_entries
print(
    'Number of annotation folders matches the number of image folders.'
    if counts_match
    else 'Data mismatch: Annotations and image folders count do not match.'
)

# Analyze image distribution: one (breed, number of files) pair per folder
# found under images_dir.
breed_counts = [
    (
        breed_folder.split('-')[1],
        len(os.listdir(os.path.join(images_dir, breed_folder))),
    )
    for breed_folder in os.listdir(images_dir)
]

df_breed_counts = pd.DataFrame(breed_counts, columns=['Breed', 'Image Count'])
total_images = df_breed_counts['Image Count'].sum()
print(f'Total number of images: {total_images}')

# Bar chart of the 30 breeds with the most images.
top_30_breeds = df_breed_counts.sort_values('Image Count', ascending=False).head(30)
plt.figure(figsize=(15, 8))
plt.title('Image Count per Breed (Top 30)')
sns.barplot(x='Image Count', y='Breed', data=top_30_breeds)
plt.show()



# Create dataframe for training: one row per image, holding the image path
# relative to images_dir and the breed label derived from the folder name.
#
# Both directory listings are sorted: os.listdir returns entries in arbitrary
# order, and the row order feeds the seeded train/validation split, so an
# unsorted listing would make the split differ between machines/filesystems.
image_data = []
for breed_folder in sorted(os.listdir(images_dir)):
    # NOTE: split('-')[1] keeps only the text between the first and second
    # hyphen, so folder names containing further hyphens are shortened. Later
    # cells rely on these shortened names, so this is kept as is.
    breed_name = breed_folder.split('-')[1]
    for image_file in sorted(os.listdir(os.path.join(images_dir, breed_folder))):
        image_path = os.path.join(breed_folder, image_file)
        image_data.append((image_path, breed_name))

df_image_data = pd.DataFrame(image_data, columns=['Image Path', 'Breed'])


# Augmentation functions
def create_train_transforms():
    """Build the training pipeline.

    Steps, in order: random 299x299 crop, horizontal flip with probability
    0.5, normalization with the mean/std listed below, then ToTensorV2.
    """
    steps = [
        RandomCrop(height=299, width=299, p=1.0),
        HorizontalFlip(p=0.5),
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
    pipeline = Compose(steps, p=1)
    return pipeline


def create_val_transforms():
    """Build the validation pipeline: normalization followed by ToTensorV2.

    No random augmentation is applied here.
    """
    steps = [
        Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]
    pipeline = Compose(steps, p=1)
    return pipeline



def pca_color_augmentation(image, data_type='Tensor'):
    """Apply PCA colour augmentation to an image.

    The pixels are standardised per channel, the 3x3 channel covariance is
    eigen-decomposed, and a random combination of the eigenvectors (scaled by
    their eigenvalues and by N(0, 0.1) draws) is added to every pixel.

    Args:
        image: either a channel-first ``torch.Tensor`` (C, H, W) or a
            channel-last array (H, W, 3). Values are clipped to [0, 255] on
            output, so 0-255 pixel values are expected.
        data_type: ``'Tensor'`` returns a channel-first ``torch.Tensor``;
            any other value returns a channel-last ``numpy`` array.

    Returns:
        The augmented image as ``uint8``, with the same spatial size as the input.
    """
    if isinstance(image, torch.Tensor):
        # detach/cpu so that CUDA tensors and tensors tracking gradients work too
        image = image.detach().cpu().numpy()
        image = np.transpose(image, (1, 2, 0))

    img_reshaped = image.reshape(-1, 3).astype(np.float32)

    if img_reshaped.shape[0] < 2:
        # A covariance needs at least two pixels; return the image unchanged.
        aug_image = np.clip(img_reshaped, 0, 255).astype(np.uint8)
    else:
        mean, std = np.mean(img_reshaped, axis=0), np.std(img_reshaped, axis=0)
        # A constant channel has std == 0; dividing by it would fill the
        # covariance with NaN. Using 1 leaves that channel at zero variance.
        safe_std = np.where(std > 0, std, 1.0).astype(np.float32)
        img_rescaled = (img_reshaped - mean) / safe_std
        cov_matrix = np.cov(img_rescaled, rowvar=False)
        # The covariance matrix is symmetric: eigh guarantees real eigenvalues
        # and eigenvectors, whereas eig may return complex values.
        eigen_vals, eigen_vecs = np.linalg.eigh(cov_matrix)
        alphas = np.random.normal(loc=0, scale=0.1, size=3)
        delta = np.dot(eigen_vecs, alphas * eigen_vals)
        pca_aug_image = img_rescaled + delta
        pca_aug_image = pca_aug_image * safe_std + mean
        aug_image = np.clip(pca_aug_image, 0, 255).astype(np.uint8)

    if data_type == 'Tensor':
        return torch.from_numpy(np.transpose(aug_image.reshape(image.shape), (2, 0, 1)))
    else:
        return aug_image.reshape(image.shape)


# Dataset class
class DogBreedDataset(Dataset):
    """Dataset yielding (image, one-hot label) pairs for the dog breed images.

    Args:
        image_df: DataFrame with an 'Image Path' column (paths relative to
            the module-level ``images_dir``).
        labels_df: DataFrame of one-hot labels; row ``i`` (by position) is the
            label of row ``i`` of ``image_df``.
        transform: callable invoked as ``transform(image=...)`` whose result
            exposes the transformed image under the ``'image'`` key.
        mode: ``'train'`` enables the random PCA colour augmentation.
    """

    def __init__(self, image_df, labels_df, transform, mode='train'):
        self.image_df = image_df
        self.labels_df = labels_df
        self.transform = transform
        self.mode = mode

    def __len__(self):
        return len(self.image_df)

    @staticmethod
    def _resize_dims(height, width, side=299):
        """Return (new_width, new_height) with the shorter edge scaled to ``side``.

        Integer arithmetic is used so that the shorter edge is exactly
        ``side`` and the longer edge can never fall below it; multiplying by a
        float ratio can land just under ``side`` through rounding, which would
        make the subsequent crop impossible.
        """
        if height < width:
            return max(side, (width * side) // height), side
        return side, max(side, (height * side) // width)

    def __getitem__(self, idx):
        """Load, resize, crop and transform the sample at position ``idx``.

        Returns:
            ``(image, label)``, or ``(None, None)`` if the image could not be
            loaded or processed (the error is printed).
        """
        # Positional lookup for both frames, so image and label stay paired
        # even if the frames do not carry a 0..n-1 index.
        image_path = os.path.join(images_dir, self.image_df['Image Path'].iloc[idx])
        # Explicit float dtype: the one-hot frame may be boolean, which
        # downstream tensor operations such as argmax may not accept.
        label = self.labels_df.iloc[idx].values.astype(np.float32)

        try:
            image = cv2.imread(image_path)
            if image is None:
                raise FileNotFoundError(f"Image not found at {image_path}")

            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Resize while maintaining aspect ratio, with shorter side = 299
            h, w = image.shape[:2]
            new_w, new_h = self._resize_dims(h, w, 299)
            image = cv2.resize(image, (new_w, new_h))

            # NOTE: after this crop the image is already 299x299, so a later
            # 299x299 random crop inside `transform` cannot move the window.
            image = CenterCrop(height=299, width=299, p=1)(image=image)['image']

            if self.mode == 'train':
                if random.random() < 0.5:
                    image = pca_color_augmentation(image, data_type='numpy')

            augmented = self.transform(image=image)
            image = augmented['image']

            return image, label

        except Exception as e:
            # Best effort: report the failure and let the caller skip the sample.
            print(f"Error loading image {image_path}: {e}")
            return None, None


# One-hot encode labels. An explicit float dtype is requested because the
# default dtype of get_dummies may be boolean, which the argmax in the training
# loop may not accept.
labels_df = pd.get_dummies(df_image_data['Breed'], dtype=np.float32)


# Split data (stratified by breed so each class appears in both subsets)
X_train, X_val, y_train, y_val = train_test_split(
    df_image_data, labels_df, test_size=0.25, random_state=SEED, stratify=df_image_data['Breed']
)

# Reset the indices so image rows and label rows line up by position.
X_train = X_train.reset_index(drop=True)
X_val = X_val.reset_index(drop=True)
y_train = y_train.reset_index(drop=True)
y_val = y_val.reset_index(drop=True)

# Create data loaders
train_transform = create_train_transforms()
val_transform = create_val_transforms()
BATCH_SIZE = 128

train_dataset = DogBreedDataset(X_train, y_train, train_transform, mode = 'train')
val_dataset = DogBreedDataset(X_val, y_val, val_transform, mode = 'val')


def _collate_skip_failed(batch):
    """Collate a batch, dropping samples the dataset returned as (None, None).

    Filtering here, rather than iterating over the whole dataset up front,
    keeps images out of memory until they are needed and lets the random
    training augmentations be re-drawn every epoch.
    """
    usable = [(img, lbl) for img, lbl in batch if img is not None]
    if not usable:
        raise RuntimeError("Every image in this batch failed to load.")
    return torch.utils.data.dataloader.default_collate(usable)


train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                          collate_fn=_collate_skip_failed)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                        collate_fn=_collate_skip_failed)




# TODO: visualize a few augmented training samples here.
# (The visualization code has been omitted from this notebook.)



# Model setup and training (using Inception v3)

# Device configuration: first GPU when available, CPU otherwise
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load pre-trained model
inception = models.inception_v3(pretrained=True, aux_logits=True).to(device)

# Freeze pre-trained layers: only the classifier head defined below is trained
for param in inception.parameters():
    param.requires_grad = False

# Replace the classifier. The new layers are created after the freeze above,
# so their parameters keep requires_grad=True.
fc_inputs = inception.fc.in_features
inception.fc = nn.Sequential(
    nn.Linear(fc_inputs, 2048),
    nn.ReLU(),
    nn.Dropout(0.4),
    nn.Linear(2048, len(labels_df.columns)), # Output layer size should match the number of breeds
    nn.LogSoftmax(dim=1)  # Use LogSoftmax for NLLLoss
).to(device)


# Loss and optimizer
criterion = nn.NLLLoss()
# Hand the optimizer only the parameters that are actually trained; the frozen
# backbone never receives gradients, so there is no reason to register it.
trainable_params = [param for param in inception.parameters() if param.requires_grad]
optimizer = optim.Adam(trainable_params)


# Training loop (simplified)

EPOCHS = 4 # Reduced for demonstration
best_val_loss = float('inf')
checkpoint_path = 'inception_finetuning.pth'
checkpoint_saved = False


def _batch_loss(inputs, labels):
    """Run one batch through `inception` and return the loss tensor.

    `labels` are one-hot rows; they are turned into class indices for NLLLoss.
    """
    inputs, labels = inputs.to(device), labels.to(device)
    outputs = inception(inputs)
    # Accept both a plain tensor and an output object exposing `.logits`.
    main_output = outputs.logits if hasattr(outputs, 'logits') else outputs
    # Cast first: the one-hot labels may arrive as a dtype argmax does not support.
    targets = torch.argmax(labels.float(), dim=1)
    return criterion(main_output, targets)


for epoch in range(EPOCHS):
    start_time = time.time()
    print(f"Epoch {epoch + 1}/{EPOCHS}")

    inception.train()  # Set model to training mode
    train_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        loss = _batch_loss(inputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    inception.eval()  # Set model to evaluation mode
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            val_loss += _batch_loss(inputs, labels).item()

    # Mean of the per-batch losses
    avg_train_loss = train_loss / len(train_loader)
    avg_val_loss = val_loss / len(val_loader)

    end_time = time.time()
    print(f"Epoch {epoch + 1}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Time: {end_time - start_time:.4f}s")

    # Keep the weights of the best epoch instead of whatever the last epoch
    # happened to produce.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(inception.state_dict(), checkpoint_path)
        checkpoint_saved = True

# Fallback (e.g. the validation loss was never a finite improvement): still
# write a checkpoint so that later cells have a file to load.
if not checkpoint_saved:
    torch.save(inception.state_dict(), checkpoint_path)



In [None]:
from google.colab import drive
drive.mount('/content/drive')
image_path = '/content/drive/MyDrive/Project2/mixed_breeds'

valid = []

for element in os.listdir(image_path):
    breed_dir = os.path.join(image_path, element)
    # Only directories are treated as breed folders; loose files are skipped.
    if not os.path.isdir(breed_dir):
        continue
    breed = element.split('-')[1]
    # Ignore hidden '._' entries and anything that is not an image file.
    images = [
        img
        for img in os.listdir(breed_dir)
        if img.endswith(('.jpeg', '.jpg', '.png')) and not img.startswith('._')
    ]
    valid.append((breed, len(images)))

df_test_breeds = pd.DataFrame(valid, columns=['Breeds', 'Number of images'])
print('Total number of images :', df_test_breeds['Number of images'].sum())

In [None]:
def _list_breed_images(folder):
    """Return the image file names in `folder`, skipping '._' entries and non-image files."""
    return [img for img in os.listdir(folder)
            if not img.startswith('._') and img.endswith(('.jpeg', '.jpg', '.png'))]


# Single pass over the breed folders: the printed per-folder count and the rows
# added to the dataframe come from the same filtered listing, so they always agree.
valid = []
for element in os.listdir(image_path):
    breed_dir = os.path.join(image_path, element)
    if not os.path.isdir(breed_dir):
        continue
    img_files = _list_breed_images(breed_dir)
    print(f"{element}: {len(img_files)} images")
    valid.extend([(os.path.join(element, img), element.split('-')[1]) for img in img_files])
df_test = pd.DataFrame(valid, columns=['Path', 'Label'])
print(df_test.head(10))  # show the first 10 rows
print('Total number of valid images:', len(valid))

In [None]:
# Attach the parent-breed annotations (joined on the row index) and keep only
# the columns used for evaluation.
# NOTE(review): `df_annot` is not created in the visible part of this notebook -
# confirm it is defined before this cell on a fresh kernel.
df_test = df_test.join(df_annot, lsuffix='1')
df_test = df_test[['Path', 'Label', 'parent_1', 'parent_2']]
# Shorten the two hyphenated names to the form the training labels use.
df_test = df_test.replace({
    'shih-tzu': 'shih',
    'german_short-haired_pointer': 'german_short',
})
# Preview as the last expression of the cell so that it is actually displayed.
df_test.head()


In [None]:
# Use the same label columns, in the same order, as the full Stanford Dogs
# training labels, so that column i of `test_labels` refers to the same breed as
# output i of the model.
# `labels_df` is the one-hot training frame; the bare name `labels` is rebound to
# a batch tensor by the training loop and has no `.columns`.
breed_columns = labels_df.columns.str.lower()


def _parent_one_hot(parent_names):
    """One-hot encode a column of parent breed names onto `breed_columns`.

    Reindexing (instead of adding to a zero frame) keeps the training column
    order; adding frames with different column sets aligns them on the union of
    their columns, which can reorder or extend the columns.
    """
    encoded = pd.get_dummies(parent_names, dtype=int)
    unknown = encoded.columns.difference(breed_columns)
    if len(unknown) > 0:
        print(f'Warning: parent breeds absent from the training labels are ignored: {list(unknown)}')
    return encoded.reindex(columns=breed_columns, fill_value=0)


parent1_labels = pd.get_dummies(df_test['parent_1'])
parent2_labels = pd.get_dummies(df_test['parent_2'])
d1 = _parent_one_hot(df_test['parent_1'])
d2 = _parent_one_hot(df_test['parent_2'])
# Each row has a 1 for every parent breed (2 if both parents share a breed).
test_labels = d1.add(d2).astype(float)
idx2breed = dict(enumerate(test_labels.columns))

In [None]:
# Build the evaluation loader for the mixed-breed images.
# NOTE(review): `loader` is not defined in the visible part of this notebook -
# confirm it exists on a fresh kernel.
test_loader = loader(df_test, test_labels, batch_size = BATCH_SIZE, obj = 'test')
# Load the saved model
# NOTE(review): the training cell writes 'inception_finetuning.pth' to the working
# directory, while this path points at a differently named file on Drive -
# confirm this is the intended checkpoint.
PATH = '/content/drive/MyDrive/inception_finetuningphr.pth'
# Rebuild the same architecture as used for fine-tuning so that the state dict
# keys match.
inception = models.inception_v3(pretrained = True, aux_logits=True)
# Freeze model parameters
for param in inception.parameters():
    param.requires_grad = False
# Change the final layer of Inception Model for Transfer Learning
fc_inputs = inception.fc.in_features
inception.fc = nn.Sequential(
    nn.Linear(fc_inputs, 2048),
    nn.ReLU(),
    nn.Dropout(0.4),
    # 120 outputs - presumably the number of breeds; must match the checkpoint
    nn.Linear(2048, 120),
    nn.LogSoftmax(dim=1) # For using NLLLoss()
)
# map_location places the loaded weights on the CPU
inception.load_state_dict(torch.load(PATH, map_location=torch.device('cpu')))
# Switch to evaluation mode before running inference
inception.eval()


In [None]:
# Top-2 evaluation: compare the two highest-scoring predicted breeds with the
# two parent breeds of every test image.
test_data_size = len(df_test)
both_parents_acc = 0.0
at_least_1_parent_acc = 0.0
with torch.no_grad():
    for inputs, labels in test_loader:
        # Convert explicitly to numpy before using numpy routines instead of
        # relying on numpy handing a tensor back.
        label_scores = torch.as_tensor(labels).detach().cpu().numpy()
        parents_true = np.argpartition(label_scores, -2, axis=1)[:, -2:]
        outputs = inception(inputs)
        pred_scores = outputs.detach().cpu().numpy()
        parents_pred = np.argpartition(pred_scores, -2, axis=1)[:, -2:]

        for pred, true in zip(parents_pred, parents_true):
            n_found = len(np.intersect1d(pred, true))
            # NOTE: this accumulates the FRACTION of parents found (0, 0.5 or 1)
            both_parents_acc += n_found / 2
            at_least_1_parent_acc += float(n_found > 0)

# Guard against an empty test set
denominator = float(test_data_size) if test_data_size else 1.0
both_parents_acc = both_parents_acc / denominator
at_least_1_parent_acc = at_least_1_parent_acc / denominator

print('avg accuracy in detecting both parents: ', both_parents_acc, '\navg accuracy in detecting at least one parent:', at_least_1_parent_acc)

In [None]:

def _count_parent_hits(loader, k, model=None):
    """Return, for every sample in `loader`, how many of its two parents are in the top-k predictions.

    Args:
        loader: iterable of ``(inputs, labels)`` batches; ``labels`` holds one
            row per sample whose two largest entries mark the parent breeds.
        k: number of highest-scoring classes taken as predictions.
        model: callable mapping ``inputs`` to a score tensor; defaults to the
            module-level ``inception``.

    Returns:
        1-D integer numpy array with values in {0, 1, 2}.
    """
    net = inception if model is None else model
    per_batch = []
    with torch.no_grad():  # inference only
        for inputs, labels in loader:
            label_scores = torch.as_tensor(labels).detach().cpu().numpy()
            parents_true = np.argpartition(label_scores, -2, axis=1)[:, -2:]
            pred_scores = net(inputs).detach().cpu().numpy()
            parents_pred = np.argpartition(pred_scores, -k, axis=1)[:, -k:]
            # hits[i] = number of true parents of sample i found among its predictions
            found = (parents_true[:, :, None] == parents_pred[:, None, :]).any(axis=2)
            per_batch.append(found.sum(axis=1))
    if not per_batch:
        return np.zeros(0, dtype=int)
    return np.concatenate(per_batch)


def topKone(loader, data_size, k, model=None):
    """Fraction of samples for which at least one parent is among the top-k predictions.

    Args:
        loader: iterable of ``(inputs, labels)`` batches.
        data_size: number of samples used as the denominator.
        k: number of top predictions considered.
        model: optional model callable; defaults to the module-level ``inception``.
    """
    hits = _count_parent_hits(loader, k, model)
    # ">= 1": a sample where both parents are found also counts as "at least one".
    at_least_one = float(np.sum(hits >= 1))
    return at_least_one / float(data_size)


def topKtwo(loader, data_size, k, model=None):
    """Fraction of samples for which both parents are among the top-k predictions.

    Args:
        loader: iterable of ``(inputs, labels)`` batches.
        data_size: number of samples used as the denominator.
        k: number of top predictions considered.
        model: optional model callable; defaults to the module-level ``inception``.
    """
    hits = _count_parent_hits(loader, k, model)
    both_parents = float(np.sum(hits == 2))
    return both_parents / float(data_size)

In [None]:
# Evaluate both top-k metrics over a range of k and plot them side by side.
k_values = [5, 10, 20, 30, 40, 50]
topKone_acc = [topKone(test_loader, test_data_size, k=k) for k in k_values]
topKtwo_acc = [topKtwo(test_loader, test_data_size, k=k) for k in k_values]
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15,5))

panels = (
    ('One Parent Accuracy', topKone_acc),
    ('Two Parents Accuracy', topKtwo_acc),
)
for panel_ax, (panel_title, panel_acc) in zip(ax, panels):
    panel_ax.plot(k_values, panel_acc)
    panel_ax.set_title(panel_title)
    panel_ax.set_xlabel('K values')
    panel_ax.set_ylabel('Accuracy')

plt.tight_layout()
plt.show()