## Classifier for LDR evaluation

In [2]:
import os, json, scipy
os.environ["XLA_FLAGS"] = "--xla_gpu_cuda_data_dir=/usr/lib/cuda"
os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
import tensorflow as tf

## Data

In [4]:
# Define some constants used throughout the script
INPUT_SHAPE = (256, 256, 3)
FILTERS = [64,128,256,512,512]
BATCH_SIZE = 16
LATENT_DIM = 254
PATH = "../../data/vangogh2photo/"

# Define a generator to yield images and its path as a label
def gen(path):
    for file in os.listdir(path):
        image = tf.io.read_file(os.path.join(path, file))
        image = tf.io.decode_image(image, dtype=tf.float32)
        yield image

# Define a function to randomly modify images
def random_jitter(image):
  # Expand and crop
  image = tf.image.resize(image, (286,286))
  image = tf.image.random_crop(image, size=INPUT_SHAPE)
  # Random mirroring
  image = tf.image.random_flip_left_right(image)
  return image

# Load training data of class A (note the mapping function adds a label)
train_A = tf.data.Dataset.from_generator(
    gen,
    args=[os.path.join(PATH, "trainA")],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).cache().map(random_jitter).map(lambda x: (x, 0.)).shuffle(3, reshuffle_each_iteration=True).batch(BATCH_SIZE)
# Load training data of class B
train_B = tf.data.Dataset.from_generator(
    gen,
    args=[os.path.join(PATH, "trainB")],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).cache().map(random_jitter).map(lambda x: (x, 1.)).shuffle(3, reshuffle_each_iteration=True).batch(BATCH_SIZE)
# Zip the training data
train = tf.data.Dataset.sample_from_datasets((train_A, train_B), stop_on_empty_dataset=True)

# Load test data of class A
test_A = tf.data.Dataset.from_generator(
    gen,
    args=[os.path.join(PATH, "testA")],
    output_signature=(tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32)),
).map(lambda x: (x, 0.)).batch(BATCH_SIZE)
# Load test data fo class B
test_B = tf.data.Dataset.from_generator(
    gen,
    args=[os.path.join(PATH, "testB")],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).map(lambda x: (x, 1.)).batch(BATCH_SIZE)
# Concat the test data
test = tf.data.Dataset.sample_from_datasets((test_A, test_B), stop_on_empty_dataset=True)

## Build model

In [3]:
# Define a function that creates a Sequential model consisting of convolutions
def downsampler(nfilters, name=None, strides=(1,1), size=(12,12)):
    return tf.keras.Sequential([
        tf.keras.layers.Dropout(rate=.4),
        tf.keras.layers.Conv2D(filters=nfilters, kernel_size=size, padding="same", strides=strides),
        tf.keras.layers.GroupNormalization(groups=nfilters),
        tf.keras.layers.LeakyReLU()
    ], name=name)

