# In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, Model
import os

# --- 1. CONFIGURATION & DATASET LOADING ---
# Adjust this path to the location of your Anime dataset folder.
IMAGE_PATH = "./data/" 

# Hyper-parameters as required by the Anime assignment.
IMAGE_SIZE = 64   # Images are resized to IMAGE_SIZE x IMAGE_SIZE when loaded
BATCH_SIZE = 64   # Batch size used for the train/validation/test tf.data pipelines
EPOCHS = 60       # Minimum of 60 epochs, per the assignment
LATENT_DIM = 32   # Dimensionality of the latent space
LEARNING_RATE = 0.0001 # Standard learning rate for the CVAE (passed to Adam)

def load_data(path):
    """Load every PNG/JPEG image found directly inside *path* into one array.

    Each image is resized to ``IMAGE_SIZE x IMAGE_SIZE`` and converted to an
    array with ``img_to_array``. Pixel values are NOT normalised here.

    Args:
        path: Directory containing the image files (not searched recursively).

    Returns:
        ``np.ndarray`` stacking all successfully loaded images, or an empty
        array (``len(...) == 0``) when *path* is not a directory or contains
        no loadable image.
    """
    # isdir (rather than exists) so that a path pointing at a regular file is
    # reported as an error instead of crashing inside os.listdir.
    if not os.path.isdir(path):
        print(f"Error: Path {path} tidak ditemukan.")
        return np.array([])

    image_exts = ('.png', '.jpg', '.jpeg')
    # Sorted so that the load order does not depend on the platform-specific
    # ordering of os.listdir.
    files = sorted(f for f in os.listdir(path) if f.lower().endswith(image_exts))

    images = []
    skipped = 0
    for filename in files:
        try:
            # Load the image and resize it to IMAGE_SIZE x IMAGE_SIZE.
            img = tf.keras.preprocessing.image.load_img(
                os.path.join(path, filename),
                target_size=(IMAGE_SIZE, IMAGE_SIZE),
            )
            images.append(tf.keras.preprocessing.image.img_to_array(img))
        except Exception as exc:
            # Best effort: one unreadable file must not abort the whole load,
            # but report it instead of dropping it silently.
            skipped += 1
            print(f"Peringatan: gagal memuat {filename} ({exc}), dilewati.")

    if skipped:
        print(f"Peringatan: {skipped} file dilewati karena gagal dimuat.")

    return np.array(images)

print("Loading Anime dataset...")
images = load_data(IMAGE_PATH)
print(f"Jumlah gambar ditemukan: {len(images)}")

# --- 2. DATA PREPROCESSING & SPLIT ---
if len(images) > 0:
    # Randomly shuffle the samples in place (required by the assignment).
    np.random.shuffle(images)

    # Scale pixel values from [0, 255] down to [0, 1].
    images = images.astype('float32') / 255.0

    # NOTE: the images are not flattened because the model is convolutional;
    # the array keeps its (num_samples, height, width, channels) layout.

    # 80% train / 10% validation / remainder test (required by the assignment).
    total_data = len(images)
    train_count = int(0.8 * total_data)
    val_count = int(0.1 * total_data)

    # Cut the array at the two split boundaries; the third piece is whatever
    # is left over after the train and validation portions.
    x_train, x_val, x_test = np.split(
        images, [train_count, train_count + val_count]
    )

    print(f"Training Data   : {x_train.shape}")
    print(f"Validation Data : {x_val.shape}")
    print(f"Testing Data    : {x_test.shape}")

    # Wrap each split in a batched tf.data.Dataset; only the training split
    # is additionally shuffled.
    train_dataset = (
        tf.data.Dataset.from_tensor_slices(x_train)
        .shuffle(BATCH_SIZE * 5)
        .batch(BATCH_SIZE)
    )
    val_dataset, test_dataset = (
        tf.data.Dataset.from_tensor_slices(part).batch(BATCH_SIZE)
        for part in (x_val, x_test)
    )

