In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
from torch.utils.data import DataLoader
import torch.nn as nn

#Load CIFAR-10
transform = transforms.Compose([
    transforms.Resize(224),  #Resize because ResNet expects 224x224
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=128, shuffle=False, num_workers=2)

#Load Pretrained ResNet18
model = resnet18(pretrained=True)
model.fc = nn.Identity()
model.eval()
model.cuda()

#Extract features
features = []
labels = []

with torch.no_grad():
    for inputs, targets in trainloader:
        inputs = inputs.cuda()
        outputs = model(inputs)
        features.append(outputs.cpu())
        labels.append(targets)

features = torch.cat(features)
labels = torch.cat(labels)


Files already downloaded and verified




In [None]:
import numpy as np

N = features.shape[0]
sample_size = int(0.05 * N)  #5% uniform random sample
uniform_indices = np.random.choice(N, sample_size, replace=False)
uniform_sample = features[uniform_indices]


In [None]:
from sklearn.cluster import KMeans

k = 10 #or any k choosed for initial centers
kmeans = KMeans(n_clusters=k, random_state=0).fit(uniform_sample.numpy())
centers = torch.tensor(kmeans.cluster_centers_)


In [None]:
#Compute squared distance to nearest center
all_features = features
diff = all_features.unsqueeze(1) - centers.unsqueeze(0)
dists = torch.norm(diff, dim=2) ** 2  #(N, k)
min_dists, _ = torch.min(dists, dim=1)

#Define the probabilities
epsilon = 1e-6  #small constant
sampling_probs = min_dists + epsilon
sampling_probs /= sampling_probs.sum()
sampling_probs = sampling_probs.numpy()


In [None]:
coreset_size = int(0.01 * N)  #final coreset size, say 1% of dataset
coreset_indices = np.random.choice(N, coreset_size, replace=True, p=sampling_probs)

coreset_features = features[coreset_indices]
coreset_labels = labels[coreset_indices]

#Assign weights
coreset_weights = 1.0 / (sampling_probs[coreset_indices] * coreset_size)


In [None]:
#coreset_indices(from sampling)

coreset_dataset = torch.utils.data.Subset(trainset, coreset_indices)

#New DataLoader for training
coreset_loader = DataLoader(coreset_dataset, batch_size=128, shuffle=True, num_workers=2)


In [None]:
from torchvision.models import resnet18
import torch.nn as nn

model = resnet18(num_classes=10)  #10 classes for CIFAR-10
model.cuda()

#model.load_state_dict(torch.load('some_pretrained_model.pth'))


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)


In [None]:
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in coreset_loader:
        inputs, targets = inputs.cuda(), targets.cuda()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    scheduler.step()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/total:.4f}, Acc: {100.*correct/total:.2f}%")


Epoch 1/50, Loss: 2.3366, Acc: 13.00%
Epoch 2/50, Loss: 2.2783, Acc: 16.80%
Epoch 3/50, Loss: 2.1601, Acc: 21.00%
Epoch 4/50, Loss: 2.0704, Acc: 23.60%
Epoch 5/50, Loss: 1.9050, Acc: 31.40%
Epoch 6/50, Loss: 1.8264, Acc: 36.20%
Epoch 7/50, Loss: 1.7149, Acc: 41.80%
Epoch 8/50, Loss: 1.6110, Acc: 47.20%
Epoch 9/50, Loss: 1.5268, Acc: 50.60%
Epoch 10/50, Loss: 1.3861, Acc: 54.60%
Epoch 11/50, Loss: 1.3033, Acc: 63.60%
Epoch 12/50, Loss: 1.1765, Acc: 65.60%
Epoch 13/50, Loss: 1.0264, Acc: 72.00%
Epoch 14/50, Loss: 0.8985, Acc: 76.80%
Epoch 15/50, Loss: 0.7957, Acc: 80.80%
Epoch 16/50, Loss: 0.6522, Acc: 86.60%
Epoch 17/50, Loss: 0.5416, Acc: 88.00%
Epoch 18/50, Loss: 0.4048, Acc: 94.40%
Epoch 19/50, Loss: 0.3292, Acc: 96.80%
Epoch 20/50, Loss: 0.2602, Acc: 96.80%
Epoch 21/50, Loss: 0.1790, Acc: 98.80%
Epoch 22/50, Loss: 0.1367, Acc: 99.80%
Epoch 23/50, Loss: 0.0823, Acc: 99.80%
Epoch 24/50, Loss: 0.0622, Acc: 100.00%
Epoch 25/50, Loss: 0.0417, Acc: 100.00%
Epoch 26/50, Loss: 0.0329, Acc: 

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.models import resnet18
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from sklearn.cluster import KMeans
import numpy as np
import random

