In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.datasets as datasets

from torchvision import transforms

import numpy as np

In [None]:
def mnist_dataset(root, transform):
    """Build the MNIST train and test splits.

    Args:
        root: Directory passed to ``datasets.MNIST`` as the data location.
        transform: Transform handed to both splits.

    Returns:
        ``(train_dataset, test_dataset)``; both are created with
        ``download=True``.
    """
    # The generator yields the train split first, then the test split.
    train_dataset, test_dataset = (
        datasets.MNIST(root=root, train=is_train, transform=transform, download=True)
        for is_train in (True, False)
    )
    return train_dataset, test_dataset

In [None]:
def preprocess_data(train_dataset, test_dataset, batch_size, k, n_classes, seed, shuffle_train=False, return_idx=True):
    """Keep ``k // n_classes`` labels per class and mark every other sample unlabeled.

    The label tensor of ``train_dataset`` is modified IN PLACE: samples that are
    not selected get the label ``-1``. Samples whose label is already outside
    ``range(n_classes)`` (for example ``-1`` from an earlier call) are left
    untouched, so calling this function again with the same seed does not
    change the labels.

    Args:
        train_dataset: Dataset exposing its labels as a tensor attribute named
            ``targets`` (or the older ``train_labels``).
        test_dataset: Dataset wrapped in the test loader as-is.
        batch_size: Batch size used for both loaders.
        k: Requested number of labeled samples; ``k // n_classes`` are kept per
            class, so a remainder is dropped.
        n_classes: Number of classes; class ids are ``0 .. n_classes - 1``.
        seed: Seed of the ``numpy`` random state that picks the labeled samples.
        shuffle_train: Forwarded to the train ``DataLoader``.
        return_idx: When true, also return the indices of the labeled samples.

    Returns:
        ``(train_loader, test_loader, indices)`` if ``return_idx`` else
        ``(train_loader, test_loader)``. ``indices`` is a 1-D ``long`` tensor.
    """
    # `targets` is the current attribute name; `train_labels` is the older alias.
    if hasattr(train_dataset, 'targets'):
        labels = train_dataset.targets
    else:
        labels = train_dataset.train_labels

    rng = np.random.RandomState(seed)
    per_class = k // n_classes

    # Collect per-class pieces instead of writing into pre-sized buffers: the
    # buffers were wrong whenever k was not a multiple of n_classes or a class
    # held fewer than `per_class` samples, and their untouched zero entries
    # were later interpreted as "sample 0 is unlabeled".
    labeled_parts = []
    unlabeled_parts = []
    for cls in range(n_classes):
        class_items = torch.nonzero(labels == cls).view(-1)  # positions of samples with label `cls`
        order = torch.as_tensor(rng.permutation(np.arange(len(class_items))), dtype=torch.long)
        labeled_parts.append(class_items[order[:per_class]])
        unlabeled_parts.append(class_items[order[per_class:]])

    empty = torch.zeros(0, dtype=torch.long)
    indices = torch.cat(labeled_parts) if labeled_parts else empty
    unlabel_indices = torch.cat(unlabeled_parts) if unlabeled_parts else empty

    # Unlabeled samples are marked with the label -1.
    labels[unlabel_indices] = -1

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               num_workers=0,
                                               shuffle=shuffle_train)

    test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                              batch_size=batch_size,
                                              num_workers=0,
                                              shuffle=False)

    if return_idx:
        return train_loader, test_loader, indices
    return train_loader, test_loader

In [None]:
class GaussianNoise(nn.Module):
    """Adds zero-mean Gaussian noise with standard deviation ``std`` to its input.

    The noise is sampled with the shape, dtype and device of the incoming
    tensor, so the layer works on CPU as well as GPU and with batches whose
    size differs from ``batch_size`` (e.g. a shorter last batch).

    Args:
        batch_size: Nominal batch size; only used to build ``self.shape``.
        input_shape: Shape of one sample, without the batch dimension.
        std: Standard deviation of the noise.
    """

    def __init__(self, batch_size, input_shape, std):
        super(GaussianNoise, self).__init__()
        self.shape = (batch_size,) + tuple(input_shape)  # nominal (batch, *input_shape)
        self.std = std
        self.noise = None  # most recent noise sample; set by forward()

    def forward(self, x):
        """Return ``x`` plus a fresh noise sample of the same shape."""
        self.noise = torch.randn_like(x) * self.std
        return x + self.noise

