#### 이 실습은 [링크](https://keras.io/examples/vision/metric_learning/)의 자료를 참고하여 구성하였습니다.

### 필요한 라이브러리를 불러옵니다. 

In [None]:
import random
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from collections import defaultdict
from PIL import Image
from sklearn.metrics import ConfusionMatrixDisplay
from tensorflow import keras
from tensorflow.keras import layers

### 본 실기에서는 TensorFlow에서 제공하는 데이터셋 중 CIFAR-10 데이터셋을 사용합니다. 

### CIFAR-10은 32 x 32 크기의 6만 개의 이미지로 이루어진 데이터셋으로, 총 10개의 클래스로 분류됩니다.

### 10개의 클래스는 airplane, bird, cat 등등입니다. 

### 데이터셋을 불러옵니다.

In [None]:
'''데이터셋을 load 하고, train 데이터셋과 test 데이터셋으로 split 합니다.'''
from tensorflow.keras.datasets import cifar10

(x_train, y_train), (x_test, y_test) = cifar10.load_data()

x_train = x_train.astype("float32") / 255.0    # normalize
y_train = np.squeeze(y_train)
x_test = x_test.astype("float32") / 255.0
y_test = np.squeeze(y_test)

print('학습 데이터 수 = %s\n테스트 데이터 수 = %s' %(len(x_train), len(x_test)))

### 샘플 데이터를 확인해봅시다. 

In [None]:
height_width = 32

'''샘플 데이터를 plot 하는 함수를 정의합니다.'''
def show_collage(examples):
    box_size = height_width + 2
    num_rows, num_cols = examples.shape[:2]

    collage = Image.new(
        mode="RGB",
        size=(num_cols * box_size, num_rows * box_size),
        color=(250, 250, 250),
    )
    for row_idx in range(num_rows):
        for col_idx in range(num_cols):
            array = (np.array(examples[row_idx, col_idx]) * 255).astype(np.uint8)
            collage.paste(
                Image.fromarray(array), (col_idx * box_size, row_idx * box_size)
            )

    collage = collage.resize((2 * num_cols * box_size, 2 * num_rows * box_size))
    return collage


'''5x5 그리드로 샘플 이미지를 그립니다.'''
sample_idxs = np.random.randint(0, 50000, size=(5, 5))
examples = x_train[sample_idxs]
show_collage(examples)

### 먼저 학습 및 테스트 데이터셋을 클래스 인덱스를 key값으로 하고 샘플 인덱스를 value값의 형태로 재배열하겠습니다. Anchor와 positive sample로 이루어진 데이터셋을 구성하는 데에 유용하게 쓸 것입니다.

In [None]:
class_idx_to_train_idxs = defaultdict(list)
for y_train_idx, y in enumerate(y_train):
    class_idx_to_train_idxs[y].append(y_train_idx)

class_idx_to_test_idxs = defaultdict(list)
for y_test_idx, y in enumerate(y_test):
    class_idx_to_test_idxs[y].append(y_test_idx)
    
print('클래스 인덱스를 key값으로 가집니다: ',class_idx_to_train_idxs.keys())
print('클래스 인덱스 0에 해당하는 샘플들의 인덱스(처음 10개)입니다: ',class_idx_to_train_idxs[0][:10])

### 본 실습에서는 metric learning의 학습을 간단히 구현하기 위해, batch를 10개의 클래스들에 대한 anchor와 positive 쌍으로 구성합니다. 

### 학습의 목적은 같은 클래스에 속하는 anchor와 positive 쌍은 가깝게 embedding되도록 하고 다른 클래스에 속하는 anchor와 positive 쌍은 멀어지도록 embedding하는 것입니다.

### 즉, batch를 클래스들에 대한 anchor와 positive 쌍으로 구성하여, 각각의 anchor에 대해 같은 클래스에 대응되는 positive 샘플과는 서로 가까워지도록 하고, 다른 클래스에 대응되는 positive 샘플과는 멀어지도록 합니다.

### 아래에서 좀 더 자세히 설명하겠습니다. 

In [None]:
num_classes = 10    # CIFAR-10의 클래스 수는 10입니다. 