# Define a function to return an inception module (defaults to downsampling unless `resize_shape` is given)
def inceptionv2(input_shape, nfilters, strides=(1,1), name=None):
    inputs = tf.keras.layers.Input(shape=input_shape, batch_size=BATCH_SIZE)
    # Separate into brach 1
    branch1 = downsampler(nfilters=nfilters//4, size=(1,1), strides=strides)(inputs)
    # Separate into brach 2
    branch2 = downsampler(nfilters=nfilters//4, size=(1,1), strides=(1,1))(inputs)
    branch2 = downsampler(nfilters=nfilters//4, size=(6,6), strides=strides)(branch2)
    # Separate into brach 3
    branch3 = downsampler(nfilters=nfilters//4, size=(1,1), strides=(1,1))(inputs)
    branch3 = downsampler(nfilters=nfilters//4, size=(12,12), strides=strides)(branch3)
    # Separate into brach 4
    branch4 = tf.keras.layers.MaxPooling2D(pool_size=(3,3), strides=(1,1), padding="same")(inputs)
    branch4 = downsampler(nfilters=nfilters//4, size=(1,1), strides=strides)(branch4)
    # Concatenate into desired dimensions
    outputs = tf.keras.layers.Concatenate()([branch1, branch2, branch3, branch4])
    return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)

# Define a function to build the encoders
def build_encoder(name="Encoder"):
    # Get input
    inputs = tf.keras.layers.Input(shape=INPUT_SHAPE, batch_size=BATCH_SIZE, name="Input")
    x = inputs
    # Create some downsampling modules
    for i, nfilters in enumerate(FILTERS):
        # Create identity with specified number of filters
        x_ = downsampler(nfilters, size=(1,1), name=f"Downsampler_{i*2}")(x)
        # Pass through inception modules
        x = downsampler(nfilters=nfilters, size=(3,3), name=f"Downsampler_{i*2+1}")(x)
        x = inceptionv2(input_shape=x.shape[1:], nfilters=nfilters, name=f"Inception_{i}")(x)
        # Add identity and skipp layers
        x = tf.keras.layers.Add(name=f"SumSkips_{i}")([x_, x])
        # Reduce size
        x = tf.keras.layers.AveragePooling2D(pool_size=(2,2), name=f"AveragePooling_{i}")(x)
    # Generate prediction
    x = tf.keras.layers.Flatten(name="Flatten")(x)
    outputs = tf.keras.layers.Dense(1)(x)
    return tf.keras.Model(inputs=inputs, outputs=outputs, name=name)
model = build_encoder("Classifier")

## Train

In [4]:
model.compile(optimizer="adam", loss=tf.keras.losses.BinaryCrossentropy(from_logits=True))
history = model.fit(train, epochs=200)
model.save("classifier", save_format="keras")
json.dump(history.history, open("classifier_history.json", mode="w"))

Epoch 1/200
30/30 - 1094s - loss: 9.6147 - 1094s/epoch - 36s/step
Epoch 2/200
30/30 - 939s - loss: 2.2241 - 939s/epoch - 31s/step
Epoch 3/200
30/30 - 934s - loss: 2.3550 - 934s/epoch - 31s/step
Epoch 4/200
30/30 - 922s - loss: 2.2610 - 922s/epoch - 31s/step
Epoch 5/200
30/30 - 960s - loss: 2.0555 - 960s/epoch - 32s/step
Epoch 6/200
30/30 - 957s - loss: 1.6518 - 957s/epoch - 32s/step
Epoch 7/200
30/30 - 977s - loss: 1.3310 - 977s/epoch - 33s/step
Epoch 8/200
30/30 - 956s - loss: 1.4709 - 956s/epoch - 32s/step
Epoch 9/200
30/30 - 1000s - loss: 1.3588 - 1000s/epoch - 33s/step
Epoch 10/200
30/30 - 981s - loss: 1.2907 - 981s/epoch - 33s/step
Epoch 11/200
30/30 - 977s - loss: 1.2239 - 977s/epoch - 33s/step
Epoch 12/200
30/30 - 981s - loss: 1.2714 - 981s/epoch - 33s/step
Epoch 13/200
30/30 - 974s - loss: 1.0492 - 974s/epoch - 32s/step
Epoch 14/200
30/30 - 976s - loss: 0.9569 - 976s/epoch - 33s/step
Epoch 15/200
30/30 - 982s - loss: 0.8663 - 982s/epoch - 33s/step
Epoch 16/200
30/30 - 974s - lo

KeyboardInterrupt: 

## Self-evaluation

In [5]:
# Load model
model = tf.keras.models.load_model("./classifier")

# Create predictions
y_pred = model.predict(test)
y_pred = tf.cast(tf.clip_by_value(y_pred.ravel(), 0, 1), tf.int32)
# Get labels
y_true = []
for x, y in test:
    del x
    y_true.extend(list(y.numpy()))

# Calculate confusion matrix (tn, fp, fn, tp)
cm = [0,0,0,0]
for yh, y in zip(y_pred, y_true):
    if yh==y:
        if y==0:
            cm[0]+=1
        if y==1:
            cm[3]+=1
    elif y==0:
        cm[1]+=1
    elif y==1:
        cm[2]+=1
print(cm)

[30, 70, 0, 112]


## LDR-evaluation

In [16]:
# Load data
reconstructed_A = tf.data.Dataset.from_generator(
    gen,
    args=["./samples/reconstructed_A"],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).batch(BATCH_SIZE)
reconstructed_B = tf.data.Dataset.from_generator(
    gen,
    args=["./samples/reconstructed_B"],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).batch(BATCH_SIZE)
translated_A = tf.data.Dataset.from_generator(
    gen,
    args=["./samples/translated_A"],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).batch(BATCH_SIZE)
translated_B = tf.data.Dataset.from_generator(
    gen,
    args=["./samples/translated_B"],
    output_signature=tf.TensorSpec(shape=INPUT_SHAPE, dtype=tf.float32),
).batch(BATCH_SIZE)

# Make predictions
reconstructed_A_pred = model.predict(reconstructed_A)
reconstructed_B_pred = model.predict(reconstructed_B)
translated_A_pred = model.predict(translated_A)
translated_B_pred = model.predict(translated_B)

print(scipy.stats.ttest_ind(reconstructed_A_pred, translated_A_pred, alternative="less"))
print(scipy.stats.ttest_ind(reconstructed_B_pred, translated_B_pred, alternative="greater"))