In [None]:
# Supervised loss: cross-entropy over the labeled samples only
def labeled_ce_loss(out, labels):
    """Cross-entropy over the samples of the batch that carry a label.

    A sample counts as labeled when its label is ``>= 0``.

    Args:
        out: Logits, one row per sample.
        labels: Integer class labels, one per sample.

    Returns:
        ``(loss, num_sup)`` where ``num_sup`` is the number of labeled samples.
        When the batch has no labeled sample the loss is a one-element zero
        tensor with the dtype and device of ``out``.
    """
    cond = labels >= 0  # mask of labeled samples
    num_sup = int(cond.sum().item())

    if num_sup > 0:
        loss = F.cross_entropy(out[cond], labels[cond])
        return loss, num_sup

    # No labeled sample: constant zero created next to `out`, so the function
    # does not depend on CUDA being available.
    return out.new_zeros(1), 0

In [None]:
# Unsupervised loss: squared error between class probabilities
def mse_loss(cur_out, ensem_out):
    """Summed squared difference of the two softmax outputs, divided by the batch size.

    Args:
        cur_out: Current logits, one row per sample.
        ensem_out: Ensemble logits with the same layout.
    """
    cur_probs = F.softmax(cur_out, dim=1)
    ensem_probs = F.softmax(ensem_out, dim=1)
    squared_diff = (cur_probs - ensem_probs) ** 2

    return squared_diff.sum() / len(cur_out)

In [None]:
def return_losses(cur_out, ensem_out, w, labels):
    """Compute the supervised, unsupervised and combined loss for one batch.

    Args:
        cur_out: Current output of the model.
        ensem_out: Ensemble output used as the unsupervised target.
        w: Weight applied to the unsupervised term.
        labels: Labels passed to ``labeled_ce_loss``.

    Returns:
        ``(total, supervised, unsupervised, n_supervised)`` with
        ``total = supervised + w * unsupervised``.
    """
    supervised, n_supervised = labeled_ce_loss(cur_out, labels)
    unsupervised = mse_loss(cur_out, ensem_out)
    combined = supervised + w * unsupervised

    return combined, supervised, unsupervised, n_supervised

In [None]:
# Schedule for the weight that multiplies the unsupervised loss term
def weight_ramp_up(epoch:int, max_epochs:int, max_val:float, mult, n_labeled:int, n_samples:int):
    """Ramp the unsupervised-loss weight from 0 up to a scaled maximum.

    The maximum is ``max_val * n_labeled / n_samples``. Epoch 0 yields 0,
    epochs at or beyond ``max_epochs`` yield the maximum, and epochs in between
    yield ``maximum * exp(-mult * (1 - epoch / max_epochs) ** 2)``.
    """
    ceiling = max_val * (n_labeled/n_samples)

    if epoch == 0:
        return 0.
    if epoch >= max_epochs:
        return ceiling

    remaining = 1. - float(epoch)/max_epochs  # fraction of the ramp still ahead
    return ceiling * np.exp(-mult * remaining**2)

### Modeling

In [None]:
# Base model: a small CNN
class CNN(nn.Module):
    """Two conv blocks followed by dropout and a linear 10-way classifier.

    While the module is in training mode, Gaussian noise with standard
    deviation ``std`` is added to the input. The noise is sampled directly on
    the input's device, so the model runs on CPU and GPU and no helper module
    is allocated per forward pass.

    Args:
        batch_size: Unused; kept so existing constructor calls keep working.
        std: Standard deviation of the input noise.
        input_shape: ``(channels, height, width)`` of one sample.
        drop_out: Dropout probability applied before the classifier.
        first_layer: Output channels of the first conv block.
        second_layer: Output channels of the second conv block.
    """

    def __init__(self, batch_size:int, std:float, input_shape:tuple = (1,28,28), drop_out:float = 0.5, first_layer:int = 16, second_layer:int = 32):
        super(CNN, self).__init__()
        self.std = std
        self.drop_out = drop_out
        self.first_layer = first_layer
        self.second_layer = second_layer
        self.input_shape = input_shape

        in_channels, height, width = input_shape
        self.conv_block1 = nn.Sequential(nn.Conv2d(in_channels, self.first_layer, 3, stride=1, padding=1),
                                        nn.BatchNorm2d(self.first_layer),
                                        nn.ReLU(),
                                        nn.MaxPool2d(3, stride=2, padding=1))
        self.conv_block2 = nn.Sequential(nn.Conv2d(self.first_layer, self.second_layer, 3, stride=1, padding=1),
                                        nn.BatchNorm2d(self.second_layer),
                                        nn.ReLU(),
                                        nn.MaxPool2d(3, stride=2, padding=1))
        self.drop = nn.Dropout(self.drop_out)

        # The convs keep the spatial size (3x3, stride 1, padding 1); each of
        # the two pools shrinks it. For 28x28 inputs this gives 7x7.
        pooled_h = self._pooled(self._pooled(height))
        pooled_w = self._pooled(self._pooled(width))
        self.n_features = self.second_layer * pooled_h * pooled_w
        self.fc = nn.Linear(self.n_features, 10)

    @staticmethod
    def _pooled(size):
        """Output length of MaxPool2d(3, stride=2, padding=1) along one axis."""
        return (size - 1) // 2 + 1

    def forward(self, x):
        """Return the class logits for a batch ``x``."""
        if self.training:
            # Input noise is applied during training only.
            x = x + torch.randn_like(x) * self.std

        # first block
        x = self.conv_block1(x)

        # second block
        x = self.conv_block2(x)

        # Classifier (FC)
        x = x.view(x.size(0), -1)  # flatten everything except the batch dimension
        x = self.fc(self.drop(x))  # dropout, then linear layer

        return x

