In [1]:
import matplotlib.pyplot as plt
import numpy as np
import time

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf

import io
from datetime import datetime

from sklearn.model_selection import train_test_split
from scipy.stats.qmc import Sobol, Halton

In [2]:
# Config constants
SEED = 42
DATA_SIZE = 2048
BATCH_SIZE = 64
EPOCHS = 1500
LEARNING_RATE = 0.0002

EXPERIMENT_NAME = f"{datetime.now().strftime('%Y%m%d-%H%M%S')}"
EXPERIMENT_DIR = f"./logs/{EXPERIMENT_NAME}"
SAMPLE_SAVE_INTERVAL = 25

In [3]:
tf.random.set_seed(SEED)
np.random.seed(SEED)
rand_gen = Sobol(1, seed=SEED)

train_summary_writer = tf.summary.create_file_writer(EXPERIMENT_DIR)

In [4]:
def plot_to_image(fig):
    buf = io.BytesIO()
    fig.savefig(buf, format='png')
    buf.seek(0)
    return tf.image.decode_png(buf.getvalue(), channels=4)

In [5]:
def prepare_data():
    X = np.linspace(start=0, stop=1, num=int(DATA_SIZE / 0.8), dtype='float32')
    y = np.sin(10 * X) + np.sin(15 * X)
    #y = np.exp(-((X - 0.25) / 0.05) ** 2) + 0.5 * np.exp(-((X - 0.75) / 0.1) ** 2)
    #y = np.sin(10 * X)
    #y = np.exp( - 1 / 5 * X) * np.sin(X)

    X = np.array([np.array([val]) for val in X])
    y = np.array([np.array([val]) for val in y])

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=SEED
    )
    data = tf.concat([X_train, y_train], axis=1)
    dataset = tf.data.Dataset.from_tensor_slices(data).batch(BATCH_SIZE)

    return dataset, X_test, y_test, X, y

In [6]:
def build_generator():
    model = tf.keras.Sequential()

    model.add(tf.keras.layers.Dense(32, input_shape=(1,)))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(32))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(32))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(1))

    return model

def build_discriminator():
    model = tf.keras.Sequential()

    model.add(tf.keras.layers.Dense(32, input_shape=(2,)))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(32))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(32))
    model.add(tf.keras.layers.LeakyReLU())

    model.add(tf.keras.layers.Dense(1))

    return model

In [7]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)

    return real_loss + fake_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [8]:
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE, beta_1=0.7)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE, beta_1=0.7)

In [9]:
@tf.function
def train_step(generator, discriminator, batch):
    generated_x = tf.random.uniform(shape=(BATCH_SIZE, 1))
    # generated_x = rand_gen.random(n=BATCH_SIZE)

    with tf.GradientTape() as disc_tape:
        generated_y = generator(generated_x, training=False)

        real_data = batch
        fake_data = tf.concat([generated_x, generated_y], axis=1)

        combined_data = tf.concat([real_data, fake_data], axis=0)
        combined_labels = tf.concat(
            [tf.ones((BATCH_SIZE, 1)), tf.zeros((BATCH_SIZE, 1))], axis=0
        )

        predictions = discriminator(combined_data, training=True)
        disc_loss = cross_entropy(combined_labels, predictions)

    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    with tf.GradientTape() as gen_tape:
        generated_y = generator(generated_x, training=True)

        fake_output = discriminator(tf.concat([generated_x, generated_y], axis=1), training=False)

        gen_loss = generator_loss(fake_output)

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))

    return gen_loss, disc_loss

In [10]:
def save_model_if_best(generator, avg_mae):
    """Save the generator model if the MAE improves."""
    generator.save(os.path.join(EXPERIMENT_DIR, "best_generator_model.keras"))
    print(f"New best MAE {avg_mae:.4f}. Model saved!")
    return avg_mae

