# Dataloader

In [None]:
import os
from PIL import Image, UnidentifiedImageError
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from torchvision import transforms
from torchvision.models import resnet18, ResNet18_Weights
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
from tqdm import tqdm


In [None]:
# Settings -> Developer Settings -> Personal Access Tokens -> Token (classic)
os.environ['GITHUB_TOKEN'] = ""

GITHUB_USERNAME = "Codfishz"
REPO_NAME       = "ASR"
TOKEN = os.environ.get("GITHUB_TOKEN")
repo_url        = f"https://{TOKEN}@github.com/{GITHUB_USERNAME}/{REPO_NAME}.git"
!git clone {repo_url}

In [None]:
!cd {REPO_NAME} && git pull

In [None]:
os.chdir('ASR/Script')

In [None]:
%run "Datapreparation_YOLO.ipynb"

In [None]:
# Categories
categories = sorted([d for d in os.listdir(original_base) if os.path.isdir(os.path.join(original_base, d))])

# Set image path and labels
image_paths = []
image_labels = []

base_path = "/content/data"

for category in categories:
    category_dir = os.path.join(base_path, category)
    filenames = [f for f in os.listdir(category_dir) if f.endswith(".jpg")]
    for filename in tqdm(filenames, desc=f"Processing '{category}'"):
      image_path = os.path.join(category_dir, filename)
      try:
          img = Image.open(image_path)
          img = img.convert("RGB")
          img.load()  # load image each time to remove the damaged image thoroughly
          image_paths.append(image_path)
          image_labels.append(category)
      except (UnidentifiedImageError, OSError):
          continue

# Label encoder
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(image_labels)

# Dataset
class CropDiseaseDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert("RGB")
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label

# Augmentation
transform = transforms.Compose([
    transforms.Resize((224, 224)), # To fit the pretrain model (说是resnet的官方推荐输入大小)
    # transforms.RandomRotation(20),
    # transforms.ColorJitter(brightness=0.1, contrast=0.1),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    # transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) # To fit the pretrain model (没有很懂啊，加上之后图片群魔乱舞)
])


# Make Dataset and DataLoader
dataset = CropDiseaseDataset(image_paths, encoded_labels, transform=transform)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True, num_workers=0, pin_memory=True)

# Visualization
# plt.figure(figsize=(10, 10))
# for images, labels in dataloader:
#     for i in range(9):
#         img = images[i].permute(1, 2, 0).numpy()
#         plt.subplot(3, 3, i+1)
#         plt.imshow(img)
#         plt.axis('off')
#     break
# plt.show()


In [None]:
print(len(dataset))

# Feature extraction (ResNet18)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pretrained ResNet
weights = ResNet18_Weights.DEFAULT
resnet18 = resnet18(weights=weights)

# Remove the final fully connected layer
feature_extractor = nn.Sequential(*list(resnet18.children())[:-1])
feature_extractor.to(device)
feature_extractor.eval()

# Extract features

def extract_features(dataloader, model, device):
    all_features = []
    all_labels = []

    with torch.no_grad():
        for images, labels in tqdm(dataloader, desc="Extracting features"):
            images = images.to(device)
            features = model(images)                  # shape: [B, 512, 1, 1]
            features = features.view(features.size(0), -1)  # flatten to [B, 512]
            all_features.append(features.cpu())
            all_labels.append(labels)

    return torch.cat(all_features), torch.cat(all_labels)

features, labels = extract_features(dataloader, feature_extractor, device)

In [None]:
# Results
print("Features shape:", features.shape)  # [N, 512]
print("Labels shape:", labels.shape)      # [N]

