<a href="https://colab.research.google.com/github/XDMickeyYau/CSC2515-Project/blob/main/simCLR_custom.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data Loading

In [None]:
import re
import numpy as np

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
import tensorflow_hub as hub
import tensorflow_datasets as tfds

import matplotlib
import matplotlib.pyplot as plt

In [None]:
# Mount Google Drive at /content/drive (Colab-only) so files stored on Drive
# can be accessed from the notebook's filesystem.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Copy the CIFAR-100 archive from the mounted Drive into the current working directory.
!cp ./drive/MyDrive/cifar-10-python/cifar-100-python.tar.gz  ./

In [None]:
# Extract the archive into the working directory
# (presumably yields the "cifar-100-python" directory that is loaded later - confirm).
!tar -xf  cifar-100-python.tar.gz

In [None]:
import pickle
import numpy as np
import matplotlib.pyplot as plt
import os

def unpickle(file):
    """Load and return the object stored in the pickle file at ``file``.

    The file is opened in binary mode and decoded with ``encoding='latin1'``.

    NOTE: ``pickle.load`` can execute arbitrary code while deserializing, so
    this must only be pointed at trusted files.
    """
    with open(file, 'rb') as handle:
        payload = pickle.load(handle, encoding='latin1')
    return payload

def load_cifar_data(file_dir):
    """Load the train and test image arrays from an extracted CIFAR directory.

    Reads the pickled ``train`` and ``test`` files found in ``file_dir`` and
    converts each file's ``"data"`` entry from flat rows of 3*32*32 values
    (channel-first) into channel-last images.

    Args:
        file_dir: directory containing the ``train`` and ``test`` pickle files.

    Returns:
        ``(x_train, x_test)``: two uint8 arrays of shape ``(N, 32, 32, 3)``.
    """
    def _to_images(split_name):
        # One split: unpickle, then (N, 3072) -> (N, 3, 32, 32) -> (N, 32, 32, 3).
        flat = unpickle(os.path.join(file_dir, split_name))["data"]
        # -1 lets numpy infer the sample count instead of hard-coding
        # 50000 / 10000, so subsets or other split sizes load as well.
        return flat.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1).astype("uint8")

    return _to_images("train"), _to_images("test")

def flattern_data(x):
    """Flatten every sample of ``x`` into a single row.

    An array of shape ``(N, d1, d2, ...)`` is reshaped to ``(N, d1*d2*...)``,
    e.g. CIFAR images ``N*32*32*3`` become ``N*3072``.
    """
    # Product of all non-leading dimensions (1 when there are none).
    row_width = int(np.prod(x.shape[1:]))
    return x.reshape(x.shape[0], row_width)

def construct_image_from_flattern(x, colored = True):
    """Rebuild 32x32 images from flattened rows and cast them to uint8.

    Args:
        x: array whose first dimension is the sample count; each sample must
            hold 3072 values when ``colored`` is true, 1024 otherwise.
        colored: when true the result is ``(N, 32, 32, 3)``; otherwise
            ``(N, 32, 32)``.

    Returns:
        the reshaped array converted with ``astype("uint8")``.
    """
    image_shape = (32, 32, 3) if colored else (32, 32)
    return x.reshape((x.shape[0],) + image_shape).astype("uint8")

