In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np

class CNN(nn.Module):
    """Small convolutional classifier with two conv/pool stages and two dense layers.

    Both convolutions use 3x3 kernels with padding 1 (spatial size preserved)
    and each stage is followed by a 2x2 max-pool, so a single-channel 28x28
    input becomes a 64x7x7 feature map before the fully connected layers.
    The network emits 10 raw logits per sample (no softmax is applied).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor: 1 -> 32 -> 64 channels.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        # One pooling module is shared by both stages (it has no parameters).
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # Classifier head.
        self.fc1 = nn.Linear(in_features=64 * 7 * 7, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Map a batch of images to a batch of 10 class logits."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        flattened = x.view(-1, 64 * 7 * 7)
        hidden = F.relu(self.fc1(flattened))
        return self.fc2(hidden)


def prepare_data():
    """Load the FashionMNIST train and test splits with a shared preprocessing pipeline.

    Images are converted to tensors and normalized with mean 0.5 and std 0.5.
    The data is stored under ``./data`` and downloaded if it is not present.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple.
    """
    preprocessing = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])
    # Everything except the ``train`` flag is identical for both splits.
    shared_kwargs = dict(root='./data', transform=preprocessing, download=True)
    train_split = datasets.FashionMNIST(train=True, **shared_kwargs)
    test_split = datasets.FashionMNIST(train=False, **shared_kwargs)
    return train_split, test_split

def split_dataset(dataset, initial_labeled_size):
    """Randomly split ``dataset`` into a labeled seed set and an unlabeled pool.

    Args:
        dataset: Any object supporting ``len()`` and integer indexing.
        initial_labeled_size: Number of samples drawn (without replacement)
            for the labeled subset.

    Returns:
        A ``(labeled_subset, unlabeled_subset)`` tuple of ``Subset`` objects.
        Their ``indices`` are lists of plain Python ints; the unlabeled
        indices are in ascending order.

    Raises:
        ValueError: Propagated from ``np.random.choice`` when
            ``initial_labeled_size`` exceeds ``len(dataset)``.
    """
    total = len(dataset)
    # ``.tolist()`` turns NumPy integers into Python ints so the indices can
    # be freely concatenated with ordinary lists later on.
    labeled_indices = np.random.choice(
        total, size=initial_labeled_size, replace=False
    ).tolist()
    # Set membership is O(1); testing against the NumPy array directly scans
    # the whole array for every dataset index.
    labeled_lookup = set(labeled_indices)
    unlabeled_indices = [i for i in range(total) if i not in labeled_lookup]
    return Subset(dataset, labeled_indices), Subset(dataset, unlabeled_indices)


def train_model(model, data_loader, optimizer, criterion, epochs=10):
    """Train ``model`` in place and print the mean batch loss after each epoch.

    Args:
        model: Module to optimize; it is switched to training mode.
        data_loader: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        epochs: Number of full passes over ``data_loader``.
    """
    # Run on whatever device the model already lives on instead of relying on
    # a module-level ``device`` global that only exists when run as a script.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        num_batches = 0
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        # Counting batches avoids a ZeroDivisionError on an empty loader and
        # also works for iterables that do not define ``len()``.
        mean_loss = running_loss / num_batches if num_batches else 0.0
        print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss:.4f}")


def evaluate_model(model, data_loader):
    """Return the top-1 accuracy of ``model`` on ``data_loader`` as a percentage.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        data_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Accuracy in the range 0-100, or ``0.0`` when the loader yields no
        samples.
    """
    # Run on whatever device the model already lives on instead of relying on
    # a module-level ``device`` global that only exists when run as a script.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        return 0.0
    return 100 * correct / total

# Active learning: uncertainty-based sample selection
def get_uncertain_samples(model, data_loader, num_samples, strategy="entropy"):
    """Return dataset indices of the samples the model is least certain about.

    Every sample is scored so that a larger score means more uncertainty:

    * ``"entropy"``: entropy of the softmax distribution.
    * ``"least_confidence"``: negated maximum class probability.
    * ``"margin"``: negated gap between the two largest class probabilities.

    Args:
        model: Classifier producing ``(batch, classes)`` logits.
        data_loader: Iterable of ``(images, labels)`` batches. The labels are
            ignored. Samples are identified by their position in the
            iteration order, so the loader must not shuffle.
        num_samples: Maximum number of indices to return.
        strategy: One of ``"entropy"``, ``"least_confidence"``, ``"margin"``.

    Returns:
        Up to ``num_samples`` ints, most uncertain first. When
        ``data_loader.dataset`` exposes ``indices`` (e.g. a ``Subset``),
        positions are translated through it so the result indexes the
        underlying dataset; otherwise positions in iteration order are
        returned.

    Raises:
        ValueError: If ``strategy`` is not one of the supported names.
    """
    if strategy not in ("entropy", "least_confidence", "margin"):
        raise ValueError(f"Unknown uncertainty strategy: {strategy!r}")

    # Run on whatever device the model already lives on instead of relying on
    # a module-level ``device`` global that only exists when run as a script.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    scores = []
    with torch.no_grad():
        # The second batch element is the class label, not a sample index, so
        # it is deliberately ignored here.
        for images, _ in data_loader:
            probs = F.softmax(model(images.to(device)), dim=1)
            if strategy == "entropy":
                batch_scores = -torch.sum(probs * torch.log(probs + 1e-10), dim=1)
            elif strategy == "least_confidence":
                # Negate the tensor before converting it; a Python list does
                # not support unary minus.
                batch_scores = -probs.max(dim=1).values
            else:  # "margin"
                sorted_probs, _ = probs.sort(dim=1, descending=True)
                batch_scores = -(sorted_probs[:, 0] - sorted_probs[:, 1])
            scores.extend(batch_scores.tolist())

    # Stable descending sort: ties keep their iteration order.
    ranked_positions = sorted(
        range(len(scores)), key=scores.__getitem__, reverse=True
    )[:num_samples]

    subset_indices = getattr(getattr(data_loader, "dataset", None), "indices", None)
    if subset_indices is None:
        return ranked_positions
    return [int(subset_indices[pos]) for pos in ranked_positions]


def get_diverse_samples(model, data_loader, num_samples, diversity_metric="cosine_similarity"):
    """Return dataset indices of the samples whose model outputs are most distinctive.

    Each sample is represented by the model's output for it (4-D outputs are
    global-average-pooled, anything else is flattened per sample) and scored
    so that a larger score means "more diverse":

    * ``"cosine_similarity"``: one minus the cosine similarity to the
      sample's nearest *other* sample.
    * ``"l2_norm"``: Euclidean distance to the mean feature vector.
    * ``"kl_divergence"``: entropy of the softmax over the features. This
      equals ``log(C) - KL(p || uniform)`` for ``C`` classes, i.e. samples
      with the smallest KL divergence from uniform rank first.

    Args:
        model: Module whose outputs are used as features.
        data_loader: Iterable of ``(images, labels)`` batches. The labels are
            ignored. Samples are identified by their position in the
            iteration order, so the loader must not shuffle.
        num_samples: Maximum number of indices to return.
        diversity_metric: One of the metric names listed above.

    Returns:
        Up to ``num_samples`` ints, highest score first. When
        ``data_loader.dataset`` exposes ``indices`` (e.g. a ``Subset``),
        positions are translated through it so the result indexes the
        underlying dataset; otherwise positions in iteration order are
        returned.

    Raises:
        ValueError: If ``diversity_metric`` is not one of the supported names.
    """
    if diversity_metric not in ("cosine_similarity", "l2_norm", "kl_divergence"):
        raise ValueError(f"Unknown diversity metric: {diversity_metric!r}")

    # Run on whatever device the model already lives on instead of relying on
    # a module-level ``device`` global that only exists when run as a script.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    features_list = []
    with torch.no_grad():
        # The second batch element is the class label, not a sample index, so
        # it is deliberately ignored here.
        for images, _ in data_loader:
            outputs = model(images.to(device))
            # Global average pooling only makes sense for (N, C, H, W) maps;
            # applying it to 2-D logits raises an error.
            if outputs.dim() == 4:
                outputs = F.adaptive_avg_pool2d(outputs, (1, 1))
            features_list.append(outputs.reshape(outputs.size(0), -1))

    if not features_list:
        return []
    features = torch.cat(features_list, dim=0)
    num_rows = features.size(0)

    if diversity_metric == "cosine_similarity":
        unit = F.normalize(features, dim=1)
        nearest = torch.empty(num_rows, dtype=unit.dtype, device=unit.device)
        # Work in row chunks so the full N x N similarity matrix is never
        # materialized at once.
        chunk_size = 1024
        for start in range(0, num_rows, chunk_size):
            stop = min(start + chunk_size, num_rows)
            sims = unit[start:stop] @ unit.T
            # Mask each sample's similarity to itself; otherwise every row
            # maximum is 1 and all scores collapse to 0.
            rows = torch.arange(stop - start, device=unit.device)
            sims[rows, rows + start] = float("-inf")
            nearest[start:stop] = sims.max(dim=1).values
        scores = 1 - nearest
    elif diversity_metric == "l2_norm":
        center = features.mean(dim=0)
        scores = torch.norm(features - center, dim=1)
    else:  # "kl_divergence"
        # ``features`` already are the model outputs; they must not be fed
        # through the model a second time.
        probs = F.softmax(features, dim=1)
        scores = -torch.sum(probs * torch.log(probs + 1e-10), dim=1)

    score_list = scores.tolist()
    # Stable descending sort: ties keep their iteration order.
    ranked_positions = sorted(
        range(len(score_list)), key=score_list.__getitem__, reverse=True
    )[:num_samples]

    subset_indices = getattr(getattr(data_loader, "dataset", None), "indices", None)
    if subset_indices is None:
        return ranked_positions
    return [int(subset_indices[pos]) for pos in ranked_positions]


if __name__ == "__main__":
    # Module-level device; the helper functions in this file read it.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    train_dataset, test_dataset = prepare_data()
    initial_labeled_size = 1000
    labeled_set, unlabeled_set = split_dataset(train_dataset, initial_labeled_size)

    # The unlabeled and test loaders keep a fixed order (shuffle=False).
    labeled_loader = DataLoader(labeled_set, batch_size=64, shuffle=True)
    unlabeled_loader = DataLoader(unlabeled_set, batch_size=64, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

    # Initialize model, loss, and optimizer
    model = CNN().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train initial model on labeled set
    print("Training on initial labeled dataset...")
    train_model(model, labeled_loader, optimizer, criterion, epochs=10)

    # Evaluate on test set
    accuracy = evaluate_model(model, test_loader)
    print(f"Initial Test Accuracy: {accuracy:.2f}%")

    # Active learning iterations
    num_iterations = 5
    num_samples = 100

    for iteration in range(num_iterations):
        print(f"\nActive Learning Iteration {iteration + 1}")

        # Select uncertain samples
        new_indices_uncertainty = get_uncertain_samples(model, unlabeled_loader, num_samples, strategy="entropy")

        # Select diverse samples
        new_indices_diversity = get_diverse_samples(model, unlabeled_loader, num_samples, diversity_metric="cosine_similarity")

        # Merge both selections into a set of plain ints (removes duplicates
        # and gives O(1) membership tests below).
        selected = {int(i) for i in list(new_indices_uncertainty) + list(new_indices_diversity)}

        # Never add a sample to the labeled set twice.
        already_labeled = {int(i) for i in labeled_set.indices}
        new_indices = sorted(selected - already_labeled)

        labeled_set_indices = list(labeled_set.indices)
        labeled_set_indices.extend(new_indices)
        # Every selected sample leaves the unlabeled pool.
        unlabeled_set_indices = [i for i in unlabeled_set.indices if int(i) not in selected]

        # Update labeled and unlabeled subsets
        labeled_set = Subset(train_dataset, labeled_set_indices)
        unlabeled_set = Subset(train_dataset, unlabeled_set_indices)

        # Update data loaders
        labeled_loader = DataLoader(labeled_set, batch_size=64, shuffle=True)
        unlabeled_loader = DataLoader(unlabeled_set, batch_size=64, shuffle=False)

        # Retrain the model (continues from the current weights and optimizer state)
        train_model(model, labeled_loader, optimizer, criterion, epochs=5)

        # Evaluate on test set
        accuracy = evaluate_model(model, test_loader)
        print(f"Iteration {iteration + 1} Test Accuracy: {accuracy:.2f}%")


Training on initial labeled dataset...
Epoch 1/10, Loss: 1.5993
Epoch 2/10, Loss: 0.8649
Epoch 3/10, Loss: 0.7129
Epoch 4/10, Loss: 0.6196
Epoch 5/10, Loss: 0.5537
Epoch 6/10, Loss: 0.5157
Epoch 7/10, Loss: 0.4797
Epoch 8/10, Loss: 0.4262
Epoch 9/10, Loss: 0.4246
Epoch 10/10, Loss: 0.4207
Initial Test Accuracy: 80.05%

Active Learning Iteration 1
