# US 

In [1]:
from tqdm.notebook import trange, tqdm
import numpy as np
from modAL.models import ActiveLearner
import torch
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
from skorch import NeuralNetClassifier



## CNN

In [2]:
transform = transforms.Compose([transforms.ToTensor()])
cifar100 = datasets.CIFAR100(root='./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(cifar100, batch_size=1, shuffle=False)

Files already downloaded and verified


In [4]:
def load_filtered_CIFAR(selected_labels, num_train_per_class=200, num_test_per_class=50):
    '''
    Loads CIFAR-100 dataset but filters it to only include specified labels with a limited number of samples.

    :param selected_labels: List of 3 labels to keep
    :param num_train_per_class: Number of samples per label for training (default: 200)
    :param num_test_per_class: Number of samples per label for testing (default: 50)
    :return: Filtered training and test sets -> (X_train, y_train, X_test, y_test)
    '''

    # Load CIFAR-100 dataset
    train_set = datasets.CIFAR100(root='./data', train=True, download=True)
    test_set = datasets.CIFAR100(root='./data', train=False, download=True)

    # Convert to NumPy arrays
    X_train, y_train = train_set.data, np.array(train_set.targets)
    X_test, y_test = test_set.data, np.array(test_set.targets)

    # Function to filter data
    def filter_data(X, y, num_samples_per_class):
        filtered_images = []
        filtered_labels = []
        
        for label in selected_labels:
            indices = np.where(y == label)[0]  # Get indices for the label
            selected_indices = indices[:num_samples_per_class]  # Take only required samples
            
            filtered_images.append(X[selected_indices])
            filtered_labels.append(y[selected_indices])

        # Stack and flatten images
        X_filtered = np.concatenate(filtered_images, axis=0).reshape(-1, 32 * 32 * 3).astype(np.float32)
        y_filtered = np.concatenate(filtered_labels, axis=0)

        return X_filtered, y_filtered

    # Filter training and test sets
    X_train_filtered, y_train_filtered = filter_data(X_train, y_train, num_train_per_class)
    X_test_filtered, y_test_filtered = filter_data(X_test, y_test, num_test_per_class)

    return X_train_filtered, y_train_filtered, X_test_filtered, y_test_filtered

# Select 3 labels (e.g., labels 0, 1, and 2)
selected_labels = [0, 1, 2]
X_train, y_train, X_test, y_test = load_filtered_CIFAR(selected_labels, num_train_per_class=200, num_test_per_class=50)

# Print shapes to verify
print(f"Training data shape: {X_train.shape}, Labels shape: {y_train.shape}")
print(f"Test data shape: {X_test.shape}, Labels shape: {y_test.shape}")

Files already downloaded and verified
Files already downloaded and verified
Training data shape: (600, 3072), Labels shape: (600,)
Test data shape: (150, 3072), Labels shape: (150,)


In [5]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 7 * 7)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class SimpleCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3,32,3),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            nn.Linear(32*15*15,10)
        )
    def forward(self,x):
        x = self.conv(x)
        x = x.view(x.size(0),-1)
        return self.fc(x)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


## Active learning

In [None]:
n_members = [2, 4, 8, 16]
n_repeats = 3
n_queries = 100

# permutations=[np.random.permutation(X_train.shape[0]) for _ in range(n_repeats)]

In [6]:
transform = transforms.Compose([transforms.ToTensor()])
cifar = datasets.CIFAR10(root='./data',train=True,download=True,transform=transform)
indices = np.arange(len(cifar))
np.random.shuffle(indices)
init_idx = indices[:1000]
pool_idx = indices[1000:3000]

def load_subset(dataset,indices):
    subset = Subset(dataset,indices)
    loader = DataLoader(subset,batch_size=len(subset))
    data,labels = next(iter(loader))
    return data.numpy(), labels.numpy()

X_init,y_init = load_subset(cifar,init_idx)
X_pool,y_pool = load_subset(cifar,pool_idx)

net = NeuralNetClassifier(
    SimpleCNN,
    max_epochs=5,
    lr=0.001,
    device='cuda' if torch.cuda.is_available() else 'cpu'
)

learner = ActiveLearner(estimator=net, X_training=X_init, y_training=y_init)
query_idx, _ = learner.query(X_pool)
learner.teach(X_pool[query_idx].reshape(1,*X_pool.shape[1:]), [y_pool[query_idx]])

Files already downloaded and verified
  epoch    train_loss    valid_acc    valid_loss     dur
-------  ------------  -----------  ------------  ------
      1           nan       [32m0.0850[0m           nan  0.5261
      2           nan       0.0850           nan  0.0729
      3           nan       0.0850           nan  0.0691
      4           nan       0.0850           nan  0.0610
      5           nan       0.0850           nan  0.0619


ValueError: the dimensions of the new training data and label mustagree with the training data and labels provided so far

In [None]:

for epoch in tqdm(range(2)):
    for images, labels in trainloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

correct = 0
total = 0
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = correct / total
print(f'Accuracy: {accuracy:.4f}')