In [None]:
def select_density_samples_from_features(features, k=10):
    """
    Select the top-k samples based on density computed from pre-extracted features.

    Parameters:
        features: A NumPy array of shape (N, D), where N is the number of images
                  (e.g. 24453) and D is the feature dimension (e.g. 512).
        k: The number of samples to select.

    Returns:
        selected_indices: The indices of the selected samples (top-k with highest density).
        density_scores: A NumPy array of density scores for all samples.
    """
    # Normalize the feature vectors to unit length (to use cosine similarity)
    norms = np.linalg.norm(features, axis=1, keepdims=True) + 1e-8  # avoid division by zero
    norm_features = features / norms

    # Compute the pairwise cosine similarity matrix.
    # Note: Similarity between feature i and j is the dot product of normalized features.
    similarity_matrix = np.dot(norm_features, norm_features.T)

    # Remove self-similarity by setting the diagonal elements to 0.
    np.fill_diagonal(similarity_matrix, 0)

    # Compute density score for each image: here we take the average similarity
    density_scores = similarity_matrix.mean(axis=1)

    # Select the indices of the top-k samples with the highest density scores.
    selected_indices = np.argsort(density_scores)[-k:]

    return selected_indices, density_scores

# ------------------------------------------------------------------------------
# Test Code for Density-Based Sampling
# ------------------------------------------------------------------------------

# Here we assume that 'features' is already defined and has shape [24453, 512].
# For testing purposes, if 'features' is not defined, we'll simulate a smaller set.
try:
    features.shape
except NameError:
    # For demonstration only: simulate a smaller features array (e.g., 50 images).
    features = np.random.rand(50, 512)

# Choose the number of samples you want to select.
k = 5

# Call the density-based sampling function.
selected_indices, density_scores = select_density_samples_from_features(features, k)

print("Selected indices (top {} samples):".format(k), selected_indices)

# Extract the density scores for only the selected samples.
selected_density_scores = density_scores[selected_indices]

print("Density scores for selected samples:", selected_density_scores)

# Plot the density scores for the selected samples only.
plt.figure(figsize=(8, 5))
plt.bar(range(len(selected_density_scores)), selected_density_scores)
plt.xlabel("Selected Sample (order)")
plt.ylabel("Density Score")
plt.title("Density Scores for Selected Samples")
# Optionally, label the x-axis ticks with the actual indices from the dataset.
plt.xticks(range(len(selected_density_scores)), selected_indices)
plt.show()

In [None]:
!pip install ultralytics
!pip install torchinfo

import ultralytics
ultralytics.checks()


# Functions

In [None]:
def setup_active_dataset():

    orig_train_dir = os.path.join(ORIGINAL_DATASET_DIR, TRAIN_SUBDIR)
    active_train_dir = os.path.join(ACTIVE_DATASET_DIR, TRAIN_SUBDIR)
    os.makedirs(active_train_dir, exist_ok=True)

    for cls in os.listdir(orig_train_dir):
        cls_path = os.path.join(orig_train_dir, cls)
        if os.path.isdir(cls_path):
            os.makedirs(os.path.join(active_train_dir, cls), exist_ok=True)
    print(f"Complete creating train_dir {active_train_dir}")

    orig_val_dir = os.path.join(ORIGINAL_DATASET_DIR, VAL_SUBDIR)
    active_val_dir = os.path.join(ACTIVE_DATASET_DIR, VAL_SUBDIR)
    if os.path.exists(orig_val_dir):
        if os.path.exists(active_val_dir):
            shutil.rmtree(active_val_dir)
        shutil.copytree(orig_val_dir, active_val_dir)

    orig_test_dir = os.path.join(ORIGINAL_DATASET_DIR, TEST_SUBDIR)
    active_test_dir = os.path.join(ACTIVE_DATASET_DIR, TEST_SUBDIR)
    if os.path.exists(orig_test_dir):
        if os.path.exists(active_test_dir):
            shutil.rmtree(active_test_dir)
        shutil.copytree(orig_test_dir, active_test_dir)

In [None]:
def get_all_samples():

    samples = []
    orig_train_dir = os.path.join(ORIGINAL_DATASET_DIR, TRAIN_SUBDIR)
    for cls in os.listdir(orig_train_dir):
        cls_path = os.path.join(orig_train_dir, cls)
        if os.path.isdir(cls_path):
            image_files = [f for f in os.listdir(cls_path) if os.path.isfile(os.path.join(cls_path, f))]
            for f in image_files:
                samples.append((cls, f))
    return samples