In [11]:
def visualize_samples(generator, X_test, y_test, epoch):
    """Visualize and log samples to TensorBoard."""
    indices = np.random.choice(np.arange(len(X_test)), size=100, replace=False)
    x_real, y_real = X_test[indices], y_test[indices]
    y_gen = generator.predict(x_real)

    fig = plt.figure(figsize=(8, 6))

    plt.scatter(x_real, y_real, label="True Output", color="blue")
    plt.scatter(x_real, y_gen, label="Generator Output", color="red")

    plt.xlabel("x")
    plt.ylabel("y")

    plt.legend()
    plt.title(f"Epoch {epoch + 1}")

    plt.axis('equal')

    with train_summary_writer.as_default():
        tf.summary.image("Sample Output", [plot_to_image(fig)], step=epoch + 1)

    plt.close()

In [12]:
def evaluate_best_generator(generator_path, X, y, step, writer):
    """Evaluate the best generator on the entire dataset and log the results."""
    best_generator = tf.keras.models.load_model(generator_path)

    y_gen = best_generator(X, training=False)

    mae = tf.keras.metrics.MeanAbsoluteError()
    mae.update_state(y, y_gen)
    res_mae = mae.result()

    mse = tf.keras.metrics.MeanSquaredError()
    mse.update_state(y, y_gen)
    res_mse = mse.result()

    fig = plt.figure(figsize=(8, 6))

    plt.scatter(X, y, label="True Output", color="blue", alpha=0.5, s=10)
    plt.scatter(X, y_gen, label="Generator Output", color="red", alpha=0.5, s=10)

    plt.xlabel("x", fontsize=12)
    plt.ylabel("y", fontsize=12)

    plt.legend(fontsize=10)
    plt.title(f"Best Generator Approximation\nMAE = {res_mae:.5f}, MSE = {res_mse:.5f}")

    plt.axis('equal')

    with writer.as_default():
        tf.summary.image("Full Dataset Approximation", [plot_to_image(fig)], step=step)

    plt.close(fig)

    return mae

In [13]:
def train():
    dataset, X_test, y_test, X, y = prepare_data()
    generator = build_generator()
    discriminator = build_discriminator()

    best_mae = float('inf')

    for epoch in range(EPOCHS):
        start = time.time()

        epoch_gen_loss = tf.keras.metrics.Mean()
        epoch_disc_loss = tf.keras.metrics.Mean()
        epoch_mae = tf.keras.metrics.MeanAbsoluteError()
        epoch_mse = tf.keras.metrics.MeanSquaredError()

        for batch in dataset:
            gen_loss, disc_loss = train_step(generator, discriminator, batch)
            epoch_gen_loss.update_state(gen_loss)
            epoch_disc_loss.update_state(disc_loss)

        y_gen = generator(X_test, training=False)

        epoch_mae.update_state(y_test, y_gen)
        avg_mae = epoch_mae.result()

        epoch_mse.update_state(y_test, y_gen)
        avg_mse = epoch_mse.result()

        if train_summary_writer:
            with train_summary_writer.as_default():
                tf.summary.scalar('Generator Loss', epoch_gen_loss.result(), step=epoch + 1)
                tf.summary.scalar('Discriminator Loss', epoch_disc_loss.result(), step=epoch + 1)
                tf.summary.scalar('Generator MAE', avg_mae, step=epoch + 1)
                tf.summary.scalar('Generator MSE', avg_mse, step=epoch + 1)

        if avg_mae < best_mae:
            save_model_if_best(generator, avg_mae)
            best_mae = avg_mae

            gen_path = os.path.join(EXPERIMENT_DIR, "best_generator_model.keras")
            evaluate_best_generator(gen_path, X, y, step=epoch, writer=train_summary_writer)

        print(f"Epoch {epoch+1}, Gen Loss: {epoch_gen_loss.result():.4f}, "
              f"Disc Loss: {epoch_disc_loss.result():.4f}, MAE: {avg_mae:.5f}, "
              f"Time: {time.time() - start:.2f} sec")

        if (epoch + 1) % SAMPLE_SAVE_INTERVAL == 0:
            visualize_samples(generator, X_test, y_test, epoch)

    return X, y

In [None]:
X, y = train()

gen_path = os.path.join(EXPERIMENT_DIR, "best_generator_model.keras")
evaluate_best_generator(gen_path, X, y, step=EPOCHS, writer=train_summary_writer)