In [None]:
def train(model, 
        train_loader, 
        val_loader,
        k:int, 
        alpha:float, 
        lr:float, 
        num_epochs:int, 
        batch_size:int, 
        n_instances:int, 
        n_classes:int = 10, 
        max_epochs:int = 80, 
        max_val:float = 1.
        ):    
    """Train ``model`` with a supervised loss plus an ensemble-consistency loss.

    After every epoch the per-sample outputs are folded into an exponential
    moving average ``Z`` (momentum ``alpha``); its bias-corrected version ``z``
    is the unsupervised target of the next epoch. The model state is written to
    ``model_best.pth`` whenever the mean total loss of an epoch improves.

    ``train_loader`` must yield the samples in the same order every epoch
    (no shuffling): a batch is matched to its rows of ``z`` purely by its
    running position in the epoch.

    Args:
        model: Network to train; moved to the GPU when one is available.
        train_loader: Loader yielding ``(images, labels)`` batches.
        val_loader: Not used by this function.
        k: Number of labeled samples, forwarded to ``weight_ramp_up``.
        alpha: Momentum of the moving average; must be < 1, otherwise the bias
            correction divides by zero.
        lr: Learning rate of the Adam optimizer.
        num_epochs: Number of epochs to run.
        batch_size: Not used; the size of every batch is read from the batch.
        n_instances: Number of training samples (rows of the ensemble buffers).
        n_classes: Number of model outputs per sample.
        max_epochs: Ramp-up length forwarded to ``weight_ramp_up``.
        max_val: Maximum weight forwarded to ``weight_ramp_up``.
    """
    # Feed model to GPU if available
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Setting Optimizer Adam
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Ensemble state starts at zero
    Z = torch.zeros(n_instances, n_classes, device=device)        # moving average of the outputs
    z = torch.zeros(n_instances, n_classes, device=device)        # bias-corrected targets
    outputs = torch.zeros(n_instances, n_classes, device=device)  # outputs of the current epoch

    best_loss = float('inf')

    for epoch in range(num_epochs):
        model.train()

        # Statistics are collected per epoch, so the printed values and the
        # best-model check reflect this epoch only.
        losses = []        # total loss per batch
        sup_losses = []    # supervised loss per batch, multiplied by the labeled count
        unsup_losses = []  # unsupervised loss per batch

        # Calculate Unsupervised Loss Weight
        w = weight_ramp_up(epoch, max_epochs, max_val, 5, k, n_instances)
        w = torch.tensor(float(w), dtype=torch.float32, device=device)

        # Targets change only once per Epoch
        start = 0  # row of the first sample of the current batch
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)
            # Running offset instead of i * len(batch): the last batch may be
            # shorter, which would shift its slice to the wrong rows.
            stop = start + images.size(0)

            optimizer.zero_grad()
            out = model(images)

            # Ensemble targets that belong to the current batch
            loss, sup_loss, unsup_loss, nbsup = return_losses(out, z[start:stop], w, labels)

            # Save outputs
            outputs[start:stop] = out.detach()
            losses.append(loss.item())
            sup_losses.append(nbsup*sup_loss.item())
            unsup_losses.append(unsup_loss.item())

            # Backpropagation
            loss.backward()
            optimizer.step()

            start = stop

        loss_mean = np.mean(losses)
        sup_loss_mean = np.mean(sup_losses)
        unsup_loss_mean = np.mean(unsup_losses)

        # Print total, supervised and unsupervised loss every 5 epochs
        if (epoch + 1) % 5 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss_mean:.4f}, Supervised Loss: {sup_loss_mean:.4f}, Unsupervised Loss: {unsup_loss_mean:.4f}')
    
        # Fold this epoch's outputs into the moving average, then undo the
        # bias caused by the zero initialisation
        Z = alpha * Z + (1. - alpha) * outputs
        z = Z * (1. / (1. - alpha ** (epoch + 1)))

        if loss_mean < best_loss:
            best_loss = loss_mean
            
            print('='*10, f'{epoch + 1} Epoch Model is Saved', '='*10)
            torch.save({'state_dict': model.state_dict()}, f'model_best.pth')

