1. Data preparation

In [None]:
# 1. Mount Google Drive and unzip the files
from google.colab import drive
import zipfile
import os

# Mount Google Drive
drive.mount('/content/drive')

# Define decompression path and destination path
zip_path = '/content/drive/My Drive/Terrace/image_label.zip'
extract_path = '/content/image'

# unzip the files
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("File decompression complete！")


In [None]:
# 2. Image standardisation and conversion
from PIL import Image
import torchvision.transforms as transforms
import random
import matplotlib.pyplot as plt

# Define the image folder path and the output folder path 
image_folder_path = '/content/image/'
output_folder_path = '/content/image_label_trans'
os.makedirs(output_folder_path, exist_ok=True)

# Define image standardisation and conversion
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomRotation(10), # Random rotation
    transforms.ColorJitter(brightness=0.1), # Enhancement of brightness
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Get a list of image paths
def get_image_paths(root_folder):
    image_paths = []
    for dirpath, _, filenames in os.walk(root_folder):
        for fname in filenames:
            if fname.endswith('.jpg') or fname.endswith('.png'):
                image_paths.append(os.path.join(dirpath, fname))
    return image_paths

image_paths = get_image_paths(image_folder_path)

# Process and standardise all image counters
processed_count = 0

# Process and standardise all images
for image_path in image_paths:
    # Load image
    image = Image.open(image_path)
    # Process and standardise images
    processed_image = transform(image)
    processed_image = processed_image.float().clamp(min=0, max=1)
    # Calculate the relative path of the save path
    relative_path = os.path.relpath(image_path, image_folder_path)
    save_path = os.path.join(output_folder_path, relative_path)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # Save the standardised image to a new directory, keeping the filename unchanged
    transforms.ToPILImage()(processed_image).save(save_path)
    # Update Processing Counter
    processed_count += 1

print(f"A total of {processed_count} photos were processed and saved in the folder: {output_folder_path}")


In [None]:
# 3. Randomly select an image to display
random_image_path = random.choice(image_paths)
random_image = Image.open(random_image_path)
processed_random_image = transform(random_image)
processed_random_image = processed_random_image.float().clamp(min=0, max=1)
processed_random_image_np = processed_random_image.permute(1, 2, 0).numpy()

# Display randomly selected normalised images
plt.imshow(processed_random_image_np)
plt.axis('off')
plt.show()

2. Model training

In [None]:
import numpy as np
import time
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset, WeightedRandomSampler
from torchvision import datasets, models, transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Step 1. data loading and segmentation
# Load dataset
dataset = datasets.ImageFolder(output_folder_path, transform=transform)
batch_size = 32 

# Calculate the number of samples in each category
class_counts = [0] * len(dataset.classes)
for _, label in dataset.samples:
    class_counts[label] += 1

# Calculate weights for each category
class_weights = [1.0 / count for count in class_counts]

# Create a list of weights for sampling by category
weights = [class_weights[label] for _, label in dataset.samples]

# Delineate training, validation and test sets
train_idx, test_idx = train_test_split(list(range(len(dataset))), test_size=0.2, stratify=dataset.targets)
train_idx, val_idx = train_test_split(train_idx, test_size=0.2, stratify=[dataset.targets[i] for i in train_idx])

# Create weight sampler for training set
sampler = WeightedRandomSampler(weights, len(weights), replacement=True)

# Create the data loader
train_dataset = Subset(dataset, train_idx)
val_dataset = Subset(dataset, val_idx)
test_dataset = Subset(dataset, test_idx)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [None]:
# Step 2: Model construction and modification
from torchvision.models import efficientnet_b3

model = efficientnet_b3(weights='IMAGENET1K_V1')
num_ftrs = model.classifier[-1].in_features 

# Replace the last fully-connected layer and change the output category to 7 classes
model.classifier = nn.Linear(num_ftrs, 7)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)

In [None]:
# Step 3: Define loss function and optimiser (L2 regularisation)
num_epochs = 10 
learning_rate = 0.00001
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)