In [None]:
import shutil
def copy_samples(sample_list):

    for cls, file_name in sample_list:
        src_path = os.path.join(ORIGINAL_DATASET_DIR, TRAIN_SUBDIR, cls, file_name)
        dst_path = os.path.join(ACTIVE_DATASET_DIR, TRAIN_SUBDIR, cls, file_name)
        if not os.path.exists(dst_path):
            shutil.copy(src_path, dst_path)

In [None]:
import random
def stratified_sample(all_samples, n_initial):

    samples_by_class = {}
    for cls, filename in all_samples:
        samples_by_class.setdefault(cls, []).append((cls, filename))

    stratified = []
    for cls, samples in samples_by_class.items():
        stratified.append(random.choice(samples))
    remaining_count = n_initial - len(stratified)
    if remaining_count > 0:
        remaining_samples = list(set(all_samples) - set(stratified))
        additional_samples = random.sample(remaining_samples, remaining_count)
        stratified.extend(additional_samples)

    return stratified



# Hyperparameters

In [None]:
# Hyperparameters
ORIGINAL_DATASET_DIR = '/content/data_yolo'
ACTIVE_DATASET_DIR = '/content/data_active'

TRAIN_SUBDIR = 'train'
VAL_SUBDIR = 'val'
TEST_SUBDIR = 'test'
Num_Train = 20103
N_INITIAL = int(0.3 * Num_Train)
N_PER_PHASE = int(0.1 * Num_Train)
NUM_PHASES = 5

# Density-based selection

In [None]:
from ultralytics import YOLO
import random
import os
import csv

# Initialize dataset
setup_active_dataset()
all_samples = get_all_samples()

# Prepare CSV logging
log_path = "accuracy_log_density.csv"
with open(log_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Phase", "Num_Images", "Top-1 Accuracy", "Top-5 Accuracy"])

# Initial sampling and train
current_sample_list = stratified_sample(all_samples, N_INITIAL)
copy_samples(current_sample_list)

print("Initial Epoch")
model = YOLO('yolo11n-cls.pt')
results = model.train(data=ACTIVE_DATASET_DIR, epochs=1, imgsz=640, name="active-phase0", project="runs/classify")

# Log initial accuracy
acc1 = results.top1
acc5 = results.top5

with open(log_path, "a", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([0, len(current_sample_list), acc1, acc5])

# Compute remaining samples
remaining_samples = list(set(all_samples) - set(current_sample_list))

# Active Learning (Density-Based)
for phase in range(1, NUM_PHASES):
    n_to_add = min(N_PER_PHASE, len(remaining_samples))
    if n_to_add <= 0:
        print("No more samples")
        break

    # Extract subset features
    remaining_indices = [all_samples.index(s) for s in remaining_samples]
    subset_features = features.numpy()[remaining_indices]

    # Density-based to choose top-k
    selected_indices, _ = select_density_samples_from_features(subset_features, k=n_to_add)
    new_samples = [remaining_samples[i] for i in selected_indices]

    # Update sample list, copy images
    copy_samples(new_samples)
    current_sample_list.extend(new_samples)
    remaining_samples = list(set(remaining_samples) - set(new_samples))

    print(f"Phase {phase}: add {n_to_add} samples, total sample number: {len(current_sample_list)}。")

    # Retrain
    model = YOLO(f"runs/classify/active-phase{phase-1}/weights/last.pt")
    results = model.train(data=ACTIVE_DATASET_DIR, epochs=1, imgsz=640, name=f"active-phase{phase}", project="runs/classify")

    # Log accuracy
    acc1 = results.top1
    acc5 = results.top5
    with open(log_path, "a", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([phase, len(current_sample_list), acc1, acc5])

    print(f"Phase {phase} training completed")
