In [1]:
import os

import numpy as np
import torch
import torch.nn as nn
import torchvision
from matplotlib import pyplot as plt
from scipy.optimize import linear_sum_assignment as linear_assignment # 평가 그룹에 맞춰 그룹을 재배치
from torchvision import transforms

# Use the first CUDA GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda:0")
else:
  device = torch.device("cpu")
print(f'{device} is available.')

cuda:0 is available.


In [2]:
batch_size = 128   # samples per mini-batch for both data loaders
num_clusters = 10  # number of k-means centroids
latent_size = 10   # dimensionality of the latent variable; it does not have to equal the number of clusters

In [3]:
# MNIST train/test splits, downloaded into ./data/ when missing and converted
# with transforms.ToTensor().
trainset = torchvision.datasets.MNIST(
    './data/', download=True, train=True, transform=transforms.ToTensor())
testset = torchvision.datasets.MNIST(
    './data/', download=True, train=False, transform=transforms.ToTensor())

# Training batches are reshuffled every epoch.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
# Evaluation aggregates over the whole split, so shuffling buys nothing; a fixed
# order keeps evaluation deterministic and leaves the global RNG untouched.
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw



In [4]:
# Required because the encoder is convolutional: the feature map coming out of
# its last convolution layer must be vectorized before it can be turned into a
# latent vector.
class Flatten(torch.nn.Module):
  """Collapse every non-batch dimension of the input into a single one."""

  def forward(self, x):
    # Keep dim 0 (the batch) and let view() infer the flattened length.
    return x.view(x.shape[0], -1)