# Add learning rate scheduler
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
# Step 4: train the model and save the model for each epoch
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    since = time.time()

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            data_loader = train_loader if phase == 'train' else val_loader

            for inputs, labels in data_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)

                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            epoch_loss = running_loss / len(data_loader.dataset)
            epoch_acc = running_corrects.double() / len(data_loader.dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Save the model for each Epoch
            save_path = f'/content/drive/My Drive/Terrace/model_terrace/model_epoch_{epoch}.pth'
            torch.save(model.state_dict(), save_path)
            print(f"Model saved for epoch {epoch} at {save_path}")

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        
        # Scheduler update learning rate
        scheduler.step()

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)
    return model

# Training models
model = train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=num_epochs)

In [None]:
# Step 5: Evaluate the model
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate_model(model, criterion, data_loader, device):
    model.eval()

    running_loss = 0.0
    running_corrects = 0

    all_labels = []
    all_preds = []

    for inputs, labels in data_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            loss = criterion(outputs, labels)

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels)

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

    epoch_loss = running_loss / len(data_loader.dataset)
    epoch_acc = running_corrects.double() / len(data_loader.dataset)

    all_labels = np.array(all_labels)
    all_preds = np.array(all_preds)

    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print('Test Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
    print('Precision: {:.4f} Recall: {:.4f} F1: {:.4f}'.format(precision, recall, f1))

# Evaluate each model
for epoch in range(num_epochs):
    model_path = f"/content/drive/My Drive/Terrace/model_terrace/model_epoch_{epoch}.pth"
    model.load_state_dict(torch.load(model_path))
    print(f"Evaluating model from epoch {epoch}")
    evaluate_model(model, criterion, test_loader, device)


3. Model predictions

In [None]:
# Step 1: Load Image
import zipfile
import os

# Define the decompression function
def extract_zip(zip_path, extracted_folder_base):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extracted_folder_base)

# Recursively traverse the directory to count the number of images in all subfolders
def count_images_in_folder(folder_path):
    count = 0
    for root, _, files in os.walk(folder_path):
        count += len([file for file in files if file.endswith('.jpg')])
    return count

# Unzip the specified zip to the corresponding folder and count the number of images unzipped
zip_file_path = "/content/drive/My Drive/Terrace/image_size.zip"
extracted_folder_base = "/content/images_size/"

os.makedirs(extracted_folder_base, exist_ok=True)

# Unzip the package
extract_zip(zip_file_path, extracted_folder_base)

# Statistics on the number of images unpacked
extracted_count = count_images_in_folder(extracted_folder_base)

print(f"Extracted {extracted_count} images from '{zip_file_path}'.")
print("Image extraction completed.")


In [None]:
# Step 2: Loading the model
import torch
from torchvision import transforms
from PIL import Image
import pandas as pd

# Loading models
model_path = "/content/drive/My Drive/Terrace/model_terrace/model_best.pth" 
model.load_state_dict(torch.load(model_path))
model.eval()
model.to(device)


In [None]:
# Step 3: Model Prediction
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import pandas as pd

# Define data pre-processing
data_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.1), 
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Define category index to name mapping
class_idx_to_name = {
    0: 'activities',
    1: 'food',
    2: 'indoors',
    3: 'landscape',
    4: 'posing',
    5: 'species',
    6: 'structures'
}

# Define dataset classes
class CustomDataset(Dataset):
    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(img_path)

# Path to the folder where the CSV is saved
csv_save_folder = "/content/drive/My Drive/Terrace/result_csv/"

# Iterate through each subfolder after unpacking, make a prediction and save the result as CSV
for folder in os.listdir(extracted_folder_base):
    folder_path = os.path.join(extracted_folder_base, folder)
    if os.path.isdir(folder_path):
        image_files = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith('.jpg')]

        dataset = CustomDataset(image_files, transform=data_transforms)
        data_loader = DataLoader(dataset, batch_size=32, shuffle=False, num_workers=2)

        # Predicted and saved as CSV
        predictions = []
        image_names = []
        for inputs, filenames in data_loader:
            inputs = inputs.to(device)
            with torch.no_grad():
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
            predictions.extend([class_idx_to_name[pred.item()] for pred in preds])
            image_names.extend(filenames)

        results_df = pd.DataFrame({'Image': image_names, 'Category': predictions})
        csv_name = os.path.join(csv_save_folder, f'{folder}_predictions.csv')
        results_df.to_csv(csv_name, index=False)

        print(f"Saved predictions for folder '{folder}' to {csv_name}")

print("Prediction and CSV saving complete.")