In [None]:
import numpy as np
from sklearn.cluster import KMeans
from sklearn.neighbors import KNeighborsClassifier
import torchvision.datasets as datasets
from tqdm import tqdm
import os
import timm
import torch
import torch.nn as nn
import torchvision
import torchvision.datasets as datasets
from torchvision import transforms
import torchvision.models as models
from torch.utils.data import DataLoader

In [None]:
def kmean_selection(train_x, train_y, M, seed, num_classes=10):
    """Condense a labelled training set into k-means prototypes.

    For each class label ``0 .. num_classes - 1`` the samples carrying that
    label are clustered with k-means into ``M // num_classes`` clusters, and
    the resulting cluster centres are used as the new training points for
    that class.

    Args:
        train_x: Array of samples; it is indexed with the boolean mask
            ``train_y == label``, so it must be aligned with ``train_y``.
        train_y: Array of integer class labels.
        M: Total prototype budget. It is split evenly across classes; the
            remainder of ``M / num_classes`` is dropped.
        seed: Value forwarded to ``KMeans(random_state=...)``.
        num_classes: Number of class labels to iterate over. Defaults to 10,
            the value that was previously hard-coded.

    Returns:
        A pair ``(new_train_x, new_train_y)`` of Python lists: the cluster
        centres (one array per prototype) and the matching class labels.

    Raises:
        ValueError: If ``num_classes`` is not positive or ``M`` is too small
            to give every class at least one prototype.
    """
    if num_classes < 1:
        raise ValueError(f"num_classes must be >= 1, got {num_classes}")

    per_class = M // num_classes
    if per_class < 1:
        # Fail early with a readable message instead of letting KMeans reject
        # n_clusters=0 deep inside scikit-learn.
        raise ValueError(
            f"M={M} is too small: need at least one prototype for each of "
            f"the {num_classes} classes"
        )

    new_train_x = []
    new_train_y = []
    for label in range(num_classes):
        sub_train_x = train_x[train_y == label]
        kmeans = KMeans(n_clusters=per_class, random_state=seed).fit(sub_train_x)
        new_train_x.extend(kmeans.cluster_centers_)
        new_train_y.extend([label] * per_class)
    return new_train_x, new_train_y

def load_embeddings(path='data'):
    """Load the four saved embedding/label arrays from ``path``.

    Args:
        path: Directory that contains ``train_x.npy``, ``train_y.npy``,
            ``test_x.npy`` and ``test_y.npy``.

    Returns:
        A 4-tuple ``(train_x, train_y, test_x, test_y)`` of the arrays read
        with ``np.load``, in that order.
    """
    file_names = ("train_x.npy", "train_y.npy", "test_x.npy", "test_y.npy")
    return tuple(np.load(os.path.join(path, file_name)) for file_name in file_names)

def embedded_pipeline(M, select_func, t, Iteration, seeds=list(range(5)), path="./data"):
    """Repeatedly select a training subset, fit 1-NN on it and score it.

    The saved embeddings are loaded once from ``path``. For every run,
    ``select_func`` reduces the training set, a brute-force 1-nearest-neighbour
    classifier is fitted on the result and its accuracy on the full test set
    is recorded and printed.

    Args:
        M: Selection budget forwarded to ``select_func``.
        select_func: Callable ``(train_x, train_y, M, seed) -> (x, y)`` that
            returns the reduced training set.
        t: Critical value used to scale the standard error when building the
            confidence interval.
        Iteration: Requested number of runs.
        seeds: One seed per run. Runs are produced by zipping
            ``range(Iteration)`` with ``seeds``, so the number of runs is the
            smaller of ``Iteration`` and ``len(seeds)``. The default list is
            never mutated here.
        path: Directory handed to ``load_embeddings``.

    Returns:
        ``(accs, mean, std, std_sample, interval)`` where ``accs`` is the
        array of per-run accuracies, ``std`` is the population standard
        deviation (``ddof=0``), ``std_sample`` the sample standard deviation
        (``ddof=1``) and ``interval`` is
        ``mean -/+ t * std_sample / sqrt(len(accs))``.
    """
    train_X, train_Y, test_X, test_Y = load_embeddings(path)
    accs = []
    # `run_idx` instead of `iter` so the builtin is not shadowed.
    for run_idx, seed in zip(range(Iteration), seeds):
        # The selector gets private copies so that an in-place implementation
        # cannot alter the loaded embeddings used by later runs.
        train_x, train_y = select_func(np.copy(train_X), np.copy(train_Y), M, seed)
        classifier = KNeighborsClassifier(n_neighbors=1, algorithm="brute")
        classifier.fit(train_x, train_y)
        # Score exactly once: a brute-force pass over the test set is the
        # expensive step, and the test arrays are only read, so no copy.
        acc = classifier.score(test_X, test_Y)
        accs.append(acc)
        print(f"Iter: {run_idx}  Acc: {acc}")

    accs = np.array(accs)
    mean = accs.mean()
    std = np.std(accs)
    std_sample = np.std(accs, ddof=1)
    half_width = t * std_sample / np.sqrt(len(accs))
    interval = (mean - half_width, mean + half_width)

    return accs, mean, std, std_sample, interval

