Autoencoder의 Latent Space에 kmeans 클러스터링을 적용함

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
import numpy as np
from scipy.optimize import linear_sum_assignment as linear_assignment #
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import matplotlib.cm as cm

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Settings for data loading and clustering
batch_size = 128
num_clusters = 10
latent_size = 10 # does not have to equal the number of clusters

In [None]:
# MNIST training split, converted to tensors and served in shuffled mini-batches.
trainset = torchvision.datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
)

# MNIST test split; order is kept fixed (no shuffling) for evaluation.
testset = torchvision.datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)

100%|██████████| 9.91M/9.91M [00:02<00:00, 4.52MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 132kB/s]
100%|██████████| 1.65M/1.65M [00:01<00:00, 1.25MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 9.04MB/s]


In [None]:
class Flatten(torch.nn.Module):
  """Collapse every dimension after the first into a single one."""

  def forward(self, x):
    # Keep dim 0 and let view() infer the merged size of the remaining dims.
    return x.view(x.shape[0], -1)

class Deflatten(torch.nn.Module):
  """Reshape flat (N, k * f * f) features into (N, k, f, f) square maps.

  Args:
    k: number of channels in the reshaped output.
  """

  def __init__(self, k):
    super().__init__()
    self.k = k

  def forward(self, x):
    n_rows, n_cols = x.size(0), x.size(1)
    # Side length of each square map, derived from the per-channel element count.
    side = int((n_cols // self.k) ** 0.5)
    return x.view(n_rows, self.k, side, side)

class Kmeans(nn.Module):
  """K-means layer whose centroids are learnable parameters.

  Args:
    num_clusters: number of centroids.
    latent_size: dimensionality of each centroid (must match the input width).
  """

  def __init__(self, num_clusters, latent_size):
    super(Kmeans, self).__init__()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    self.num_clusters = num_clusters
    # Centroids are drawn uniformly from [0, 1) and registered as a Parameter
    # so an optimizer can update them.
    self.centroids = nn.Parameter(torch.rand((self.num_clusters, latent_size)).to(device))

  def argminl2distance(self, a, b):
    """Return the index of the closest row by squared L2 distance.

    `a` and `b` are broadcast against each other. The squared differences are
    summed over the last axis and the argmin is taken over the axis before it.
    For two (K, D) inputs this yields a scalar index; for (N, K, D) it yields
    N indices.
    """
    return torch.argmin(torch.sum((a - b) ** 2, dim=-1), dim=-1)

  def forward(self, x):
    """Assign every row of `x` to its nearest centroid.

    Args:
      x: tensor of shape (N, latent_size).

    Returns:
      A tuple `(assignments, assigned_centroids)`: a list of N integer cluster
      indices and an (N, latent_size) tensor holding the matching centroids
      (gradients flow back into `self.centroids`).
    """
    # (N, 1, D) against (K, D) broadcasts to (N, K, D), so all samples are
    # compared with all centroids at once instead of one Python iteration
    # (and one device sync) per sample.
    assign = self.argminl2distance(x.unsqueeze(1), self.centroids)
    return assign.tolist(), self.centroids[assign]

In [None]:
# ===== Hyperparameters =====
k = 16            # base channel count
latent_size = 10  # desired latent dimensionality (can be adjusted freely)

# ===== Encoder / Decoder defined as separate modules =====
class Encoder(nn.Module):
    """Convolutional encoder that maps 1x28x28 images to latent vectors.

    Args:
        latent_size: width of the latent vector produced per image.
        k: base channel count; the conv stack uses k, 2k and 4k channels.
    """

    def __init__(self, latent_size=10, k=16):
        super().__init__()
        c1, c2, c3 = k, 2 * k, 4 * k
        flat_features = c3 * 4 * 4  # 1024 when k == 16

        conv_stack = [
            nn.Conv2d(1, c1, 3, stride=2),    # 28 -> 13
            nn.ReLU(inplace=True),
            nn.Conv2d(c1, c2, 3, stride=2),   # 13 -> 6
            nn.ReLU(inplace=True),
            nn.Conv2d(c2, c3, 3, stride=1),   # 6 -> 4
            nn.ReLU(inplace=True),
            nn.Flatten(),                     # (N, 4k, 4, 4) -> (N, 4k*4*4)
        ]
        self.features = nn.Sequential(*conv_stack)

        # Linear projection to the latent space; the trailing ReLU keeps the
        # latent code non-negative.
        self.to_latent = nn.Sequential(
            nn.Linear(flat_features, latent_size),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Encode a batch of images into (N, latent_size) latent codes."""
        return self.to_latent(self.features(x))


class Decoder(nn.Module):
    """Transposed-convolution decoder that maps latent vectors to 1x28x28 images.

    Args:
        latent_size: width of the incoming latent vector.
        k: base channel count; the deconv stack uses 4k, 2k and k channels.
    """

    def __init__(self, latent_size=10, k=16):
        super().__init__()
        c1, c2, c3 = k, 2 * k, 4 * k
        flat_features = c3 * 4 * 4  # 1024 when k == 16

        # Expand the latent vector and fold it back into (N, 4k, 4, 4) maps.
        self.from_latent = nn.Sequential(
            nn.Linear(latent_size, flat_features),
            nn.ReLU(inplace=True),
            nn.Unflatten(1, (c3, 4, 4)),
        )

        upsample_stack = [
            nn.ConvTranspose2d(c3, c2, 3, stride=1),                    # 4 -> 6
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(c2, c1, 3, stride=2),                    # 6 -> 13
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(c1, 1, 3, stride=2, output_padding=1),   # 13 -> 28
            nn.Sigmoid(),  # outputs lie in [0, 1], matching inputs scaled to [0, 1]
        ]
        self.deconv = nn.Sequential(*upsample_stack)

    def forward(self, z):
        """Decode (N, latent_size) codes into (N, 1, 28, 28) reconstructions."""
        return self.deconv(self.from_latent(z))

클러스터링 정확도 함수 정의

In [None]:
def cluster_acc(y_true, y_pred):
  """Return clustering accuracy under the best cluster-to-label matching.

  Cluster ids are arbitrary, so a contingency matrix of (predicted cluster,
  true label) counts is built and the one-to-one assignment that maximizes
  the number of agreeing samples is found with the Hungarian algorithm.

  Args:
    y_true: sequence of ground-truth integer labels.
    y_pred: sequence of predicted cluster ids, same length as `y_true`.

  Returns:
    Fraction of samples that agree with their label under the optimal mapping.

  Raises:
    ValueError: if the inputs are empty or differ in length.
  """
  # Cast to integers so the labels can be used as array indices.
  y_true = np.asarray(y_true).astype(np.int64).ravel()
  y_pred = np.asarray(y_pred).astype(np.int64).ravel()

  if y_pred.size == 0:
    raise ValueError("cluster_acc requires at least one sample")
  if y_true.size != y_pred.size:
    raise ValueError(
        f"y_true and y_pred must have the same length, got {y_true.size} and {y_pred.size}")

  D = int(max(y_pred.max(), y_true.max())) + 1
  w = np.zeros((D, D), dtype=np.int64)
  # Unbuffered accumulation: repeated (pred, true) pairs each add 1.
  np.add.at(w, (y_pred, y_true), 1)

  # linear_sum_assignment minimizes cost, so flip counts into costs.
  rows, cols = linear_assignment(w.max() - w)
  return w[rows, cols].sum() / y_pred.size

def evaluation(testloader, encoder, kmeans, device):
  """Compute clustering accuracy of `kmeans` on encoded test data.

  Args:
    testloader: iterable yielding (images, labels) batches.
    encoder: module mapping images to latent vectors.
    kmeans: module returning (cluster assignments, centroids) for latent vectors.
    device: device the inputs are moved to before encoding.

  Returns:
    The value of `cluster_acc` over all batches.
  """
  pred_ids, true_ids = [], []

  # Evaluation only: no gradients are needed.
  with torch.no_grad():
    for batch_images, batch_labels in testloader:
      batch_images = batch_images.to(device)
      batch_labels = batch_labels.to(device)
      codes = encoder(batch_images)
      assigned, _ = kmeans(codes)
      pred_ids.extend(assigned)
      true_ids.extend(batch_labels.cpu().tolist())

  return cluster_acc(true_ids, pred_ids)

In [None]:
# Build the models, the loss functions and the optimizer
encoder = Encoder(latent_size).to(device)
decoder = Decoder(latent_size).to(device)
kmeans = Kmeans(num_clusters, latent_size).to(device)

criterion1 = torch.nn.MSELoss()
criterion2 = torch.nn.CrossEntropyLoss()

# One Adam optimizer updates the encoder, the decoder and the centroids together.
optimizer = torch.optim.Adam(
    [*encoder.parameters(), *decoder.parameters(), *kmeans.parameters()],
    lr=0.001,
)

이렇게 Autoencoder의 latent space에 K-means를 적용하면 더 나은 성능을 얻을 수 있다. latent space가 데이터 분포의 중요한 부분을 설명하고 있기 때문이다.
