In [7]:
import os, random
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import backend
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import (
    Input, Dense, Reshape, Flatten, Add,
    Conv2D, UpSampling2D, AveragePooling2D,
    LeakyReLU
)
from tensorflow.keras.constraints import max_norm
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.optimizers import Adam

In [17]:
DATA_DIR = "/Users/priyarajni/Desktop/Pythoncode/MSAI-630-A02/celebA"   
RESULTS_DIR = "./results"
os.makedirs(RESULTS_DIR, exist_ok=True)

SEED = 42
MAX_IMAGES = 10000          
LATENT_DIM = 128             
MAX_RESOLUTION = 64          
CHANNELS = 3

In [18]:
N_BLOCKS = int(np.log2(MAX_RESOLUTION)) - 1

In [19]:
EPOCHS_NORMAL = [5] * N_BLOCKS
EPOCHS_FADEIN = [5] * N_BLOCKS
EPOCHS_FADEIN[0] = 0 

In [20]:
BATCH_SIZES = [128, 128, 64, 32, 16][:N_BLOCKS]


In [21]:
def seed_everything(seed=SEED):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

seed_everything()

In [22]:
def list_images(data_dir):
    exts = (".jpg", ".jpeg", ".png", ".webp")
    files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.lower().endswith(exts)]
    files.sort()
    return files


In [23]:
def center_crop_square(img):
    # img: [H,W,3]
    h, w = img.shape[0], img.shape[1]
    s = min(h, w)
    y0 = (h - s) // 2
    x0 = (w - s) // 2
    return img[y0:y0+s, x0:x0+s, :]

In [24]:
def load_face_dataset_as_numpy(data_dir, max_images=MAX_IMAGES, target_res=MAX_RESOLUTION):
    files = list_images(data_dir)
    if len(files) == 0:
        raise ValueError(f"No images found in: {data_dir}")
    if len(files) > max_images:
        files = random.sample(files, max_images)

    X = []
    for fp in tqdm(files, desc="Loading images"):
        raw = tf.io.read_file(fp)
        img = tf.image.decode_image(raw, channels=3, expand_animations=False)
        img = tf.cast(img, tf.float32).numpy()

        img = center_crop_square(img)
        img = tf.image.resize(img, (target_res, target_res), method="bilinear").numpy()

        # normalize to [-1, 1] like the tutorial expects
        img = (img - 127.5) / 127.5
        X.append(img)

    X = np.asarray(X, dtype=np.float32)
    return X

In [25]:
dataset = load_face_dataset_as_numpy(DATA_DIR, MAX_IMAGES, MAX_RESOLUTION)
print("Dataset shape:", dataset.shape)


Loading images: 100%|██████████| 10000/10000 [00:12<00:00, 816.10it/s]


Dataset shape: (10000, 64, 64, 3)


In [26]:
class PixelNormalization(tf.keras.layers.Layer):
    def call(self, inputs):
        values = inputs**2.0
        mean_values = backend.mean(values, axis=-1, keepdims=True)
        mean_values += 1.0e-8
        l2 = backend.sqrt(mean_values)
        return inputs / l2

class MinibatchStdev(tf.keras.layers.Layer):
    def call(self, inputs):
        mean = backend.mean(inputs, axis=0, keepdims=True)
        squ_diffs = backend.square(inputs - mean)
        mean_sq_diff = backend.mean(squ_diffs, axis=0, keepdims=True)
        mean_sq_diff += 1e-8
        stdev = backend.sqrt(mean_sq_diff)
        mean_pix = backend.mean(stdev, keepdims=True)
        shape = backend.shape(inputs)
        output = backend.tile(mean_pix, (shape[0], shape[1], shape[2], 1))
        return backend.concatenate([inputs, output], axis=-1)
    
class WeightedSum(Add):
    def __init__(self, alpha=0.0, **kwargs):
        super().__init__(**kwargs)
        self.alpha = backend.variable(alpha, name="ws_alpha")
    def _merge_function(self, inputs):
        assert len(inputs) == 2
        return ((1.0 - self.alpha) * inputs[0]) + (self.alpha * inputs[1])

def wasserstein_loss(y_true, y_pred):
    return backend.mean(y_true * y_pred)

def update_fadein(models, step, n_steps):
    alpha = step / float(n_steps - 1) if n_steps > 1 else 1.0
    for model in models:
        for layer in model.layers:
            if isinstance(layer, WeightedSum):
                backend.set_value(layer.alpha, alpha)

def scale_dataset(images, new_shape_hw_c):
    # new_shape_hw_c: (H,W,C)
    new_h, new_w, _ = new_shape_hw_c
    scaled = tf.image.resize(images, (new_h, new_w), method="bilinear").numpy()
    return scaled

