In [None]:
import keras
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
import random
from keras.layers import Dense, Input, MaxPool2D, Conv2D, Flatten, InputLayer, Reshape, Conv2DTranspose, BatchNormalization
from keras import Sequential
from tensorflow.keras.preprocessing.image import load_img, img_to_array

In [None]:
PATH = "./dataset"
img_rows, img_cols = 28,28
X = []

for counter, img_path in enumerate(os.listdir(PATH)):
    path = os.path.join(PATH, img_path)
    image = load_img(path, target_size=(img_rows, img_cols))
    image = img_to_array(image)
    X.append(image)

In [None]:
# Ngubah X ke array dan normalisasi 
X = np.array(X, dtype = 'float32') / 255.0
X = X.reshape(-1, img_rows, img_cols, 3) # 
X_train, X_test = train_test_split(X, test_size = 0.4, random_state = 420)

In [None]:
# Membuat model dari VAE
hidden_dimension = 512 # Dimension dari input dan output layer
latent_dimension = 64 # Latent dimension HARUS lebih kecil dari hidden dimension
batch_size = 200
learning_rate = 0.00015

class VAE(tf.keras.Model):
    def __init__(self, hidden_dimension, latent_dimension):
        super(VAE, self).__init__()
    
        # Encoder Layer
        self.encoder = Sequential([
            InputLayer(input_shape = (img_rows, img_cols, 3)),
            Conv2D(32, kernel_size = (3, 3), activation = 'relu', strides = (2, 2), padding = 'same'),
            MaxPool2D((2,2), padding = 'same'),
            Conv2D(64, kernel_size = (3, 3), activation = 'relu', strides = (2, 2), padding = 'same'),
            Flatten(),
            Dense(hidden_dimension, activation = 'relu')
        ])

        # Latent Layer (2 Layer) --> Mean Layer, Loag Variance Layer
        self.mu_layer = Dense(latent_dimension)
        self.log_var_layer = Dense(latent_dimension)

        # Decoder Layer
        self.decoder = Sequential([
            Dense(hidden_dimension, activation = 'relu'),
            BatchNormalization(),
            Dense(7 * 7 * 32, activation = 'relu'),
            Reshape((7, 7, 32)), 
            Conv2DTranspose(64, kernel_size = (3, 3), activation = 'relu', strides = (2, 2), padding = 'same'),
            Conv2DTranspose(32, kernel_size = (3, 3), activation = 'relu', strides = (2, 2), padding = 'same'),
            Conv2DTranspose(3, kernel_size = (3, 3), padding = 'same') # 3 itu warna channel kita (RGB) dan ga dikasih strides lagi karena ukurannya sudah sesuai (7 * 2 * 2 = 28)
        ])

    # Encode = proses untuk memasukkan input ke layer
    def encode(self, x):
        h = self.encoder(x)
        return self.mu_layer(h), self.log_var_layer(h)
    
    # Proses Decode dan ini akan dipanggil saat testing
    def decode(self, z):
        # Activation Function karna udah mau ditunjukin hasilnya (sama kyk training biasa, proses akhir tetap ada activation function sigmoid)
        return tf.nn.sigmoid(self.decode_logits(z))

    # Decode Logits = 1 step sebelum activation function dalam artian hasilnya udah jadi tapi masih mentah
    # Dan ini akan dipanggil saat training
    def decode_logits(self, z):
        return self.decoder(z)
    
    # Trik untuk melakukan backpropagation secara efisien (dengan normal distribusi matematika)
    def reparameterize(self, mu, log_var):
        std = tf.exp(log_var * 0.5) # Untuk mencari Standar Deviasi
        eps = tf.random.normal(std.shape) # Untuk mendapatkan epsilon
        return mu + std * eps
    
    def call(self, inputs):
        mu, log_var = self.encode(inputs)
        z = self.reparameterize(mu, log_var)
        x_reconstructed_logits  = self.decode_logits(z)
        return x_reconstructed_logits, mu, log_var
    
model = VAE(hidden_dimension, latent_dimension)
model.build(input_shape = (batch_size, img_rows, img_cols, 3))
print(model.summary())

In [None]:
# Ngubah data train ke bentuk object tensor
dataset = tf.data.Dataset.from_tensor_slices(X_train)
dataset = dataset.shuffle(batch_size).batch(batch_size)


In [None]:
epochs = 100
optimizer = tf.keras.optimizers.Adam(learning_rate)

# Result dari loss dan div akan disimpan pada setiap stepsnya 
loss_history = [] # loss --> mean(reconstruction_loss) + mean(kl_div)
klDiv_history = [] # klDiv --> Kullback Leiber Divergence adalah perbedaan probabilitas dari training step sebelumnya

In [None]:
for epoch in range(epochs):
    for step, x in  enumerate(dataset):
        with tf.GradientTape() as tape: # Fungsi ini akan secara otomatis me-record perbedaan dari masing - masing kalkulasi 
            # Calculate Loss Function
            x_reconstructed_logits, mu, log_var = model(x)
            reconstruction_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels = x, logits = x_reconstructed_logits)
            reconstruction_loss = tf.reduce_sum(reconstruction_loss) / batch_size

            kl_div = -0.5 * tf.reduce_sum((1 + log_var - tf.square(mu) - tf.exp(log_var)), axis = -1)
            kl_div = tf.reduce_mean(kl_div)

            loss = tf.reduce_mean(reconstruction_loss) + kl_div

        # Model.trainable_variables --> variable yang bisa diganti di model tersebut ada apa aja
        # Backpropagation --> mengubah weight dari layer (Bias, Weight)
        gradients = tape.gradient(loss, model.trainable_variables)
        
        gradients = [tf.clip_by_norm(g, 5) for g in gradients]

        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        loss_history.append(float(reconstruction_loss))
        klDiv_history.append(float(kl_div))

        print(f'Epoch[{epoch + 1}/{epochs}], Step[{step + 1}/{len(dataset)}], Reconstuction Loss : {float(reconstruction_loss):.2f}, KL Div : {float(kl_div):.2f}')



In [None]:
# Plotting and Prediction the Test data
mu, log_var = model.encode(X_test)
# Hasil dari generated data kita
out = model.decode(mu)
out = (tf.reshape(out, [-1, img_rows, img_cols, 3]).numpy() * 255).astype(np.uint8) # Unnormalisasi karena udah mau show hasilnya --> * 255 dan convert ke int biar hasilnya integer ga float

In [None]:
num_samples = 10
random_indices = random.sample(range(len(X_test)), num_samples)
plt.figure(figsize = (20, 4))

# Original Image
for i, val in enumerate(random_indices):
    plt.subplot(2, num_samples, i + 1)
    plt.imshow(X_test[val], cmap = 'gray')
    plt.axis('off')

for i, val in enumerate(random_indices):
    plt.subplot(2, num_samples, i + 11)
    plt.imshow(out[val], cmap = 'gray')
    plt.axis('off')

plt.show()


In [None]:
plt.figure(figsize = (18,5))

plt.subplot(1, 2, 1)
plt.plot(loss_history, label = 'Loss')
plt.title('Model Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(klDiv_history, label = 'Loss')
plt.title('Model KL Div')
plt.xlabel('Step')
plt.ylabel('KL Div')
plt.legend()