In [176]:
import kagglehub

# Download latest version
# NOTE(review): `path` is only printed in this cell; find_utkface_dir() locates the images through
# CFG.utkface_candidates instead — confirm the printed location matches one of those entries.
path = kagglehub.dataset_download("jangedoo/utkface-new")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/utkface-new


In [177]:
import os
import glob
import random
from dataclasses import dataclass
from typing import Tuple


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [178]:
@dataclass
class Config:
    """Hyperparameters and filesystem locations used throughout this notebook."""
    # Directories probed, in order, by find_utkface_dir(). This attribute carries no type
    # annotation, so @dataclass keeps it as a plain class attribute rather than an __init__ field.
    utkface_candidates = [
    "/kaggle/input/utkface-new/UTKFace",
    "/kaggle/input/utkface/UTKFace",
    "./UTKFace",
    ]
    # Side length (pixels) images are resized to and the models are built for.
    img_size: int = 64
    # Batch size passed to build_dataset_from_dir() by train_on_utkface().
    batch_size: int = 2
    # Width of the encoder output / generator latent input.
    latent_dim: int = 256
    # Width of the age-embedding vector fed to the generator.
    age_embed_dim: int = 64
    # Adam learning rates for the generator-side (E/A/G) and discriminator optimizers.
    g_lr: float = 2e-4
    d_lr: float = 2e-4
    # Adam beta parameters shared by both optimizers.
    beta1: float = 0.5
    beta2: float = 0.999
    # Loss weights applied in train_step(): adversarial, age-regression and reconstruction terms.
    adv_weight: float = 1.0
    age_weight: float = 10.0
    rec_weight: float = 10.0
    # Number of epochs, and number of batches taken from the dataset per epoch.
    epochs: int = 50
    steps_per_epoch: int = 500
    # When True, a later cell switches Keras to the "mixed_float16" policy.
    mixed_precision: bool = False
    # Seed used for Python's `random` module and for dataset shuffling.
    seed: int = 42
    # Directory where train_on_utkface() writes per-epoch .keras checkpoints.
    checkpoint_dir: str = "checkpoints_utk"

# Single shared configuration instance read by every cell below.
CFG = Config()
AUTOTUNE = tf.data.AUTOTUNE

# Seed Python's RNG *and* TensorFlow's global RNG: train_step() samples target ages with
# tf.random.uniform, which random.seed() alone does not make reproducible.
random.seed(CFG.seed)
tf.random.set_seed(CFG.seed)

In [179]:
def find_utkface_dir():
    """Return the first entry of ``CFG.utkface_candidates`` that looks like the UTKFace folder.

    A candidate qualifies when it is an existing directory that directly contains
    more than 50 ``*.jpg`` files.

    Raises:
        FileNotFoundError: if none of the candidate directories qualifies.
    """
    for candidate in CFG.utkface_candidates:
        if not os.path.isdir(candidate):
            continue
        image_count = len(glob.glob(os.path.join(candidate, "*.jpg")))
        if image_count <= 50:
            # Too few images to be the real dataset; keep looking.
            continue
        print(f"Found UTKFace at: {candidate} (num files: {image_count})")
        return candidate
    raise FileNotFoundError("UTKFace dataset not found. Place the dataset in one of the accepted paths: " + ", ".join(CFG.utkface_candidates))

In [180]:
def parse_age_from_path(path: str) -> float:
    """Extract the age encoded at the start of a UTKFace filename.

    UTKFace filename format: age_gender_race_date.jpg

    Args:
        path: path (or bare filename) of a UTKFace image.

    Returns:
        The leading age token as a float, limited to the range 0.0-116.0.
        When the token cannot be parsed as a number, 30.0 is used instead.
    """
    base_name = os.path.basename(path)
    try:
        parsed = float(base_name.partition("_")[0])
    except Exception:
        # Best-effort fallback for filenames that do not start with a number.
        parsed = 30.0
    upper_bounded = min(parsed, 116.0)
    return max(0.0, upper_bounded)