# -------------------------------
# 1. Load CIFAR-10 Dataset
# -------------------------------
transform_train = transforms.Compose([
    transforms.Resize(224),  # Resize to match ResNet input
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

transform_test = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

trainloader_full = DataLoader(trainset, batch_size=128, shuffle=False, num_workers=2)
testloader = DataLoader(testset, batch_size=128, shuffle=False, num_workers=2)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# -------------------------------
# 2. Feature Extraction (ResNet18)
# -------------------------------
print("Extracting features...")

model_feature = resnet18(pretrained=True)
model_feature.fc = nn.Identity()  # remove last layer
model_feature = model_feature.to(device)
model_feature.eval()

features = []
labels = []

with torch.no_grad():
    for inputs, targets in trainloader_full:
        inputs = inputs.to(device)
        outputs = model_feature(inputs)
        features.append(outputs.cpu())
        labels.append(targets)

features = torch.cat(features)  # shape: (50000, 512)
labels = torch.cat(labels)

print(f"Extracted feature shape: {features.shape}")

# -------------------------------
# 3. Lightweight Coreset Construction
# -------------------------------
print("Constructing lightweight coreset...")

N = features.shape[0]
uniform_sample_size = int(0.05 * N)  # 5% uniform sample
coreset_final_size = int(0.01 * N)   # 1% final coreset size

# Step 1: Uniform random sample
uniform_indices = np.random.choice(N, uniform_sample_size, replace=False)
uniform_sample = features[uniform_indices]

# Step 2: Rough clustering
k = 10  # number of clusters
kmeans = KMeans(n_clusters=k, random_state=0).fit(uniform_sample.numpy())
centers = torch.tensor(kmeans.cluster_centers_)

# Step 3: Compute sampling probabilities
diff = features.unsqueeze(1) - centers.unsqueeze(0)  # (N, k, 512)
dists = torch.norm(diff, dim=2) ** 2
min_dists, _ = torch.min(dists, dim=1)  # (N,)

epsilon = 1e-6
sampling_probs = min_dists + epsilon
sampling_probs /= sampling_probs.sum()
sampling_probs = sampling_probs.numpy()

# Step 4: Importance sampling
coreset_indices = np.random.choice(N, coreset_final_size, replace=True, p=sampling_probs)
coreset_features = features[coreset_indices]
coreset_labels = labels[coreset_indices]

print(f"Coreset size: {len(coreset_indices)}")

# -------------------------------
# 4. Coreset DataLoader
# -------------------------------
coreset_dataset = Subset(trainset, coreset_indices)
coreset_loader = DataLoader(coreset_dataset, batch_size=128, shuffle=True, num_workers=2)

# -------------------------------
# 5. Train ResNet18 on Coreset
# -------------------------------
print("Training ResNet18 on coreset...")

model = resnet18(num_classes=10)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)

num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in coreset_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    scheduler.step()
    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {running_loss/total:.4f}, Acc: {100.*correct/total:.2f}%")

# -------------------------------
# 6. Evaluate on CIFAR-10 Test Set
# -------------------------------
print("Evaluating on CIFAR-10 test set...")

model.eval()
correct = 0
total = 0

with torch.no_grad():
    for inputs, targets in testloader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

print(f"Test Accuracy: {100.*correct/total:.2f}%")


Extracting features...




Extracted feature shape: torch.Size([50000, 512])
Constructing lightweight coreset...
Coreset size: 500
Training ResNet18 on coreset...
Epoch [1/50] Loss: 2.3335, Acc: 13.20%
Epoch [2/50] Loss: 2.2313, Acc: 17.80%
Epoch [3/50] Loss: 2.1276, Acc: 19.00%
Epoch [4/50] Loss: 1.9781, Acc: 30.00%
Epoch [5/50] Loss: 1.8444, Acc: 33.40%
Epoch [6/50] Loss: 1.7631, Acc: 35.60%
Epoch [7/50] Loss: 1.6586, Acc: 43.20%
Epoch [8/50] Loss: 1.5667, Acc: 42.80%
Epoch [9/50] Loss: 1.4684, Acc: 50.20%
Epoch [10/50] Loss: 1.3276, Acc: 56.60%
Epoch [11/50] Loss: 1.2049, Acc: 60.20%
Epoch [12/50] Loss: 1.0756, Acc: 67.60%
Epoch [13/50] Loss: 0.9331, Acc: 70.40%
Epoch [14/50] Loss: 0.8186, Acc: 75.40%
Epoch [15/50] Loss: 0.7135, Acc: 80.60%
Epoch [16/50] Loss: 0.5886, Acc: 86.40%
Epoch [17/50] Loss: 0.5162, Acc: 89.20%
Epoch [18/50] Loss: 0.3679, Acc: 93.80%
Epoch [19/50] Loss: 0.2954, Acc: 97.00%
Epoch [20/50] Loss: 0.2447, Acc: 96.00%
Epoch [21/50] Loss: 0.1591, Acc: 99.60%
Epoch [22/50] Loss: 0.1313, Acc: 