In [None]:
def extract_embeddings(model, loader, preprocess, device):
    """Run ``model`` over every batch of ``loader`` and collect the outputs.

    Args:
        model: Network used as feature extractor; called on the preprocessed
            batch with gradients disabled.
        loader: Iterable yielding ``(images, labels)`` batches of tensors.
        preprocess: Callable applied to each image batch before the model.
        device: Device the image batches are moved to.

    Returns:
        ``(features, labels)`` as two NumPy arrays obtained by concatenating
        the per-batch model outputs and the per-batch labels.
    """
    features = []
    labels = []
    with torch.no_grad():
        for images, targets in tqdm(loader):
            images = preprocess(images.to(device))
            features.append(model(images).cpu().numpy())
            labels.append(targets.numpy())
    return np.concatenate(features), np.concatenate(labels)


resnet = timm.create_model("resnet18", pretrained=False, num_classes=10)
# Swap the stem for a 1-input-channel convolution before loading the weights.
resnet.conv1 = torch.nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
resnet.load_state_dict(
  torch.hub.load_state_dict_from_url(
    "https://huggingface.co/gpcarl123/resnet18_mnist/resolve/main/resnet18_mnist.pth",
    map_location="cpu",
    file_name="resnet18_mnist.pth",
  )
)
# Replace the classification head so the forward pass returns the features
# that used to feed it.
resnet.fc = nn.Identity()
# Presumably the MNIST mean/std the checkpoint was trained with - TODO confirm.
preprocessor = torchvision.transforms.Normalize((0.1307,), (0.3081,))
transform = transforms.Compose([transforms.ToTensor()])
trainset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
testset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

trainloader = DataLoader(dataset=trainset, batch_size=64, shuffle=False)
# Fixed: this loader used to wrap `trainset`, so the saved "test" embeddings
# were really training data.
testloader = DataLoader(dataset=testset, batch_size=64, shuffle=False)

# Use the GPU when one is present instead of failing on CPU-only machines.
device = "cuda" if torch.cuda.is_available() else "cpu"
resnet = resnet.to(device)
# Inference mode: without this, BatchNorm layers normalise with per-batch
# statistics and keep updating their running averages.
resnet.eval()

os.makedirs("data", exist_ok=True)

# Both splits go through the same normalisation; previously only the train
# split was normalised before being fed to the network.
train_features, train_labels = extract_embeddings(resnet, trainloader, preprocessor, device)
np.save("data/train_x", train_features)
np.save("data/train_y", train_labels)

test_features, test_labels = extract_embeddings(resnet, testloader, preprocessor, device)
np.save("data/test_x", test_features)
np.save("data/test_y", test_labels)

In [None]:
# Prototype budgets to evaluate.
Ms = [10, 100, 1000, 5000, 10000]
# One k-means seed per repetition.
SEEDS = [17, 26, 58, 96, 42]
# NOTE(review): `Iteration` and `t095` were used below without being defined
# anywhere in the visible notebook, which fails on Restart & Run All.
# They are defined here; confirm the intended values.
Iteration = len(SEEDS)
# Two-sided 95% Student-t critical value for Iteration - 1 = 4 degrees of
# freedom; must be updated if the number of seeds changes.
t095 = 2.776

for M in Ms:
    print(f"M: {M}")
    accs, mean, std, std_sample, interval = embedded_pipeline(M, select_func=kmean_selection, t=t095, Iteration=Iteration, seeds=SEEDS)

    print(f"Accuracies: {accs}")
    print("Mean: {:.5f}".format(mean))
    # Fixed-point like the other lines ("{:.5}" meant 5 significant digits).
    print("Standard Deviation: {:.5f}".format(std))
    print("Confidence Interval: ({:.5f}  {:.5f})".format(interval[0], interval[1]))
    print()