In [27]:
def add_generator_block(old_model):
    init = RandomNormal(stddev=0.02)
    const = max_norm(1.0)
    block_end = old_model.layers[-2].output

    up = UpSampling2D()(block_end)  # <-- upsampling choice (you will discuss this)
    g = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(up)
    g = PixelNormalization()(g)
    g = LeakyReLU(0.2)(g)
    g = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(g)
    g = PixelNormalization()(g)
    g = LeakyReLU(0.2)(g)

    out_image = Conv2D(3, (1,1), padding="same", kernel_initializer=init, kernel_constraint=const)(g)
    model1 = Model(old_model.input, out_image)

    out_old = old_model.layers[-1]     # previous toRGB layer
    out_image2 = out_old(up)           # old toRGB applied to upsampled features

    merged = WeightedSum()([out_image2, out_image])
    model2 = Model(old_model.input, merged)
    return [model1, model2]
 
   

In [28]:
def define_generator(latent_dim, n_blocks, in_dim=4):
    init = RandomNormal(stddev=0.02)
    const = max_norm(1.0)
    model_list = []

    in_latent = Input(shape=(latent_dim,))
    g = Dense(128 * in_dim * in_dim, kernel_initializer=init, kernel_constraint=const)(in_latent)
    g = Reshape((in_dim, in_dim, 128))(g)

    g = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(g)
    g = PixelNormalization()(g)
    g = LeakyReLU(0.2)(g)
    g = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(g)
    g = PixelNormalization()(g)
    g = LeakyReLU(0.2)(g)

    out_image = Conv2D(3, (1,1), padding="same", kernel_initializer=init, kernel_constraint=const)(g)
    model = Model(in_latent, out_image)
    model_list.append([model, model])

    for _ in range(1, n_blocks):
        models = add_generator_block(model_list[-1][0])
        model_list.append(models)

    return model_list


In [29]:
def add_discriminator_block(old_model, n_input_layers=3):
    init = RandomNormal(stddev=0.02)
    const = max_norm(1.0)
    in_shape = list(old_model.input.shape)

    input_shape = (int(in_shape[-2]*2), int(in_shape[-2]*2), int(in_shape[-1]))
    in_image = Input(shape=input_shape)

    d = Conv2D(128, (1,1), padding="same", kernel_initializer=init, kernel_constraint=const)(in_image)
    d = LeakyReLU(0.2)(d)

    d = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(0.2)(d)
    d = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(0.2)(d)
    d = AveragePooling2D()(d)

    block_new = d

    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)
    model1 = Model(in_image, d)
    model1.compile(loss=wasserstein_loss, optimizer=Adam(learning_rate=0.001, beta_1=0.0, beta_2=0.99, epsilon=1e-8))
    down = AveragePooling2D()(in_image)
    block_old = old_model.layers[1](down)
    block_old = old_model.layers[2](block_old)
    d = WeightedSum()([block_old, block_new])
    for i in range(n_input_layers, len(old_model.layers)):
        d = old_model.layers[i](d)

    model2 = Model(in_image, d)
    model2.compile(loss=wasserstein_loss, optimizer=Adam(learning_rate=0.001, beta_1=0.0, beta_2=0.99, epsilon=1e-8))

    return [model1, model2]