class AnchorPositivePairs(keras.utils.Sequence):
    def __init__(self, num_batchs):
        self.num_batchs = num_batchs

    def __len__(self):
        return self.num_batchs

    def __getitem__(self, _idx):
        x = np.empty((2, num_classes, height_width, height_width, 3), dtype=np.float32)  # Anchor-positive 쌍으로 데이터셋을 구성하기 위해 처음 dimension이 2입니다.
        for class_idx in range(num_classes):
            examples_for_class = class_idx_to_train_idxs[class_idx]  # 특정 클래스 인덱스에 대한 샘플 인덱스 집합입니다.
            anchor_idx = random.choice(examples_for_class)  # 샘플 인덱스 집합으로부터 무작위로 anchor를 위한 인덱스를 선택합니다.
            positive_idx = random.choice(examples_for_class)  # 샘플 인덱스 집합으로부터 무작위로 positive를 위한 인덱스를 선택합니다.
            while positive_idx == anchor_idx:  # anchor와 positive는 같은 샘플일 수 없으므로, 동일 인덱스의 경우 positive를 다시 선택합니다.
                positive_idx = random.choice(examples_for_class)
            x[0, class_idx] = x_train[anchor_idx]  # anchor 이미지를 특정 클래스 인덱스 값에 저장합니다.
            x[1, class_idx] = x_train[positive_idx]  # positive 이미지를 특정 클래스 인덱스 값에 저장합니다.
        return x

### 클래스들에 대해 anchor와 positive 쌍으로 이루어진 데이터셋을 구성하였습니다.

### 하나의 batch에 대한 예시를 살펴보겠습니다. 

In [None]:
examples = next(iter(AnchorPositivePairs(num_batchs=1)))

show_collage(examples)

### 첫 번째 행은 클래스들에 대한 anchor의 이미지를, 두 번째 행은 클래스들에 대한 positive의 이미지를 나타냅니다.
### 클래스가 위 아래로 잘 묶여 있는 것을 볼 수 있습니다. 

### 이제 metric learning을 위한 Embedding Model을 정의할 차례입니다. 
### 일반적인 분류(classification) 과제와 차이가 있어 커스텀 모델을 정의합니다. 

### 이 커스텀 모델은 anchor와 positive를 embedding한 후 그 내적 값을 softmax에 대한 logits로 사용합니다.
### 즉, 하나의 batch는 anchor/positive 쌍 10개(클래스에 해당하는)로 이루어져 있는데, embedding vector의 dimension이 8이라고 하면, anchor에 대해 10x8의 representation 행렬을 얻고, positive에 대해서도 10x8의 representation 행렬을 얻습니다. 

### Anchor와 positive 두 embedding 행렬을 곱하면 10x10의 유사도 행렬을 구할 수 있는데(embedding dimension에 대한 내적이므로), 이 유사도 행렬의 첫 번째 행이 뜻하는 것은 클래스가 0인 anchor와 클래스 0~9까지의 positive들과의 유사도입니다. 

### 이것을 softmax의 logits라 하면, 즉 유사도 행렬의 첫 번째 행에 대응하는 sparse 라벨을 0으로 하면 해당 anchor는 클래스 0인 positive와의 유사도는 높이고 다른 클래스의 positive들과는 유사도를 낮추는 방향으로 학습됩니다. (코사인 유사도의 경우 1에 가까울수록 유사도가 높다고 판단할 수 있습니다.)

### 다른 클래스에 대해서도 마찬가지로 작동합니다. anchor들은 자신과 동일한 클래스에 속하는 positive와는 가깝게 embedding되고 다른 클래스에 속하는 positive들과는 분리되도록 embedding되므로, 결과적으로 같은 클래스에 속하는 샘플들의 embedding은 가깝게 됩니다. 

In [None]:
class EmbeddingModel(keras.Model):
    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        anchors, positives = data[0], data[1]   # AnchorPositivePairs에서 0차원의 인덱스 0이 anchor, 인덱스 1이 positive였습니다. 

        with tf.GradientTape() as tape:
            # anchor와 positive를 모델에 태워 embedding vector를 계산합니다. 
            anchor_embeddings = self(anchors, training=True)
            positive_embeddings = self(positives, training=True)

            # anchor와 positive 사이의 코사인 유사도(cosine similarity)를 계산합니다. 
            # embedding 될 때 normalize되므로 코사인 유사도가 됩니다. 
            similarities = tf.einsum(
                "ae,pe->ap", anchor_embeddings, positive_embeddings
            )

            # 유사도 값을 logits로 사용하므로 학습 효율성을 위해 값을 조정합니다.
            similarities /= 0.2

            sparse_labels = tf.range(num_classes) # logits에 대응하는 라벨을 만들어 줍니다. 여기서 라벨은 클래스 인덱스와 같습니다. 
            loss = self.compiled_loss(sparse_labels, similarities)

        # 이하 코드는 무시하셔도 좋습니다. gradient, optimizer, metric에 관한 부분입니다. 
        gradients = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.compiled_metrics.update_state(sparse_labels, similarities)
        return {m.name: m.result() for m in self.metrics}

