In [None]:
import os
from PIL import Image
import numpy as np

# Root folder that is searched recursively for source .jpg images.
INPUT_DIR = "/kaggle/input/nebula-images/Nebulae/"
# Size passed to PIL's resize for every processed image.
TARGET_SIZE = (128, 128)
# Path of the cached, preprocessed dataset (written with np.save, reloaded if it exists).
OUTPUT_FILE = "processed_nebula_dataset.npy"

def get_all_jpg_images(folder_path):
    """Recursively collect the paths of all ``.jpg`` files under ``folder_path``.

    The extension check is case-insensitive. ``os.walk`` yields directories and
    files in arbitrary, filesystem-dependent order, so the result is sorted to
    keep the order of the resulting dataset reproducible across runs.

    Args:
        folder_path: Directory to search. A directory that does not exist
            yields an empty list (``os.walk`` ignores listing errors by default).

    Returns:
        Sorted list of file paths, each built as ``os.path.join(root, name)``.
    """
    jpg_paths = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk(folder_path)
        for name in names
        if name.lower().endswith(".jpg")
    ]
    jpg_paths.sort()
    return jpg_paths

def center_crop_and_resize(image_path, target_size=TARGET_SIZE):
    """Load an image, center-crop it to a square, resize it and scale to [-1, 1].

    Args:
        image_path: Path of the image file to open with PIL.
        target_size: Size passed to ``Image.resize``; defaults to ``TARGET_SIZE``.

    Returns:
        ``float32`` NumPy array of the RGB image with 8-bit values mapped
        from [0, 255] to [-1, 1], or ``None`` if the file could not be
        processed (a warning is printed in that case).
    """
    try:
        # Open through a context manager so the underlying file handle is
        # released immediately instead of lingering until garbage collection.
        # convert() returns an independent, fully loaded image.
        with Image.open(image_path) as source:
            img = source.convert("RGB")

        # Largest centered square that fits inside the image.
        w, h = img.size
        min_dim = min(w, h)
        left = (w - min_dim) // 2
        top = (h - min_dim) // 2
        img = img.crop((left, top, left + min_dim, top + min_dim))

        img = img.resize(target_size, Image.LANCZOS)
        # x / 127.5 - 1 maps 0 -> -1 and 255 -> 1.
        img = np.array(img).astype(np.float32) / 127.5 - 1.0
        return img
    except Exception as e:
        # Deliberate best-effort: one unreadable file must not abort the whole run.
        print(f"[Warning] Skipping {image_path}: {e}")
        return None

def process_all_images(image_paths):
    """Preprocess every path and stack the successful results into one array.

    Each path is passed to ``center_crop_and_resize``; paths for which it
    returns ``None`` are left out. A progress line is printed after every
    100 paths visited (counting skipped ones).

    Args:
        image_paths: Sequence of image file paths.

    Returns:
        ``float32`` NumPy array built from the list of processed images.
    """
    kept = []
    for count, image_path in enumerate(image_paths, start=1):
        result = center_crop_and_resize(image_path)
        if result is not None:
            kept.append(result)
        if count % 100 == 0:
            print(f"Processed {count}/{len(image_paths)} images...")
    return np.array(kept, dtype=np.float32)

if __name__ == "__main__":
    # Build the dataset from the raw images only when no cached array exists;
    # otherwise reuse the cached file.
    if not os.path.exists(OUTPUT_FILE):
        print("Collecting .jpg images...")
        image_paths = get_all_jpg_images(INPUT_DIR)
        print(f"Found {len(image_paths)} images.")

        print("Preprocessing images...")
        dataset = process_all_images(image_paths)
        print(f"Final dataset shape: {dataset.shape}")

        print(f"Saving to {OUTPUT_FILE}...")
        np.save(OUTPUT_FILE, dataset)
        print("Done!")
    else:
        print(f"{OUTPUT_FILE} already exists. Loading dataset instead of preprocessing...")
        dataset = np.load(OUTPUT_FILE)
        print(f"Loaded dataset shape: {dataset.shape}")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
import os

In [None]:
# --- Training configuration ---
DATA_PATH = "processed_nebula_dataset.npy"  # array loaded below with np.load
BATCH_SIZE = 64
LATENT_DIM = 100           # length of the generator's input noise vector
IMG_SHAPE = (128, 128, 3)  # input shape of the discriminator
EPOCHS = 5000              # number of iterations of the outer training loop
N_CRITIC = 5               # discriminator updates per generator update
LAMBDA_GP = 10.0           # weight of the gradient-penalty term in the discriminator loss
SAVE_INTERVAL = 500        # a sample grid is saved every SAVE_INTERVAL iterations