def visualize_data(row, col, plt_size, x):
    """Show the first ``row * col`` images of ``x`` on a ``row`` x ``col`` grid.

    Args:
        row: number of grid rows.
        col: number of grid columns.
        plt_size: side length used for both dimensions of ``figsize``.
        x: indexable collection of images accepted by ``imshow``. It may hold
            fewer than ``row * col`` images; the remaining cells stay blank.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # squeeze=False keeps the axes array 2-D even when row == 1 or col == 1,
    # where plt.subplots would otherwise return a 1-D array or a bare Axes.
    fig, axes = plt.subplots(row, col, figsize=(plt_size, plt_size), squeeze=False)
    # axes.flat walks the grid row by row, the same order as nested j/k loops.
    for i, ax in enumerate(axes.flat):
        # Hide ticks/frames on every cell, including ones that get no image.
        ax.set_axis_off()
        if i < len(x):
            ax.imshow(x[i])
    plt.show()
    return

In [None]:
# Load the raw uint8 images from the extracted archive directory.
x_train_2d_3channel, x_test_2d_3channel = load_cifar_data("cifar-100-python")
# Divide by 255 and store the result as float32 copies; the raw uint8 arrays
# are left untouched and are reused later for displaying images.
x_train_2d_3channel_float = (x_train_2d_3channel/255).astype(np.float32)
x_test_2d_3channel_float = (x_test_2d_3channel/255).astype(np.float32)

In [None]:
# Sanity check: display the shapes of the normalized train and test arrays.
x_train_2d_3channel_float.shape, x_test_2d_3channel_float.shape

((50000, 32, 32, 3), (10000, 32, 32, 3))

Model

In [None]:
import re
import numpy as np

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
import tensorflow_hub as hub
import tensorflow_datasets as tfds

import matplotlib
import matplotlib.pyplot as plt

In [None]:
# Convert the normalized image arrays to TensorFlow tensors so they can be
# passed to the hub module.
x_train_2d_3channel_tensor = tf.convert_to_tensor(x_train_2d_3channel_float)
x_test_2d_3channel_tensor = tf.convert_to_tensor(x_test_2d_3channel_float)


In [None]:
# Location of the SimCLRv2 hub module (per the path: r50_1x_sk0, finetuned_100pct).
hub_path = 'gs://simclr-checkpoints/simclrv2/finetuned_100pct/r50_1x_sk0/hub/'
# trainable=False: the module is only used as a fixed feature extractor here.
module = hub.Module(hub_path, trainable=False)

In [None]:
# Apply the hub module to the full train and test tensors (one call per split).
# NOTE(review): each split goes through the module as a single batch - consider
# batching if memory becomes a problem.
x_train_feature = module(x_train_2d_3channel_tensor)
x_test_feature = module(x_test_2d_3channel_tensor)
# Display the static shapes of the resulting feature tensors.
x_train_feature.shape, x_test_feature.shape

(TensorShape([50000, 2048]), TensorShape([10000, 2048]))

In [None]:
# Eager execution is disabled, so a session is needed to evaluate tensors.
sess = tf.Session()
# Run the global variable initializer before evaluating the feature tensors.
sess.run(tf.global_variables_initializer())

In [None]:
# Evaluate the feature tensors in the session to obtain concrete values
# that the scikit-learn steps further down consume.
x_train_feature_float = sess.run(x_train_feature)
x_test_feature_float = sess.run(x_test_feature)
# Display the shapes of the extracted feature arrays.
x_train_feature_float.shape, x_test_feature_float.shape

((50000, 2048), (10000, 2048))

PCA & K-Means

In [None]:
from sklearn.decomposition import PCA

def get_pca(component, X, random_state=0):
    """Fit a PCA model on ``X`` and return it.

    Args:
        component: value forwarded to ``PCA(n_components=...)``.
        X: data to fit on, one sample per row.
        random_state: seed forwarded to ``PCA``. scikit-learn's randomized and
            arpack SVD solvers are stochastic, so a fixed default keeps
            repeated fits reproducible, in line with the fixed seed this
            notebook already uses for KMeans.

    Returns:
        the fitted ``PCA`` object.
    """
    pca = PCA(n_components=component, random_state=random_state)
    pca.fit(X)
    return pca

def pca_encode(pca_model, X):
    """Return ``pca_model.transform(X)``, i.e. ``X`` encoded by the given model."""
    encoded = pca_model.transform(X)
    return encoded

def pca_decode(pca_model, X):
    """Return ``pca_model.inverse_transform(X)``, i.e. ``X`` mapped back by the model."""
    decoded = pca_model.inverse_transform(X)
    return decoded

In [None]:
from sklearn.cluster import KMeans

def generate_kmeans_model(X, k):
    """Fit a k-means model with ``k`` clusters on ``X``.

    The model is created with ``random_state=0``.

    Returns:
        ``(model, labels)``: the fitted ``KMeans`` object and the result of
        ``fit_predict(X)`` (the cluster label assigned to each row of ``X``).
    """
    model = KMeans(n_clusters=k, random_state=0)
    labels = model.fit_predict(X)
    return model, labels

def kmeans_clustering(model, X):
    """Return ``model.predict(X)``: the cluster labels the fitted model assigns to ``X``."""
    labels = model.predict(X)
    return labels

def clusters_to_index(cluster_labels):
    """Group sample positions by their cluster label.

    Args:
        cluster_labels: sequence where element ``i`` is the label of sample ``i``.

    Returns:
        dict of the form ``{cluster_num: [indices]}``; indices appear in
        ascending order and keys in order of first occurrence.
    """
    members = {}
    for position, label in enumerate(cluster_labels):
        members.setdefault(label, []).append(position)
    return members

In [None]:
def pca_kmeans_pipeline(Train, Test, feature_vector_size, cluster_num):
    """Fit PCA on ``Train``, then run k-means on the PCA-encoded ``Test`` data.

    Note that k-means is fitted on the encoded ``Test`` rows, not on ``Train``,
    so the returned index lists refer to rows of ``Test``.

    Args:
        Train: data used to fit the PCA model.
        Test: data that is encoded with that PCA model and then clustered.
        feature_vector_size: number of PCA components.
        cluster_num: number of k-means clusters.

    Returns:
        ``(pca_model, kmeans_model, cluster_index_dict)`` where the dict maps
        each cluster label to the list of ``Test`` row indices assigned to it.
    """
    reducer = get_pca(feature_vector_size, Train)
    test_encoded = pca_encode(reducer, Test)
    clusterer, test_labels = generate_kmeans_model(test_encoded, cluster_num)
    return reducer, clusterer, clusters_to_index(test_labels)

In [None]:
def visualize_kmeans_centre(kmeans_model, pca_model, row, col, colored = True):
    """Decode the k-means cluster centres through the PCA model and plot them.

    The centres are mapped back with ``pca_decode``, reshaped into 32x32 images
    by ``construct_image_from_flattern`` and drawn on a ``row`` x ``col`` grid
    with a figure size of 15.

    The decoded centres must therefore have 3072 values per row when
    ``colored`` is true, or 1024 otherwise.
    NOTE(review): a PCA fitted on the 2048-dimensional feature vectors used
    elsewhere in this notebook would not satisfy this - confirm intended use.
    """
    centres_as_images = construct_image_from_flattern(
        pca_decode(pca_model, kmeans_model.cluster_centers_),
        colored,
    )
    visualize_data(row, col, 15, centres_as_images)


In [None]:
# Number of PCA components to keep.
feature_vector_size = 500
# Number of k-means clusters.
n_cluster = 100
# PCA is fitted on the train features; k-means then clusters the PCA-encoded
# test features, so cluster_index_dict holds indices into the test set.
pca_model, kmeans_model, cluster_index_dict = pca_kmeans_pipeline(x_train_feature_float, x_test_feature_float, feature_vector_size, n_cluster)

In [None]:
# Visualize image clusters: for each cluster label in ascending order, print
# the label and show up to 9 of its member test images on a 3x3 grid.
for cluster in sorted(list(cluster_index_dict.keys())):
  print(cluster)
  # Indices of the test images assigned to this cluster.
  image_index = cluster_index_dict[cluster]
  # Index the raw uint8 test images (not the normalized floats) for display.
  images = x_test_2d_3channel[image_index]
  visualize_data(3,3, 5, images)