### 이제 모델을 만들고 학습을 해보겠습니다. 분류 과제와는 달리 마지막 레이어에 softmax 활성화 함수를 쓰지 않으며, embedding vector까지만 만들도록 합니다.

In [None]:
inputs = layers.Input(shape=(height_width, height_width, 3))
x = layers.Conv2D(32, 3, activation="relu")(inputs)
x = layers.Conv2D(64, 3, activation="relu")(x)
x = layers.Conv2D(128, 3, activation="relu")(x)
x = layers.GlobalAveragePooling2D()(x)
embeddings = layers.Dense(units=8, activation=None)(x)
embeddings = tf.nn.l2_normalize(embeddings, axis=-1)

model = EmbeddingModel(inputs, embeddings)

In [None]:
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=1e-3),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),  # EmbeddingModel에서 sparse_labels를 만들었으므로 SparseCategoricalCrossentropy loss를 씁니다. 
)

history = model.fit(AnchorPositivePairs(num_batchs=1000), epochs=40)

plt.plot(history.history["loss"])
plt.show()

### 학습이 잘 되고 있는 것 같습니다. 

### 분류 과제와 동일한 metric을 측정하려면 레이어를 추가하고 튜닝을 해야겠지만, 여기서는 테스트 데이터셋으로 간단하게 성능을 측정해보겠습니다. 

### 테스트 데이터셋에 대해 embedding vector를 추출 후 학습 데이터셋의 embedding vector와 유사도를 비교하여, 임의의 테스트 데이터 샘플에 대해 학습 데이터셋에서 가장 유사한 이미지가 같은 클래스에 속하는지 계산해 보겠습니다. 

In [None]:
train_embed = model.predict(x_train)
test_embed = model.predict(x_test)
sim_mat = tf.einsum("ae,be->ab", test_embed, train_embed)  # 테스트 데이터셋의 embedding vector와 학습 데이터셋의 embedding vector 사이의 유사도를 계산합니다. 
top1_test = np.argsort(sim_mat)[:,-2:]   # 학습 데이터셋에서 가장 큰 유사도를 가지는 인덱스 2개를 반환합니다. 

cnt = 0
for row_idx in range(len(y_test)):
    top1_idx = top1_test[row_idx][1]   # 1번째 인덱스가 가장 큰 유사도를 가지는 학습 데이터셋의 인덱스입니다. 
    top1_class = y_train[top1_idx]   # 해당 학습 데이터 샘플의 클래스입니다. 
    if y_test[row_idx] == top1_class:
        cnt+=1
print('가장 유사한 이미지를 맞춘 테스트 샘플 수: ',cnt)

### 결과가 썩 만족스럽지는 않지만 학습을 더 진행하면 결과가 나아질 것 같습니다. 

### 테스트 데이터셋의 처음 10개 샘플에 대해 가장 유사한 학습 데이터셋의 샘플이 어떻게 나왔는지 살펴보겠습니다. 

In [None]:
labels = [
    "Airplane",
    "Automobile",
    "Bird",
    "Cat",
    "Deer",
    "Dog",
    "Frog",
    "Horse",
    "Ship",
    "Truck",
]

rows = 10
axes=[]
fig=plt.figure(figsize=(4,14))

for n in range(rows):
    axes.append( fig.add_subplot(rows, 2, 2*n+1) )
    subplot_title=(str(labels[y_test[n]]))
    axes[-1].set_title(subplot_title)  
    plt.imshow(x_test[n])
    plt.axis('off')

    top1_idx = top1_test[n][1]
    axes.append( fig.add_subplot(rows, 2, 2*n+2) )
    subplot_title=(str(labels[y_train[top1_idx]]))
    color = "green" if labels[y_train[top1_idx]] == labels[y_test[n]] else "red"
    axes[-1].set_title(subplot_title, color=color)     
    plt.imshow(x_train[top1_idx])
    plt.axis('off')
fig.tight_layout()    
plt.show()

### 좌측 열은 테스트 데이터셋의 샘플, 우측 열은 각 샘플에 해당하는, 가장 유사도가 높은 학습 데이터셋의 샘플입니다.

### 분류 과제가 아니지만 클래스별로 embedding이 된 것을 확인할 수 있습니다. 

### 동등한 비교는 아니지만, 동일한 모델을 분류 과제에 대해 수행하고 테스트 데이터셋에 대한 분류 정확도와 비교해보는 것은 어떨까요?