# copy=False: when the stored array is already float32, reuse it instead of
# making a second full-size copy in memory.
data = np.load(DATA_PATH).astype(np.float32, copy=False)
print("Data shape:", data.shape)
dataset = (
    tf.data.Dataset.from_tensor_slices(data)
    .shuffle(buffer_size=1024)
    .batch(BATCH_SIZE)
    .prefetch(1)
)

In [None]:
def build_generator(latent_dim):
    """Build the generator: latent vector -> 128x128x3 image in [-1, 1].

    A dense layer projects the latent vector to a 4x4x512 feature map, which
    five stride-2 transposed convolutions upsample 4 -> 8 -> 16 -> 32 -> 64 -> 128.
    The last layer has 3 channels and a ``tanh`` activation.

    Args:
        latent_dim: Length of the input noise vector.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    model = models.Sequential([
        # Explicit Input layer (same convention as build_discriminator) rather
        # than the deprecated ``input_dim=`` argument on the first layer.
        layers.Input(shape=(latent_dim,)),

        layers.Dense(4 * 4 * 512),
        layers.Reshape((4, 4, 512)),
        layers.BatchNormalization(),
        layers.LeakyReLU(0.2),

        layers.Conv2DTranspose(256, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(0.2),

        layers.Conv2DTranspose(128, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(0.2),

        layers.Conv2DTranspose(64, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(0.2),

        layers.Conv2DTranspose(32, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(0.2),

        layers.Conv2DTranspose(3, 4, strides=2, padding='same', activation='tanh')
    ])
    return model

In [None]:
def build_discriminator(img_shape):
    """Build the discriminator (critic): image -> single unbounded score.

    Five stride-2 convolutions (64, 128, 256, 512, 512 filters), each followed
    by LeakyReLU(0.2), then a flatten and a one-unit dense layer with no
    activation. The stack contains no normalization layers.

    Args:
        img_shape: Shape of one input image, passed to ``layers.Input``.

    Returns:
        An uncompiled ``Sequential`` model.
    """
    critic = models.Sequential()
    critic.add(layers.Input(shape=img_shape))

    for n_filters in (64, 128, 256, 512, 512):
        critic.add(layers.Conv2D(n_filters, 4, strides=2, padding='same'))
        critic.add(layers.LeakyReLU(0.2))

    critic.add(layers.Flatten())
    critic.add(layers.Dense(1))
    return critic

In [None]:
# Module-level models; the training-step functions read these globals directly.
generator = build_generator(LATENT_DIM)
discriminator = build_discriminator(IMG_SHAPE)

# One Adam optimizer per network, both with learning_rate=1e-4 and beta_1=0.5.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4, beta_1=0.5)

In [None]:
def wasserstein_loss(y_true, y_pred):
    """Return the mean of the element-wise product ``y_true * y_pred``."""
    weighted_scores = y_true * y_pred
    return tf.reduce_mean(weighted_scores)

def gradient_penalty(discriminator, real_images, fake_images):
    """Compute the gradient-penalty term on random real/fake interpolations.

    For each sample a mixing weight is drawn uniformly from [0, 1) and used to
    blend the real and fake image. The penalty is the batch mean of
    ``(||grad|| - 1)^2``, where ``grad`` is the gradient of the discriminator
    output with respect to the blended image and the norm is taken over
    axes 1, 2 and 3.

    Args:
        discriminator: Callable applied to the blended images.
        real_images: Batch of real images; its leading dimension sets the
            number of mixing weights.
        fake_images: Batch of generated images to blend with ``real_images``.

    Returns:
        Scalar tensor holding the penalty.
    """
    n_samples = tf.shape(real_images)[0]
    # One weight per sample, broadcast across the three trailing axes.
    mix = tf.random.uniform([n_samples, 1, 1, 1], 0.0, 1.0)
    blended = mix * real_images + (1 - mix) * fake_images

    with tf.GradientTape() as gp_tape:
        # The blended batch is a plain tensor, so it must be watched explicitly.
        gp_tape.watch(blended)
        critic_out = discriminator(blended)
    grad_wrt_input = gp_tape.gradient(critic_out, blended)

    squared_sum = tf.reduce_sum(tf.square(grad_wrt_input), axis=[1, 2, 3])
    # The small constant keeps the square root well-behaved at zero.
    per_sample_norm = tf.sqrt(squared_sum + 1e-12)
    return tf.reduce_mean((per_sample_norm - 1.0) ** 2)

In [None]:
@tf.function
def train_discriminator(real_images):
    """Run one discriminator (critic) update on a batch of real images.

    The loss is ``mean(D(fake)) - mean(D(real)) + LAMBDA_GP * gradient_penalty``.
    Only the discriminator's variables are updated.

    Args:
        real_images: Batch of real images; its leading dimension may be
            smaller than ``BATCH_SIZE`` (e.g. the last batch of a pass).

    Returns:
        Tuple ``(d_loss, mean real score, mean fake score)`` of scalar tensors.
    """
    # Size the noise from the actual batch, not BATCH_SIZE: the gradient
    # penalty blends real and fake images element-wise, so both batches must
    # have the same number of samples even when the real batch is partial.
    batch_size = tf.shape(real_images)[0]
    noise = tf.random.normal([batch_size, LATENT_DIM])

    with tf.GradientTape() as tape:
        fake_images = generator(noise, training=True)
        real_output = discriminator(real_images, training=True)
        fake_output = discriminator(fake_images, training=True)
        real_score = tf.reduce_mean(real_output)
        fake_score = tf.reduce_mean(fake_output)

        gp = gradient_penalty(discriminator, real_images, fake_images)
        d_loss = fake_score - real_score + LAMBDA_GP * gp

    gradients = tape.gradient(d_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(gradients, discriminator.trainable_variables))
    return d_loss, real_score, fake_score

In [None]:
@tf.function
def train_generator():
    """Run one generator update and return its loss.

    A batch of ``BATCH_SIZE`` noise vectors is pushed through the generator
    and the discriminator; the loss is the negated mean discriminator score.
    Only the generator's variables are updated.

    Returns:
        Scalar tensor with the generator loss.
    """
    latent = tf.random.normal([BATCH_SIZE, LATENT_DIM])

    with tf.GradientTape() as gen_tape:
        critic_scores = discriminator(generator(latent, training=True), training=True)
        g_loss = -tf.reduce_mean(critic_scores)

    gen_vars = generator.trainable_variables
    gen_grads = gen_tape.gradient(g_loss, gen_vars)
    generator_optimizer.apply_gradients(zip(gen_grads, gen_vars))
    return g_loss

In [None]:
def save_generated_images(epoch, n=5):
    """Generate an ``n`` x ``n`` grid of samples and save it as a PNG.

    The file is written to ``generated/nebula_wgan_gp_epoch_{epoch}.png``;
    the ``generated`` directory is created if needed.

    Args:
        epoch: Value embedded in the output file name.
        n: Number of rows and columns in the grid (``n * n`` images are drawn).
    """
    noise = tf.random.normal([n * n, LATENT_DIM])
    generated_images = generator(noise, training=False)
    # Map the generator's tanh range [-1, 1] to [0, 1] for display.
    generated_images = (generated_images + 1.0) / 2.0

    dpi = 100
    img_size = IMG_SHAPE[0]  # 128
    # Figure size in inches chosen so each cell is img_size pixels at this dpi.
    figsize = (n * img_size / dpi, n * img_size / dpi)

    # squeeze=False keeps axs two-dimensional even for n == 1, where
    # plt.subplots would otherwise return a single Axes and indexing would fail.
    fig, axs = plt.subplots(n, n, figsize=figsize, dpi=dpi, squeeze=False)
    try:
        fig.subplots_adjust(wspace=0, hspace=0)  # no gaps

        # axs.flat iterates row by row, i.e. cell (i, j) shows image i * n + j.
        for idx, ax in enumerate(axs.flat):
            ax.imshow(generated_images[idx])
            ax.axis('off')

        os.makedirs("generated", exist_ok=True)
        fig.savefig(f"generated/nebula_wgan_gp_epoch_{epoch}.png", dpi=dpi, bbox_inches='tight', pad_inches=0)
    finally:
        # Close this specific figure even if saving fails, so repeated calls
        # during training do not accumulate open figures.
        plt.close(fig)

In [None]:
def train(dataset, epochs):
    """Run the WGAN-GP training loop.

    Each iteration performs ``N_CRITIC`` discriminator updates followed by one
    generator update, and records the values from the last discriminator
    update of that iteration. Progress is printed every 100 iterations and a
    sample grid is saved every ``SAVE_INTERVAL`` iterations.

    Args:
        dataset: Re-iterable collection of real-image batches.
        epochs: Number of iterations to run.

    Returns:
        Tuple ``(d_losses, real_scores, fake_scores, g_losses)`` of float
        lists, one entry per iteration.

    Raises:
        StopIteration: If ``dataset`` yields no batches at all.
    """
    d_losses = []
    g_losses = []
    real_scores = []
    fake_scores = []

    # Keep ONE iterator alive across iterations. Re-creating it every
    # iteration would restart the input pipeline each time, so only the
    # leading batches of the dataset would ever be drawn, and a dataset with
    # fewer than N_CRITIC batches would raise StopIteration.
    dataset_iter = iter(dataset)

    for epoch in range(epochs):
        for _ in range(N_CRITIC):
            try:
                real_batch = next(dataset_iter)
            except StopIteration:
                # A full pass over the data is finished: start the next one.
                dataset_iter = iter(dataset)
                real_batch = next(dataset_iter)
            d_loss, real_score, fake_score = train_discriminator(real_batch)

        g_loss = train_generator()

        d_losses.append(float(d_loss))
        g_losses.append(float(g_loss))
        real_scores.append(float(real_score))
        fake_scores.append(float(fake_score))

        if epoch % 100 == 0:
            print(f"Epoch {epoch}, D loss: {d_loss:.4f}, real_score: {real_score:.4f}, fake_score: {fake_score:.4f}, G loss: {g_loss:.4f}")

        if epoch % SAVE_INTERVAL == 0:
            save_generated_images(epoch)

    return d_losses, real_scores, fake_scores, g_losses

In [None]:
def plot_training_history(d_losses, g_losses, real_scores=None, fake_scores=None):
    """Plot the recorded training curves.

    The discriminator and generator losses are always drawn. When
    ``real_scores`` and/or ``fake_scores`` are given, they are drawn in a
    second panel next to the losses; otherwise the loss panel uses the whole
    figure instead of leaving half of it empty.

    Args:
        d_losses: Discriminator loss per iteration.
        g_losses: Generator loss per iteration.
        real_scores: Optional mean discriminator score on real images per iteration.
        fake_scores: Optional mean discriminator score on generated images per iteration.
    """
    has_scores = real_scores is not None or fake_scores is not None
    n_panels = 2 if has_scores else 1

    # squeeze=False keeps `axes` two-dimensional for both layouts.
    fig, axes = plt.subplots(1, n_panels, figsize=(14, 5), squeeze=False)

    # Plot losses
    loss_ax = axes[0, 0]
    epochs = range(len(d_losses))
    loss_ax.plot(epochs, d_losses, label='Discriminator Loss')
    loss_ax.plot(epochs, g_losses, label='Generator Loss')
    loss_ax.set_title('Loss Over Epochs')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Plot discriminator scores (only when provided)
    if has_scores:
        score_ax = axes[0, 1]
        if real_scores is not None:
            score_ax.plot(range(len(real_scores)), real_scores, label='Real Score')
        if fake_scores is not None:
            score_ax.plot(range(len(fake_scores)), fake_scores, label='Fake Score')
        score_ax.set_title('Discriminator Scores Over Epochs')
        score_ax.set_xlabel('Epoch')
        score_ax.set_ylabel('Score')
        score_ax.legend()

    fig.tight_layout()
    plt.show()

In [None]:
# Run training. Note the return order of train(): d_losses, real_scores, fake_scores, g_losses.
d_losses, real_scores, fake_scores, g_losses = train(dataset, EPOCHS)

In [None]:
# Plot the discriminator and generator losses recorded by train().
plot_training_history(d_losses, g_losses)

In [None]:
def generate_and_show_images(generator, latent_dim, n=5):
    """Sample ``n * n`` images from ``generator`` and show them in a grid.

    Args:
        generator: Model exposing ``predict``; called on an array of shape
            ``(n * n, latent_dim)`` drawn from a standard normal distribution.
        latent_dim: Length of each noise vector.
        n: Number of rows and columns in the grid.
    """
    noise = np.random.normal(0, 1, (n * n, latent_dim))

    generated_images = generator.predict(noise)

    # Rescale from [-1, 1] to [0, 1]; clip so imshow never receives values
    # outside its valid float range.
    generated_images = np.clip(0.5 * generated_images + 0.5, 0.0, 1.0)

    # squeeze=False keeps axs two-dimensional even for n == 1, where
    # plt.subplots would otherwise return a single Axes.
    fig, axs = plt.subplots(n, n, figsize=(n, n), squeeze=False)
    # axs.flat iterates row by row, matching the order of the generated batch.
    for ax, image in zip(axs.flat, generated_images):
        ax.imshow(image)
        ax.axis('off')
    plt.show()

In [None]:
# Use the shared constant so the noise size always matches the generator's input.
generate_and_show_images(generator, latent_dim=LATENT_DIM, n=5)

In [None]:
# Print the generator's layer summary, then save the model to generator.keras.
generator.summary()
generator.save("generator.keras")

In [None]:
# Print the discriminator's layer summary, then save the model to discriminator.keras.
discriminator.summary()
discriminator.save("discriminator.keras")