In [None]:
def evaluation(model, loader):
    """Load ``model_best.pth`` into ``model`` and report its accuracy on ``loader``.

    The checkpoint and every batch are placed on the device the model's
    parameters live on (CPU if the model has none), so the function also works
    without a GPU.

    Args:
        model: Network whose ``state_dict`` is replaced by the checkpoint.
        loader: Iterable of ``(samples, labels)`` batches.

    Returns:
        Accuracy in percent, rounded to two decimals.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # Evaluation using Best Model
    checkpoint = torch.load('model_best.pth', map_location=device)
    model.load_state_dict(checkpoint['state_dict'])
    model.eval()
    
    correct = 0
    total = 0

    # No gradients are needed for scoring.
    with torch.no_grad():
        for samples, labels in loader:
            samples = samples.to(device)
            labels = labels.to(device)
            outputs = model(samples)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels.view_as(predicted)).sum().item()
    
    accuracy = 100 * float(correct) / total
    
    print("="*10, "Evaluation Result", "="*10)
    print(f'Evaluation Result - Accuracy: {accuracy:.2f}')
    return np.round(accuracy,2)

In [None]:
# --- Input normalization (MNIST) ---
# Mean and standard deviation, determined by inspecting the original MNIST data.
m, s = 0.13, 0.31

# --- Model ---
drop = 0.3  # dropout probability
std = 0.15  # standard deviation of the Gaussian input noise
first_layer, second_layer = 16, 32  # channels of the first / second conv

# --- Optimizer ---
learning_rate = 0.002
num_epochs = 50
batch_size = 512

# --- Temporal ensembling ---
alpha = 0.5  # momentum of the moving average over the outputs (ensemble)

# Number of training labels to keep; all other labels are removed to create
# artificial unlabeled data. With k = 500 each class keeps
# 500 / (number of classes) labeled instances.
k = 500

In [None]:
# Load MNIST, normalized with the mean / std from the configuration cell
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize(m, s)])
train_dataset, val_dataset = mnist_dataset(root='~/datasets/MNIST', transform=transform)
ntrain = len(train_dataset)

# Pass the configured dropout explicitly; without it the model would fall back
# to its own default and the `drop` setting would have no effect.
model = CNN(batch_size, std, drop_out=drop, first_layer=first_layer, second_layer=second_layer)
# Keep k labels and build the loaders. The train loader is not shuffled
# because training matches batches to ensemble targets by position.
train_loader, val_loader, indices = preprocess_data(train_dataset, val_dataset, batch_size=batch_size, k=k, n_classes=10, seed=1002, shuffle_train=False)
train(model, train_loader, val_loader, k, alpha, learning_rate, num_epochs, batch_size, ntrain)
_ = evaluation(model, val_loader)

In [None]:
# Experiment: Change alpha
# alpha = 1.0 is excluded: the bias correction 1 / (1 - alpha ** t) divides by
# zero there, the ensemble targets become NaN, no checkpoint is written and the
# evaluation would silently score the checkpoint of the previous run.
alphas = np.linspace(0., 0.9, 10) # [0., 0.1, ..., 0.9]
alpha_exp_accs = []

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize(m, s)])
train_dataset, val_dataset = mnist_dataset(root='~/datasets/MNIST', transform=transform)
ntrain = len(train_dataset)

# Remove the labels once, before the sweep: every run uses the same labeled
# subset and the already-masked labels are not processed again.
train_loader, val_loader, indices = preprocess_data(train_dataset, val_dataset, batch_size=batch_size, k=k, n_classes=10, seed=1002, shuffle_train=False)

for a in alphas:
    # Fresh model per alpha, using the configured dropout probability
    model = CNN(batch_size, std, drop_out=drop, first_layer=first_layer, second_layer=second_layer)
    train(model, train_loader, val_loader, k, float(a), learning_rate, num_epochs, batch_size, ntrain)
    alpha_exp_accs.append(evaluation(model, val_loader))

print(alpha_exp_accs)