In [30]:
def define_discriminator(n_blocks, input_shape=(4,4,3)):
    init = RandomNormal(stddev=0.02)
    const = max_norm(1.0)
    model_list = []

    in_image = Input(shape=input_shape)
    d = Conv2D(128, (1,1), padding="same", kernel_initializer=init, kernel_constraint=const)(in_image)
    d = LeakyReLU(0.2)(d)
    d = MinibatchStdev()(d)
    d = Conv2D(128, (3,3), padding="same", kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(0.2)(d)
    d = Conv2D(128, (4,4), padding="same", kernel_initializer=init, kernel_constraint=const)(d)
    d = LeakyReLU(0.2)(d)
    d = Flatten()(d)
    out_class = Dense(1)(d)

    model = Model(in_image, out_class)
    model.compile(loss=wasserstein_loss, optimizer=Adam(learning_rate=0.001, beta_1=0.0, beta_2=0.99, epsilon=1e-8))
    model_list.append([model, model])

    for _ in range(1, n_blocks):
        models = add_discriminator_block(model_list[-1][0])
        model_list.append(models)

    return model_list

In [31]:
def define_composite(discriminators, generators):
    model_list = []
    for i in range(len(discriminators)):
        g_models, d_models = generators[i], discriminators[i]

        d_models[0].trainable = False
        model1 = Sequential([g_models[0], d_models[0]])
        model1.compile(loss=wasserstein_loss, optimizer=Adam(learning_rate=0.001, beta_1=0.0, beta_2=0.99, epsilon=1e-8))

        d_models[1].trainable = False
        model2 = Sequential([g_models[1], d_models[1]])
        model2.compile(loss=wasserstein_loss, optimizer=Adam(learning_rate=0.001, beta_1=0.0, beta_2=0.99, epsilon=1e-8))

        model_list.append([model1, model2])
    return model_list

In [32]:
def generate_real_samples(dataset, n_samples):
    ix = np.random.randint(0, dataset.shape[0], n_samples)
    X = dataset[ix]
    y = np.ones((n_samples, 1), dtype=np.float32)
    return X, y

def generate_latent_points(latent_dim, n_samples):
    x_input = np.random.randn(latent_dim * n_samples).astype(np.float32)
    return x_input.reshape(n_samples, latent_dim)

def generate_fake_samples(generator, latent_dim, n_samples):
    x_input = generate_latent_points(latent_dim, n_samples)
    X = generator.predict(x_input, verbose=0)
    y = -np.ones((n_samples, 1), dtype=np.float32)
    return X, y

def summarize_performance(status, g_model, latent_dim, n_samples=25):
    gen_shape = g_model.output_shape
    name = "%03dx%03d-%s" % (gen_shape[1], gen_shape[2], status)

    X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
    X = (X - X.min()) / (X.max() - X.min() + 1e-8)
    n = int(np.sqrt(n_samples))
    plt.figure(figsize=(8,8))
    for i in range(n*n):
        plt.subplot(n, n, i+1)
        plt.axis("off")
        plt.imshow(X[i])
    out_path = os.path.join(RESULTS_DIR, f"{name}.png")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()

    # save generator
    g_model.save(os.path.join(RESULTS_DIR, f"{name}.keras"))
    print("Saved:", out_path)

In [33]:
def train_epochs(g_model, d_model, gan_model, dataset, n_epochs, n_batch, fadein=False):
    bat_per_epo = int(dataset.shape[0] / n_batch)
    n_steps = bat_per_epo * n_epochs
    half_batch = n_batch // 2

    for step in range(n_steps):
        if fadein:
            update_fadein([g_model, d_model, gan_model], step, n_steps)

        X_real, y_real = generate_real_samples(dataset, half_batch)
        X_fake, y_fake = generate_fake_samples(g_model, LATENT_DIM, half_batch)

        d_loss1 = d_model.train_on_batch(X_real, y_real)
        d_loss2 = d_model.train_on_batch(X_fake, y_fake)

        z_input = generate_latent_points(LATENT_DIM, n_batch)
        y_gan = np.ones((n_batch, 1), dtype=np.float32)  # generator wants D to output "real" (positive)

        g_loss = gan_model.train_on_batch(z_input, y_gan)

        if (step+1) % max(1, (bat_per_epo//2)) == 0:
            print(f">{step+1}/{n_steps}, d1={d_loss1:.3f}, d2={d_loss2:.3f}, g={g_loss:.3f}")


In [34]:
def train(g_models, d_models, gan_models, dataset, latent_dim, e_norm, e_fadein, n_batch):
    # stage 0 (4x4)
    g_normal, d_normal, gan_normal = g_models[0][0], d_models[0][0], gan_models[0][0]
    gen_shape = g_normal.output_shape
    scaled_data = scale_dataset(dataset, gen_shape[1:])
    print("Scaled Data:", scaled_data.shape)

    train_epochs(g_normal, d_normal, gan_normal, scaled_data, e_norm[0], n_batch[0], fadein=False)
    summarize_performance("tuned", g_normal, latent_dim)

    # progressive stages
    for i in range(1, len(g_models)):
        g_normal, g_fadein = g_models[i]
        d_normal, d_fadein = d_models[i]
        gan_normal, gan_fadein = gan_models[i]

        gen_shape = g_normal.output_shape
        scaled_data = scale_dataset(dataset, gen_shape[1:])
        print("Scaled Data:", scaled_data.shape)

        # fade-in training
        if e_fadein[i] > 0:
            train_epochs(g_fadein, d_fadein, gan_fadein, scaled_data, e_fadein[i], n_batch[i], fadein=True)
            summarize_performance("faded", g_fadein, latent_dim)
        train_epochs(g_normal, d_normal, gan_normal, scaled_data, e_norm[i], n_batch[i], fadein=False)
        summarize_performance("tuned", g_normal, latent_dim)


In [36]:
g_models = define_generator(LATENT_DIM, N_BLOCKS)
d_models = define_discriminator(N_BLOCKS, input_shape=(4,4,3))
gan_models = define_composite(d_models, g_models)

train(g_models, d_models, gan_models,
      dataset=dataset,
      latent_dim=LATENT_DIM,
      e_norm=EPOCHS_NORMAL,
      e_fadein=EPOCHS_FADEIN,
      n_batch=BATCH_SIZES)

print("Done. Check ./results for output images and saved .keras models.")

TypeError: AveragePooling2D.__init__() missing 1 required positional argument: 'pool_size'