# Used in the decoder to turn a latent-derived vector back into a feature map.
class Deflatten(nn.Module):
  """Reshape a (batch, features) tensor into (batch, k, side, side).

  `side` is int((features // k) ** 0.5), so `features` is expected to equal
  k * side * side; view() raises otherwise.
  """

  def __init__(self, k):
    super().__init__()
    self.k = k  # channel count of the restored feature map

  def forward(self, x):
    n_samples, n_features = x.size(0), x.size(1)
    side = int((n_features // self.k) ** 0.5)
    return x.view(n_samples, self.k, side, side)

In [5]:
from torch.nn.modules.conv import ConvTranspose2d

class Encoder(nn.Module):
  """Convolutional encoder producing a non-negative latent vector per image.

  Three unpadded 3x3 convolutions (strides 2, 2, 1) are followed by a linear
  projection to `latent_size` and a final ReLU. The linear layer expects 1024
  flattened features, i.e. 64 channels of 4x4, which is what 28x28
  single-channel inputs produce (28 -> 13 -> 6 -> 4).
  """

  def __init__(self, latent_size):
    super().__init__()

    base = 16  # channel width of the first convolution; doubled at each later stage
    stages = [
        nn.Conv2d(1, base, 3, stride=2),
        nn.ReLU(),
        nn.Conv2d(base, 2 * base, 3, stride=2),
        nn.ReLU(),
        nn.Conv2d(2 * base, 4 * base, 3, stride=1),
        nn.ReLU(),
        Flatten(),  # vectorize the feature map right before the linear layer
        nn.Linear(1024, latent_size),
        nn.ReLU(),
    ]
    # Unpacking keeps the same positional sub-module indices (0..8).
    self.encoder = nn.Sequential(*stages)

  def forward(self, x):
    return self.encoder(x)

class Decoder(nn.Module):
  """Transposed-convolution decoder mapping latent vectors to images in (0, 1).

  A linear layer expands the latent vector to 1024 features, which are reshaped
  to 64 channels of 4x4 and upsampled by three unpadded 3x3 transposed
  convolutions (4 -> 6 -> 13 -> 28). The final sigmoid bounds every pixel.
  """

  def __init__(self, latent_size):
    super().__init__()

    base = 16  # channel width of the last hidden stage
    stages = [
        nn.Linear(latent_size, 1024),
        nn.ReLU(),
        Deflatten(4 * base),  # back from a vector to a (4*base)-channel feature map
        nn.ConvTranspose2d(4 * base, 2 * base, 3, stride=1),
        nn.ReLU(),
        nn.ConvTranspose2d(2 * base, base, 3, stride=2),
        nn.ReLU(),
        # output_padding=1 supplies the extra row/column needed to land on 28.
        nn.ConvTranspose2d(base, 1, 3, stride=2, output_padding=1),
        nn.Sigmoid(),
    ]
    # Unpacking keeps the same positional sub-module indices (0..8).
    self.decoder = nn.Sequential(*stages)

  def forward(self, x):
    return self.decoder(x)


In [6]:
# K-means clustering with learnable centroids
class Kmeans(nn.Module):
  """Assign latent vectors to their nearest centroid.

  The centroids are a single learnable parameter of shape
  (num_clusters, latent_size), initialised uniformly in [0, 1).
  """

  def __init__(self, num_clusters, latent_size):
    super().__init__()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.num_clusters = num_clusters
    # Build the [0, 1) tensor, move it to the device, then wrap it so the
    # parameter itself is the leaf tensor living on that device.
    self.centroids = nn.Parameter(torch.rand((self.num_clusters, latent_size)).to(device))

  def argminl2distance(self, a, b):
    """Return the row index at which the squared L2 distance between a and b is smallest."""
    return torch.argmin(torch.sum((a - b) ** 2, dim=1), dim=0)

  def forward(self, x):
    """Find the nearest centroid for every row of x.

    Args:
      x: tensor of shape (batch, latent_size) on the same device as the centroids.

    Returns:
      A tuple (assignments, assigned_centroids): a Python list with one
      centroid index per row of x, and the tensor of the corresponding
      centroid rows (differentiable with respect to `self.centroids`).
    """
    # The index search needs no gradient; computing all distances in one
    # broadcast replaces a per-sample Python loop and its per-sample .item()
    # device synchronisation.
    with torch.no_grad():
      # (batch, 1, latent) - (1, clusters, latent) -> (batch, clusters, latent)
      sq_dist = ((x.unsqueeze(1) - self.centroids.unsqueeze(0)) ** 2).sum(dim=2)
      nearest = torch.argmin(sq_dist, dim=1)
    return nearest.tolist(), self.centroids[nearest]


In [7]:
# Re-map cluster ids onto labels with linear_assignment, then compute the
# accuracy over all samples.
def cluster_acc(y_true, y_pred):
  """Clustering accuracy under the best one-to-one cluster-to-label mapping.

  Args:
    y_true: sequence of integer ground-truth labels.
    y_pred: sequence of integer cluster ids, one per sample.

  Returns:
    Fraction of samples whose cluster maps to their true label under the
    optimal assignment.
  """
  labels = np.array(y_true)
  clusters = np.array(y_pred)
  n_ids = max(clusters.max(), labels.max()) + 1

  # contingency[c, t] counts the samples put in cluster c whose true label is t.
  contingency = np.zeros((n_ids, n_ids), dtype=np.int64)
  for pos in range(clusters.size):
    contingency[clusters[pos], labels[pos]] += 1

  # The solver minimises cost, so turn counts into costs by subtracting them
  # from the largest count; the minimum-cost matching then maximises agreement.
  rows, cols = linear_assignment(contingency.max() - contingency)
  matched = sum(contingency[r, c] for r, c in zip(rows, cols))
  return matched * 1.0 / clusters.size

In [8]:
def evaluation(testloader, encoder, kmeans, device):
  """Compute clustering accuracy of encoder + kmeans over a data loader.

  Args:
    testloader: iterable yielding (images, labels) batches.
    encoder: module mapping an image batch to latent vectors.
    kmeans: module returning (assignments, centroids) for a latent batch.
    device: device the images are moved to before encoding.

  Returns:
    The value of cluster_acc(actual, predictions) over every batch.
  """
  predictions = []
  actual = []

  with torch.no_grad():
    for images, labels in testloader:
      latent_var = encoder(images.to(device))
      y_pred, _ = kmeans(latent_var)

      predictions.extend(y_pred)
      # Labels are only consumed on the host, so they are not copied to the
      # device and back.
      actual.extend(labels.tolist())

  return cluster_acc(actual, predictions)

In [9]:
# Define the models, the loss functions and the optimizer
encoder = Encoder(latent_size).to(device)
decoder = Decoder(latent_size).to(device)
kmeans = Kmeans(num_clusters, latent_size).to(device)

criterion1 = torch.nn.MSELoss()  # used for the reconstruction term
criterion2 = torch.nn.MSELoss()  # used for the clustering term

# A single Adam instance updates the encoder, the decoder and the centroids jointly.
optimizer = torch.optim.Adam(
    [*encoder.parameters(), *decoder.parameters(), *kmeans.parameters()],
    lr=1e-3,
)

In [10]:
# Training hyperparameters.
# The model is sensitive to λ, so λ is annealed: training starts with a small
# value (λ = lam/(T2-T1)) and ends at λ = lam.
T1 = 100     # last epoch that uses the small starting weight
T2 = 200     # first epoch that uses the full weight
lam = 1e-3   # final weight of the clustering loss
ls = 0.05    # average-loss threshold below which checkpoints are written

In [12]:
# Train the model
# torch.save does not create missing directories, so make sure the checkpoint
# folder exists before the first save is attempted.
os.makedirs('./models', exist_ok=True)

for ep in range(300):
  # Clustering-weight schedule: lam/(T2-T1) up to and including epoch T1, a
  # linear ramp lam*(ep-T1)/(T2-T1) for T1 < ep < T2, and lam from epoch T2 on.
  if ep >= T2:
    alpha = lam
  elif ep > T1:
    alpha = lam * (ep - T1) / (T2 - T1)
  else:
    alpha = lam / (T2 - T1)

  running_loss = 0.0
  for images, _ in trainloader:
    inputs = images.to(device)
    optimizer.zero_grad()
    latent_var = encoder(inputs)
    # Cluster assignment is computed from a detached copy of the latents; the
    # returned centroid rows still carry gradient to the centroid parameter.
    _, centroids = kmeans(latent_var.detach())
    outputs = decoder(latent_var)

    # Reconstruction term (prediction first, target second; MSE is symmetric,
    # so the value is unchanged) plus the weighted clustering term.
    l_rec = criterion1(outputs, inputs)
    l_clt = criterion2(latent_var, centroids)
    loss = l_rec + alpha * l_clt

    loss.backward()
    optimizer.step()
    running_loss += loss.item()

  avg_loss = running_loss / len(trainloader)

  # Report test clustering accuracy every 10th epoch.
  if ep % 10 == 0:
    testacc = evaluation(testloader, encoder, kmeans, device)
    print('[%d] Train loss: %.4f, Test Accuracy:%.3f' %(ep,avg_loss,testacc))

  # Once the average loss drops below the reference loss (a hyperparameter,
  # then the best value seen so far), save the network and k-means parameters.
  if avg_loss < ls:
    ls = avg_loss

    torch.save(encoder.state_dict(), './models/dkm_en.pth')
    torch.save(decoder.state_dict(), './models/dkm_de.pth')
    torch.save(kmeans.state_dict(), './models/dkm_clt.pth')


[0] Train loss: 0.0374, Test Accuracy:0.115
[10] Train loss: 0.0264, Test Accuracy:0.388
[20] Train loss: 0.0245, Test Accuracy:0.532
[30] Train loss: 0.0236, Test Accuracy:0.616
[40] Train loss: 0.0230, Test Accuracy:0.680
[50] Train loss: 0.0226, Test Accuracy:0.712
[60] Train loss: 0.0223, Test Accuracy:0.698
[70] Train loss: 0.0221, Test Accuracy:0.731
[80] Train loss: 0.0219, Test Accuracy:0.724
[90] Train loss: 0.0217, Test Accuracy:0.719
[100] Train loss: 0.0216, Test Accuracy:0.725
[110] Train loss: 0.0215, Test Accuracy:0.721
[120] Train loss: 0.0214, Test Accuracy:0.720
[130] Train loss: 0.0213, Test Accuracy:0.717
[140] Train loss: 0.0213, Test Accuracy:0.721
[150] Train loss: 0.0212, Test Accuracy:0.718
[160] Train loss: 0.0211, Test Accuracy:0.719
[170] Train loss: 0.0211, Test Accuracy:0.723
[180] Train loss: 0.0211, Test Accuracy:0.725
[190] Train loss: 0.0210, Test Accuracy:0.722
[200] Train loss: 0.0210, Test Accuracy:0.729
[210] Train loss: 0.0209, Test Accuracy:0.748