In [181]:
def preprocess_image_bytes(path: str, img_size: int = CFG.img_size) -> Tuple[tf.Tensor, tf.Tensor]:
    """Load one JPEG and pair it with the normalized age parsed from its filename.

    Args:
        path: Python string path to a UTKFace image (the age is parsed with
            parse_age_from_path, which works on Python strings).
        img_size: side length the image is resized to.

    Returns:
        ``(image, age_norm)`` where ``image`` is a float32 RGB tensor resized to
        ``img_size x img_size`` and ``age_norm`` is the parsed age divided by 116.0.
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.io.decode_jpeg(raw_bytes, channels=3)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(as_float, (img_size, img_size), antialias=True)
    # Age is mapped into [0, 1] by dividing by the 116-year cap.
    return resized, parse_age_from_path(path) / 116.0

In [182]:
def tf_preprocess(path):
    """Graph-compatible loader: map a file path tensor to ``(image, normalized_age)``.

    The image is decoded as 3-channel JPEG, converted to float32 and resized to
    ``CFG.img_size`` squared. The age is the first ``_``-separated token of the
    file name, clipped to 0-116 and divided by 116.
    """
    path = tf.cast(path, tf.string)
    raw_bytes = tf.io.read_file(path)
    pixels = tf.image.convert_image_dtype(tf.io.decode_jpeg(raw_bytes, channels=3), tf.float32)
    pixels = tf.image.resize(pixels, (CFG.img_size, CFG.img_size), antialias=True)

    # The file name is the last path component; its leading token is the age.
    file_name = tf.strings.split(path, os.sep)[-1]
    age_token = tf.strings.split(file_name, "_")[0]
    raw_age = tf.strings.to_number(age_token, out_type=tf.float32)
    return pixels, tf.clip_by_value(raw_age, 0.0, 116.0) / 116.0

In [183]:
def build_dataset_from_dir(dir_path: str, batch_size: int, training: bool = True) -> tf.data.Dataset:
    """Build a batched ``tf.data`` pipeline over the ``*.jpg`` files in ``dir_path``.

    Args:
        dir_path: directory containing the images.
        batch_size: number of examples per batch; incomplete final batches are dropped.
        training: when True, file order is shuffled and each image is randomly
            mirrored and passed through a 2048-element shuffle buffer.

    Returns:
        A prefetched dataset yielding ``(images, normalized_ages)`` batches.
    """
    def _random_mirror(image, age):
        return tf.image.random_flip_left_right(image), age

    jpg_glob = os.path.join(dir_path, "*.jpg")
    dataset = tf.data.Dataset.list_files(jpg_glob, shuffle=training, seed=CFG.seed)
    dataset = dataset.map(tf_preprocess, num_parallel_calls=AUTOTUNE)
    if training:
        dataset = dataset.map(_random_mirror, num_parallel_calls=AUTOTUNE)
        dataset = dataset.shuffle(2048, seed=CFG.seed, reshuffle_each_iteration=True)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(AUTOTUNE)

In [184]:
def conv_block(x, filters, k=3, s=1, use_bn=True):
    """Apply Conv2D ("same" padding) -> LeakyReLU(0.2) -> optional BatchNormalization.

    Args:
        x: input Keras tensor.
        filters: number of convolution filters.
        k: kernel size.
        s: stride.
        use_bn: append a BatchNormalization layer when True.
    """
    conv_out = layers.Conv2D(filters, k, strides=s, padding="same")(x)
    activated = layers.LeakyReLU(0.2)(conv_out)
    return layers.BatchNormalization()(activated) if use_bn else activated

In [185]:
def res_block(x, filters):
    """Two-convolution residual block with BatchNormalization and ReLU.

    When the input channel count differs from ``filters`` the skip path is
    projected with a 1x1 convolution so both branches can be added.
    """
    needs_projection = x.shape[-1] != filters
    # match channels on the skip path only when required
    skip = layers.Conv2D(filters, 1, padding='same')(x) if needs_projection else x

    branch = layers.Conv2D(filters, 3, padding='same')(x)
    branch = layers.ReLU()(layers.BatchNormalization()(branch))
    branch = layers.Conv2D(filters, 3, padding='same')(branch)
    branch = layers.BatchNormalization()(branch)

    merged = layers.Add()([skip, branch])
    return layers.ReLU()(merged)


In [186]:
def build_encoder(img_size=CFG.img_size, latent_dim=CFG.latent_dim):
    """Build the image encoder: three stride-2 conv blocks, global pooling, linear projection.

    Args:
        img_size: side length of the square RGB input.
        latent_dim: width of the output latent vector.

    Returns:
        A ``keras.Model`` named "Encoder" mapping an image to a latent vector.
    """
    image_in = layers.Input((img_size, img_size, 3))
    # First stage (8 filters) skips batch norm; the 16- and 32-filter stages use it.
    features = conv_block(image_in, 8, s=2, use_bn=False)
    for width in (16, 32):
        features = conv_block(features, width, s=2)
    pooled = layers.GlobalAveragePooling2D()(features)
    latent = layers.Dense(latent_dim)(pooled)
    return keras.Model(image_in, latent, name="Encoder")

In [187]:
def build_age_embed(age_embed_dim=CFG.age_embed_dim):
    """Build the age-embedding MLP: scalar age -> Dense(64, relu) -> Dense(age_embed_dim, relu).

    Returns:
        A ``keras.Model`` named "AgeEmbed" taking a ``(batch, 1)`` input.
    """
    age_in = layers.Input((1,))
    hidden = layers.Dense(64, activation="relu")(age_in)
    embedding = layers.Dense(age_embed_dim, activation="relu")(hidden)
    return keras.Model(age_in, embedding, name="AgeEmbed")

In [188]:
def build_generator(latent_dim=CFG.latent_dim, age_embed_dim=CFG.age_embed_dim, img_size=CFG.img_size):
    """Build the generator that decodes a latent vector plus an age embedding into an image.

    The concatenated inputs are projected to a 4x4x64 tensor and upsampled with
    stride-2 transposed convolutions: three fixed stages reach 32x32, then one
    additional stage is added per further doubling until ``img_size`` is reached.

    Args:
        latent_dim: width of the latent input vector.
        age_embed_dim: width of the age-embedding input vector.
        img_size: side length of the generated square image; must be 32 or a
            larger power of two (32, 64, 128, ...).

    Returns:
        A ``keras.Model`` named "Generator" with inputs ``[z, age_embedding]`` and an
        ``img_size x img_size x 3`` output produced by a tanh activation.

    Raises:
        ValueError: if ``img_size`` cannot be reached by doubling from 32.
    """
    # Count the extra doublings with exact integer arithmetic. A truncated float
    # log ratio can under-count, and unsupported sizes would otherwise silently
    # yield a generator whose output resolution differs from img_size.
    extra_upsamples = 0
    reached = 32
    while reached < img_size:
        reached *= 2
        extra_upsamples += 1
    if reached != img_size:
        raise ValueError(
            f"img_size must be 32 or a larger power of two (32, 64, 128, ...), got {img_size!r}"
        )

    z_in = layers.Input(shape=(latent_dim,))
    a_in = layers.Input(shape=(age_embed_dim,))

    # Concatenate latent vector and age embedding
    x = layers.Concatenate()([z_in, a_in])

    # Map to small 4x4x64 tensor
    x = layers.Dense(4*4*64, activation='relu')(x)
    x = layers.Reshape((4, 4, 64))(x)

    # Upsample progressively: 4 -> 8 -> 16 -> 32
    for width in (64, 32, 16):
        x = layers.Conv2DTranspose(width, 4, strides=2, padding='same')(x)
        x = layers.LeakyReLU(0.2)(x)

    # Upsample further if target img_size > 32
    for _ in range(extra_upsamples):
        x = layers.Conv2DTranspose(16, 4, strides=2, padding='same')(x)
        x = layers.LeakyReLU(0.2)(x)

    # Final RGB output
    # NOTE(review): tanh yields values in [-1, 1] while the input pipeline produces float
    # images via convert_image_dtype — confirm the intended output range.
    out = layers.Conv2D(3, 3, padding='same', activation='tanh')(x)

    return keras.Model([z_in, a_in], out, name="Generator")


In [189]:
def build_discriminator(img_size=CFG.img_size):
    """Build the two-headed discriminator.

    Five conv blocks (all at the default stride of 1) are followed by global
    average pooling and two dense heads.

    Returns:
        A ``keras.Model`` named "Discriminator" whose outputs are
        ``[real_fake_logit, age_prediction]``; the age head uses a sigmoid.
    """
    image_in = layers.Input((img_size, img_size, 3))
    features = conv_block(image_in, 64, use_bn=False)
    for width in (128, 256, 512, 512):
        features = conv_block(features, width)
    pooled = layers.GlobalAveragePooling2D()(features)
    real_fake = layers.Dense(1)(pooled)  # real/fake
    age_pred = layers.Dense(1, activation="sigmoid")(pooled)  # age prediction normalized
    return keras.Model(image_in, [real_fake, age_pred], name="Discriminator")

In [190]:
# Instantiate the four sub-networks as module-level globals; train_step(), train_on_utkface()
# and infer_image() all refer to E, A, G and D by these names.
# E, G and D are constructed inside an explicit CPU device scope, while A is built outside it.
# NOTE(review): the reason for the CPU scope, and for leaving A out of it, is not visible in
# this notebook — confirm the intent.
with tf.device('/CPU:0'):
    E = build_encoder(img_size=CFG.img_size)
A = build_age_embed(age_embed_dim=CFG.age_embed_dim)
with tf.device('/CPU:0'):
    G = build_generator(img_size=CFG.img_size)
    D = build_discriminator(img_size=CFG.img_size)

In [191]:
# Optionally switch the global Keras dtype policy to mixed precision (off by default in Config).
# NOTE(review): this runs after E/A/G/D were already constructed in the previous cell;
# presumably layers keep the policy that was active when they were created — confirm, and move
# this ahead of model construction if the policy is meant to apply to those models.
if CFG.mixed_precision:
    tf.keras.mixed_precision.set_global_policy("mixed_float16")

In [192]:
# Loss objects shared by train_step().
BCE = tf.keras.losses.BinaryCrossentropy(from_logits=True)  # adversarial loss on raw real/fake logits
MSE = tf.keras.losses.MeanSquaredError()                    # age regression loss
L1 = tf.keras.losses.MeanAbsoluteError()                    # reconstruction loss

# One Adam optimizer for the encoder/age-embed/generator group and one for the discriminator.
g_opt = keras.optimizers.Adam(learning_rate=CFG.g_lr, beta_1=CFG.beta1, beta_2=CFG.beta2)
d_opt = keras.optimizers.Adam(learning_rate=CFG.d_lr, beta_1=CFG.beta1, beta_2=CFG.beta2)

In [193]:
#@tf.function  # uncomment when stable
def train_step(real_img, real_age):
    """Run one simultaneous discriminator and encoder/age-embed/generator update.

    A random target age in [0, 1] is drawn per example. The real image is encoded,
    decoded under the target age, and scored by the discriminator. The fake image
    is then re-encoded and decoded under the source age to form a reconstruction
    of the input.

    Args:
        real_img: batch of real images, fed to both E and D.
        real_age: batch of normalized ages, one scalar per image; a trailing axis
            is added before it is passed to A.

    Returns:
        Dict with the loss terms of this step: ``d_loss``, ``g_loss``, ``d_adv``,
        ``g_adv``, ``d_age``, ``g_age``, ``rec`` and ``perc``.
    """
    import warnings

    # `perceptual_loss` is an optional hook: it is not defined in the visible notebook, so
    # resolve it explicitly instead of letting a NameError be swallowed on every step.
    try:
        perc_fn = perceptual_loss
    except NameError:
        perc_fn = None

    disc_vars = D.trainable_variables
    gen_vars = E.trainable_variables + A.trainable_variables + G.trainable_variables

    # sample target ages uniformly in [0,1]
    target_age = tf.random.uniform(tf.shape(real_age), 0.0, 1.0)

    # persistent=True because two separate gradient computations read from the same tape
    with tf.GradientTape(persistent=True) as tape:
        # latent encoding
        z = E(real_img, training=True)
        a_src = A(tf.expand_dims(real_age, -1), training=True)
        a_tgt = A(tf.expand_dims(target_age, -1), training=True)

        # generate fake image
        fake_img = G([z, a_tgt], training=True)

        # discriminator outputs
        rf_real, age_real_pred = D(real_img, training=True)
        rf_fake, age_fake_pred = D(fake_img, training=True)

        # discriminator losses
        d_adv_real = BCE(tf.ones_like(rf_real), rf_real)
        d_adv_fake = BCE(tf.zeros_like(rf_fake), rf_fake)
        d_adv = d_adv_real + d_adv_fake
        d_age = MSE(real_age, tf.squeeze(age_real_pred, -1))
        d_loss = CFG.adv_weight * d_adv + CFG.age_weight * d_age

        # generator losses
        g_adv = BCE(tf.ones_like(rf_fake), rf_fake)
        g_age = MSE(target_age, tf.squeeze(age_fake_pred, -1))

        # reconstruction: re-encode the fake and decode it with the source age
        z_fake = E(fake_img, training=True)
        x_rec = G([z_fake, a_src], training=True)
        rec = L1(real_img, x_rec)

        # perceptual loss: resize to 256x256 for VGG if needed
        if perc_fn is None:
            perc = 0.0
        else:
            try:
                real_resized = tf.image.resize(real_img, [256, 256])
                fake_resized = tf.image.resize(fake_img, [256, 256])
                perc = perc_fn(real_resized, fake_resized)
            except Exception as err:
                # Best-effort fallback for memory or shape issues; unlike a bare `except:`
                # this lets KeyboardInterrupt/SystemExit propagate and reports the failure.
                warnings.warn(f"perceptual_loss failed ({err!r}); using 0.0 instead", RuntimeWarning)
                perc = 0.0

        # total generator loss
        g_loss = CFG.adv_weight * g_adv + CFG.age_weight * g_age + CFG.rec_weight * rec + 0.1 * perc

    # compute gradients
    d_grads = tape.gradient(d_loss, disc_vars)
    ge_grads = tape.gradient(g_loss, gen_vars)
    # release the resources held by the persistent tape
    del tape

    # apply gradients
    d_opt.apply_gradients(zip(d_grads, disc_vars))
    g_opt.apply_gradients(zip(ge_grads, gen_vars))

    return {
        'd_loss': d_loss,
        'g_loss': g_loss,
        'd_adv': d_adv,
        'g_adv': g_adv,
        'd_age': d_age,
        'g_age': g_age,
        'rec': rec,
        'perc': perc
    }


In [194]:
def train_on_utkface(dataset_dir: str):
    """Train the models on the UTKFace images in ``dataset_dir``.

    Runs ``CFG.epochs`` epochs of ``CFG.steps_per_epoch`` batches each, logs the
    running mean losses every 50 global steps, and saves E, A, G and D to
    ``CFG.checkpoint_dir`` after every epoch.

    Args:
        dataset_dir: directory containing the ``*.jpg`` training images.
    """
    # Create checkpoint directory if not exists
    os.makedirs(CFG.checkpoint_dir, exist_ok=True)

    train_ds = build_dataset_from_dir(dataset_dir, CFG.batch_size, training=True)

    # Running means of the two total losses, reset at the start of each epoch
    d_metric = keras.metrics.Mean()
    g_metric = keras.metrics.Mean()

    step = 0
    for epoch in range(1, CFG.epochs + 1):
        print(f"Starting epoch {epoch}/{CFG.epochs}")
        d_metric.reset_state()
        g_metric.reset_state()

        for imgs, ages in train_ds.take(CFG.steps_per_epoch):
            logs = train_step(imgs, ages)

            d_metric.update_state(logs['d_loss'])
            g_metric.update_state(logs['g_loss'])
            step += 1

            if step % 50 != 0:
                continue
            tf.print(
                "Epoch", epoch, "Step", step,
                "| D_loss:", d_metric.result(),
                "G_loss:", g_metric.result(),
                "Adv(D/G):", logs['d_adv'], "/", logs['g_adv'],
                "Age(D/G):", logs['d_age'], "/", logs['g_age'],
                "Rec:", logs['rec']
            )

        # Save checkpoints at the end of each epoch
        for tag, model in (("E", E), ("A", A), ("G", G), ("D", D)):
            model.save(os.path.join(CFG.checkpoint_dir, f"{tag}_epoch{epoch}.keras"))

    print("Training finished.")


In [195]:
def load_models(checkpoint_dir: str = CFG.checkpoint_dir):
    """Load the most recent encoder checkpoint together with matching A and G models.

    Checkpoints are expected to follow the naming used by train_on_utkface():
    ``E_epoch<N>.keras``, ``A_epoch<N>.keras`` and ``G_epoch<N>.keras``. The encoder
    with the highest epoch number is loaded. The age-embed and generator
    checkpoints of that same epoch are loaded when present; otherwise the
    in-memory globals ``A`` / ``G`` are returned for them.

    Args:
        checkpoint_dir: directory containing the ``.keras`` checkpoint files.

    Returns:
        Tuple ``(encoder, age_embed, generator)``.

    Raises:
        FileNotFoundError: if no ``E_epoch*.keras`` file exists in ``checkpoint_dir``.
    """
    def _epoch_of(ckpt_path: str) -> int:
        # Parse N out of "<tag>_epoch<N>.keras"; unparsable names sort first.
        stem = os.path.splitext(os.path.basename(ckpt_path))[0]
        digits = stem.rpartition("epoch")[2]
        return int(digits) if digits.isdecimal() else -1

    def _load_or_fallback(tag: str, epoch: int, fallback):
        candidate = os.path.join(checkpoint_dir, f"{tag}_epoch{epoch}.keras")
        if os.path.isfile(candidate):
            return keras.models.load_model(candidate, compile=False)
        return fallback

    # Select by name and numeric epoch; taking the first directory entry could pick any
    # file (for example a discriminator checkpoint) and load it as the encoder.
    encoder_paths = glob.glob(os.path.join(checkpoint_dir, "E_epoch*.keras"))
    if not encoder_paths:
        raise FileNotFoundError(f"No encoder checkpoint (E_epoch*.keras) found in {checkpoint_dir}")
    latest_encoder = max(encoder_paths, key=_epoch_of)
    latest_epoch = _epoch_of(latest_encoder)

    e = keras.models.load_model(latest_encoder, compile=False)
    a = _load_or_fallback("A", latest_epoch, A)
    g = _load_or_fallback("G", latest_epoch, G)
    return e, a, g

In [196]:
def infer_image(image_path: str, target_age: float, e_model=None, a_model=None, g_model=None, out_path: str = None):
    """Generate a version of the face in ``image_path`` conditioned on ``target_age``.

    Args:
        image_path: path to a JPEG image.
        target_age: desired age in years; limited to 0-116 and divided by 116.
        e_model, a_model, g_model: optional replacements for the global E, A, G models.
        out_path: when given (and non-empty), the result is also written there as JPEG.

    Returns:
        The generated image with the batch axis removed and values clipped to [0, 1].
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.convert_image_dtype(tf.io.decode_jpeg(raw_bytes, channels=3), tf.float32)
    pixels = tf.image.resize(pixels, (CFG.img_size, CFG.img_size), antialias=True)
    age_norm = min(max(target_age, 0.0), 116.0) / 116.0

    # Fall back to the module-level models for any that were not supplied.
    encoder = e_model if e_model is not None else E
    age_embedder = a_model if a_model is not None else A
    generator = g_model if g_model is not None else G

    latent = encoder(tf.expand_dims(pixels, 0), training=False)
    age_embedding = age_embedder(tf.expand_dims([age_norm], 0), training=False)
    generated = generator([latent, age_embedding], training=False)

    result = tf.clip_by_value(tf.squeeze(generated, 0), 0.0, 1.0)
    if out_path:
        as_uint8 = tf.image.convert_image_dtype(result, tf.uint8)
        tf.io.write_file(out_path, tf.image.encode_jpeg(as_uint8))
    return result