# --- 3. VAE ARCHITECTURE (CONVOLUTIONAL / CVAE) ---
# Conv2D layers are used because they suit the detail of anime faces better than Dense layers.
class CVAE(Model):
    """Convolutional variational autoencoder for square RGB images.

    The encoder applies three stride-2 convolutions (each followed by batch
    normalisation) and flattens the result; two Dense heads produce the mean
    and log-variance of the latent distribution. The decoder projects a latent
    vector to a small feature map and upsamples it with three stride-2
    transposed convolutions, finishing with a sigmoid layer so that output
    pixels lie in [0, 1].

    Args:
        latent_dim: Size of the latent vector.
    """

    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        # --- ENCODER (CNN) ---
        # Input: (IMAGE_SIZE, IMAGE_SIZE, 3)
        self.encoder = tf.keras.Sequential([
            layers.InputLayer(input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3)),

            layers.Conv2D(32, 3, strides=2, padding='same', activation='relu', name="enc_conv1"),
            layers.BatchNormalization(),

            layers.Conv2D(64, 3, strides=2, padding='same', activation='relu', name="enc_conv2"),
            layers.BatchNormalization(),

            layers.Conv2D(128, 3, strides=2, padding='same', activation='relu', name="enc_conv3"),
            layers.BatchNormalization(),

            layers.Flatten(),
        ], name="encoder_net")

        # Latent space heads (mean & log-variance).
        self.z_mean = layers.Dense(latent_dim, name="z_mean")
        self.z_log_var = layers.Dense(latent_dim, name="z_log_var")

        # --- DECODER (transposed CNN) ---
        # Latent vector -> Dense -> reshape into a small 3-D feature map.
        # The three stride-2 transposed convolutions below upsample by 8x, so
        # the seed map is IMAGE_SIZE // 8 per side (8 when IMAGE_SIZE is 64).
        # NOTE: the output only matches the input size when IMAGE_SIZE is a
        # multiple of 8.
        seed_side = IMAGE_SIZE // 8
        self.dec_dense_input = layers.Dense(
            seed_side * seed_side * 128, activation='relu', name="dec_dense"
        )
        self.dec_reshape = layers.Reshape((seed_side, seed_side, 128), name="dec_reshape")

        self.decoder_net = tf.keras.Sequential([
            layers.Conv2DTranspose(128, 3, strides=2, padding='same', activation='relu', name="dec_convT1"),
            layers.BatchNormalization(),

            layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu', name="dec_convT2"),
            layers.BatchNormalization(),

            layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu', name="dec_convT3"),
            layers.BatchNormalization(),

            # Output layer: sigmoid keeps the output in the range [0, 1].
            layers.Conv2DTranspose(3, 3, strides=1, padding='same', activation='sigmoid', name="output_layer"),
        ], name="decoder_net")

    def encode(self, x, training=None):
        """Return ``(mean, log_var)`` of the latent distribution for images *x*.

        ``training`` is forwarded to the encoder so its BatchNormalization
        layers can be switched between training and inference behaviour;
        ``None`` leaves the decision to Keras.
        """
        h = self.encoder(x, training=training)
        mean = self.z_mean(h)
        log_var = self.z_log_var(h)
        return mean, log_var

    def reparameterize(self, mean, log_var):
        """Sample ``z = mean + exp(0.5 * log_var) * epsilon`` with standard-normal epsilon."""
        batch = tf.shape(mean)[0]
        dim = tf.shape(mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return mean + tf.exp(0.5 * log_var) * epsilon

    def decode(self, z, training=None):
        """Map latent vectors *z* to reconstructed images.

        The returned values are sigmoid activations (pixel intensities in
        [0, 1]), not raw logits. ``training`` is forwarded to the decoder's
        BatchNormalization layers.
        """
        h = self.dec_dense_input(z)
        h = self.dec_reshape(h)
        return self.decoder_net(h, training=training)

    def call(self, inputs, training=None):
        """Encode *inputs*, sample a latent vector and decode it.

        Args:
            inputs: Batch of images matching the encoder's input shape.
            training: Forwarded to the encoder and decoder sub-networks.

        Returns:
            Tuple ``(reconstruction, mean, log_var)``.
        """
        mean, log_var = self.encode(inputs, training=training)
        z = self.reparameterize(mean, log_var)
        reconstruction = self.decode(z, training=training)
        return reconstruction, mean, log_var

# --- 4. TRAINING SETUP ---
# Module-level model and optimizer instances; train_step and val_step read
# these globals directly instead of receiving them as arguments.
vae = CVAE(latent_dim=LATENT_DIM)
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# Loss function
def vae_loss_func(y_true, y_pred, mean, log_var):
    """Compute the VAE objective: reconstruction loss plus KL divergence.

    Args:
        y_true: Target images (assumed to be (batch, height, width, channels)
            with values in [0, 1] - confirm against the caller).
        y_pred: Reconstructed images, same layout as ``y_true``.
        mean: Latent means, one row per sample.
        log_var: Latent log-variances, same shape as ``mean``.

    Returns:
        Tuple ``(total_loss, recon_loss, kl_loss)`` of scalar tensors, each
        averaged over the batch.
    """
    # 1. Reconstruction term. The decoder output is sigmoid-activated, so
    #    binary cross-entropy is used; it is summed over the two spatial axes
    #    per image and then averaged over the batch.
    bce_map = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    recon_per_image = tf.reduce_sum(bce_map, axis=(1, 2))
    recon_loss = tf.reduce_mean(recon_per_image)

    # 2. KL divergence between the encoder distribution and a standard
    #    normal, summed over latent dimensions and averaged over the batch.
    kl_per_dim = -0.5 * (1 + log_var - tf.square(mean) - tf.exp(log_var))
    kl_per_image = tf.reduce_sum(kl_per_dim, axis=1)
    kl_loss = tf.reduce_mean(kl_per_image)

    total_loss = recon_loss + kl_loss
    return total_loss, recon_loss, kl_loss

# Training step
@tf.function
def train_step(images):
    """Run one optimisation step on a batch and return its losses.

    Uses the module-level ``vae`` model and ``optimizer``.

    Args:
        images: Batch of input images; also used as the reconstruction target.

    Returns:
        Tuple ``(total_loss, recon_loss, kl_loss)`` for this batch.
    """
    with tf.GradientTape() as tape:
        # training=True is required in a custom training loop: without it the
        # BatchNormalization layers run in inference mode, i.e. they normalise
        # with their (never updated) moving statistics instead of batch
        # statistics.
        reconstruction, mean, log_var = vae(images, training=True)
        total_loss, recon_loss, kl_loss = vae_loss_func(images, reconstruction, mean, log_var)

    trainable_vars = vae.trainable_variables
    grads = tape.gradient(total_loss, trainable_vars)
    optimizer.apply_gradients(zip(grads, trainable_vars))
    return total_loss, recon_loss, kl_loss

# Validation step
@tf.function
def val_step(images):
    """Evaluate the losses on a batch without updating any weights.

    Uses the module-level ``vae`` model.

    Args:
        images: Batch of input images; also used as the reconstruction target.

    Returns:
        Tuple ``(total_loss, recon_loss, kl_loss)`` for this batch.
    """
    # training=False pins inference behaviour (BatchNormalization uses its
    # moving statistics) explicitly instead of relying on the implicit default.
    reconstruction, mean, log_var = vae(images, training=False)
    return vae_loss_func(images, reconstruction, mean, log_var)

# --- 5. TRAINING LOOP ---
if len(images) > 0:
    # Per-epoch average losses, stored as plain Python floats for plotting.
    train_loss_hist = []
    val_loss_hist = []

    print(f"\n--- Starting Training for {EPOCHS} Epochs ---")
    for epoch in range(EPOCHS):
        # -- Training phase --
        total_loss_t = 0.0
        steps_t = 0
        for batch_imgs in train_dataset:
            t_loss, _, _ = train_step(batch_imgs)
            total_loss_t += float(t_loss)
            steps_t += 1
        # Guard against an empty training split (very small datasets), the
        # same way the validation average is guarded below.
        avg_train_loss = total_loss_t / steps_t if steps_t > 0 else 0.0
        train_loss_hist.append(avg_train_loss)

        # -- Validation phase --
        total_loss_v = 0.0
        steps_v = 0
        for batch_imgs in val_dataset:
            v_loss, _, _ = val_step(batch_imgs)
            total_loss_v += float(v_loss)
            steps_v += 1
        avg_val_loss = total_loss_v / steps_v if steps_v > 0 else 0.0
        val_loss_hist.append(avg_val_loss)

        print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {avg_train_loss:.2f} | Val Loss: {avg_val_loss:.2f}")

    # Plot the loss history.
    plt.figure(figsize=(10, 5))
    plt.plot(train_loss_hist, label='Training Loss')
    plt.plot(val_loss_hist, label='Validation Loss')
    plt.title("Training vs Validation Loss History")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

    # --- 6. PREDICTION & EVALUATION ---
    # Take up to 10 images from the first test batch for visualisation.
    test_batch = next(iter(test_dataset))
    test_samples = test_batch[:10]

    # Reconstruct them in inference mode.
    recon_images, _, _ = vae(test_samples, training=False)
    recon_images = recon_images.numpy()

    # The first test batch may hold fewer than 10 images on small datasets,
    # so size the grid from what is actually available.
    num_shown = int(test_samples.shape[0])

    print("\nVisualisasi Hasil (Original vs Reconstructed):")
    plt.figure(figsize=(20, 4))
    for i in range(num_shown):
        # Top row: original image.
        plt.subplot(2, num_shown, i + 1)
        plt.imshow(test_samples[i])
        plt.title("Original")
        plt.axis("off")

        # Bottom row: reconstruction.
        plt.subplot(2, num_shown, i + 1 + num_shown)
        plt.imshow(recon_images[i])
        plt.title("Recon")
        plt.axis("off")
    plt.show()

else:
    print("Dataset kosong. Pastikan path folder benar.")