In [197]:
if __name__ == "__main__":
    # Locate the dataset first; stop with exit status 1 when it is missing.
    try:
        ds_dir = find_utkface_dir()
    except FileNotFoundError as err:
        print(str(err))
        print("Exiting. Please place UTKFace images in one of the accepted locations and re-run.")
        raise SystemExit(1)
    else:
        train_on_utkface(ds_dir)

Found UTKFace at: /kaggle/input/utkface-new/UTKFace (num files: 23708)
Starting epoch 1/50
Epoch 1 Step 50 | D_loss: 2.5166285 G_loss: 6.50350571 Adv(D/G): 1.39040411 / 0.699852 Age(D/G): 0.0135594923 / 0.222628325 Rec: 0.498584211
Epoch 1 Step 100 | D_loss: 2.24503732 G_loss: 6.53363323 Adv(D/G): 1.38642406 / 0.70430249 Age(D/G): 0.000229133962 / 0.0324250273 Rec: 0.438810766
Epoch 1 Step 150 | D_loss: 2.08430552 G_loss: 6.49170828 Adv(D/G): 1.38667893 / 0.702470303 Age(D/G): 0.0476740636 / 0.225677699 Rec: 0.502970219
Epoch 1 Step 200 | D_loss: 1.98804522 G_loss: 6.52308178 Adv(D/G): 1.38667274 / 0.700760126 Age(D/G): 0.00435275026 / 0.0428729691 Rec: 0.415974677
Epoch 1 Step 250 | D_loss: 1.9474771 G_loss: 6.49535322 Adv(D/G): 1.38632786 / 0.696652889 Age(D/G): 0.0422815904 / 0.126382962 Rec: 0.428296804
Epoch 1 Step 300 | D_loss: 1.90251625 G_loss: 6.53707266 Adv(D/G): 1.38635707 / 0.697772384 Age(D/G): 0.00251175743 / 0.102091096 Rec: 0.381053925
Epoch 1 Step 350 | D_loss: 1.86967

In [198]:
import tensorflow as tf
import os
from pathlib import Path

In [213]:
IMG_SIZE = 64  # must match training size
CHECKPOINT_DIR = 'checkpoints_utk'


def _checkpoint_epoch(ckpt_path):
    """Return the epoch number N parsed from '<tag>_epoch<N>.keras', or -1 if it cannot be parsed."""
    digits = ckpt_path.stem.rpartition('epoch')[2]
    return int(digits) if digits.isdecimal() else -1


def _sorted_checkpoints(tag):
    """List the '<tag>_epoch*.keras' files in CHECKPOINT_DIR, lowest epoch first."""
    return sorted(Path(CHECKPOINT_DIR).glob(f'{tag}_epoch*.keras'), key=_checkpoint_epoch)


# Sort by the numeric epoch: a plain string sort ranks 'E_epoch9' after 'E_epoch50',
# so taking the last element would not load the most recent checkpoint.
E_files = _sorted_checkpoints('E')
G_files = _sorted_checkpoints('G')
A_files = _sorted_checkpoints('A')
if not E_files or not G_files:
    raise FileNotFoundError('Trained model checkpoints not found in checkpoints_utk')

E = tf.keras.models.load_model(str(E_files[-1]), compile=False)
G = tf.keras.models.load_model(str(G_files[-1]), compile=False)

from tensorflow.keras import layers, Model

def build_age_embed(age_embed_dim=64):
    """Build the age-embedding MLP: scalar age -> Dense(64, relu) -> Dense(age_embed_dim, relu)."""
    age_in = layers.Input((1,))
    x = layers.Dense(64, activation="relu")(age_in)
    x = layers.Dense(age_embed_dim, activation="relu")(x)
    return Model(age_in, x, name="AgeEmbed")

# Training saves the age-embedding network next to E and G, and the generator was trained
# against it, so load the trained weights rather than creating a randomly initialised model.
if A_files:
    A = tf.keras.models.load_model(str(A_files[-1]), compile=False)
else:
    print('WARNING: no A_epoch*.keras checkpoint found in ' + CHECKPOINT_DIR
          + '; using an untrained age embedding.')
    A = build_age_embed()

def preprocess_image(image_path):
    """Read a JPEG file and return it as a float32 RGB tensor resized to IMG_SIZE x IMG_SIZE."""
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.io.decode_jpeg(raw_bytes, channels=3)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    return tf.image.resize(as_float, (IMG_SIZE, IMG_SIZE), antialias=True)

def age_image(input_path: str, target_age: float, output_path: str = None,
              upscale_to: int = 256):
    """Generate an age-conditioned version of a face image and upscale the result.

    Args:
        input_path: path to the input JPEG.
        target_age: desired age in years; limited to 0-116 and divided by 116.
        output_path: when given (and non-empty), the upscaled image is also saved there as JPEG.
        upscale_to: side length of the returned square image.

    Returns:
        The generated image, bicubically resized to ``upscale_to x upscale_to`` with
        values clipped to [0, 1].
    """
    # 1) Load & resize input to model size
    img = preprocess_image(input_path)

    # 2) Normalize the target age
    age_norm = min(max(target_age, 0.0), 116.0) / 116.0

    # 3) Encode & generate
    z = E(tf.expand_dims(img, 0), training=False)
    a_emb = A(tf.expand_dims([age_norm], 0), training=False)
    out = G([z, a_emb], training=False)

    # 4) Remove batch dimension and clamp values
    out = tf.squeeze(out, 0)
    out = tf.clip_by_value(out, 0.0, 1.0)

    # 5) Upscale the generated image
    out_upscaled = tf.image.resize(
        out,
        (upscale_to, upscale_to),
        method='bicubic'
    )
    # Bicubic interpolation can overshoot the input range, so clamp again; values outside
    # [0, 1] could otherwise overflow in the uint8 conversion below.
    out_upscaled = tf.clip_by_value(out_upscaled, 0.0, 1.0)

    # 6) Save if path is given
    if output_path:
        out_uint8 = tf.image.convert_image_dtype(out_upscaled, tf.uint8, saturate=True)
        tf.io.write_file(output_path, tf.image.encode_jpeg(out_uint8))

    return out_upscaled


if __name__ == '__main__':
    # Demo run: re-age one UTKFace sample to 30 years and save the upscaled result.
    input_image_path = '/kaggle/input/utkface-new/UTKFace/11_0_0_20170110224340941.jpg.chip.jpg'
    target_age = 30.0
    output_image_path = '/kaggle/working/aged_face1.jpg'

    aged_face = age_image(
        input_path=input_image_path,
        target_age=target_age,
        output_path=output_image_path,
    )
    print(f'Aged image saved to {output_image_path}')


Aged image saved to /kaggle/working/aged_face1.jpg
