In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2DTranspose, Conv2D, Input, UpSampling2D, MaxPooling2D, concatenate
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.initializers import orthogonal
from PIL import Image as im
import numpy as np

Image Generator

In [None]:
img_dir = "../input/energy-map-dataset/dataset/input"
out_dir= "../input/energy-map-dataset/dataset/output"
img_gen = ImageDataGenerator(rescale=1.0/255)
batch_size = 50
epochs = 10
img_datagen = img_gen.flow_from_directory(
    img_dir,
    target_size=(256, 256),
    batch_size=batch_size,
    class_mode=None,
    shuffle=False
)
out_gen = ImageDataGenerator(rescale=1.0/255)
out_datagen = out_gen.flow_from_directory(
    out_dir,
    target_size=(256, 256),
    batch_size=batch_size,
    class_mode=None,
    shuffle=False
)

In [None]:
class JoinedGen(tf.keras.utils.Sequence):
    def __init__(self, input_gen, target_gen):
        self.gen1 = input_gen
        self.gen2 = target_gen

        assert len(input_gen) == len(target_gen)

    def __len__(self):
        return len(self.gen1)

    def __getitem__(self, i):
        x = self.gen1[i]
        y = self.gen2[i]

        return x, y

    def on_epoch_end(self):
        self.gen1.on_epoch_end()
        self.gen2.on_epoch_end()
data_gen = JoinedGen(img_datagen, out_datagen)

Define Autoencoder model

In [None]:
class Autoencoder(keras.Model):
  def __init__(self):
    super(Autoencoder, self).__init__()
    self.encoder1 = Sequential([
        Input(shape=(256, 256, 3)),
        Conv2D(32, 3, padding='same', activation='relu',strides=1),
        Conv2D(32, 3, padding='same', activation='relu',strides=1),
        MaxPooling2D(),
    ])
    self.encoder2 = Sequential([
        Conv2D(64, 3, padding='same', activation='relu',strides=2),
        Conv2D(64, 3, padding='same', activation='relu',strides=1),
        MaxPooling2D(),
    ])
    self.encoder3 = Sequential([
        Conv2D(128, 3, padding='same', activation='relu',strides=2),
        Conv2D(128, 3, padding='same', activation='relu',strides=1),
        MaxPooling2D(),
    ])
    self.encoder4 = Sequential([
        Conv2D(256, 3, padding='same', activation='relu',strides=2),
        Conv2D(256, 3, padding='same', activation='relu',strides=1),
        MaxPooling2D(),
    ])
    self.encoder5 = Sequential([
        Conv2D(512, 3, padding='same', activation='relu',strides=2),
        Conv2D(512, 3, padding='same', activation='relu',strides=1)
    ])
    self.decoder1 = Sequential([
        UpSampling2D()
    ])
    self.decoder2 = Sequential([
        Conv2DTranspose(256, kernel_size=3, strides=1, activation='relu', padding='same'), 
        Conv2DTranspose(256, kernel_size=3, strides=2, activation='relu', padding='same'),
        UpSampling2D(),
    ])
    self.decoder3 = Sequential([
        Conv2DTranspose(128, kernel_size=3, strides=1, activation='relu', padding='same'), 
        Conv2DTranspose(128, kernel_size=3, strides=2, activation='relu', padding='same'),
        UpSampling2D(),
    ])
    self.decoder4 = Sequential([
        Conv2DTranspose(64, kernel_size=3, strides=1, activation='relu', padding='same'), 
        Conv2DTranspose(64, kernel_size=3, strides=2, activation='relu', padding='same'),
        UpSampling2D(),
    ])
    self.decoder5 = Sequential([
        Conv2DTranspose(32, kernel_size=3, strides=1, activation='relu', padding='same'), 
        Conv2DTranspose(32, kernel_size=3, strides=2, activation='relu', padding='same'),
        Conv2DTranspose(1,kernel_size=1, strides=1, padding='same', activation="sigmoid"),
    ])

  def call(self, x):
    encoded1 = self.encoder1(x)
    encoded2 = self.encoder2(encoded1)
    encoded3 = self.encoder3(encoded2)
    encoded4 = self.encoder4(encoded3)
    encoded5 = self.encoder5(encoded4)
    decoded1 = self.decoder1(encoded5)
    decoded2 = self.decoder2(concatenate([encoded4, decoded1]))
    decoded3 = self.decoder3(concatenate([encoded3, decoded2]))
    decoded4 = self.decoder4(concatenate([encoded2, decoded3]))
    decoded5 = self.decoder5(concatenate([encoded1, decoded4]))

    return decoded5


Build and Compile Autoencoder model

In [None]:
autoencoder = Autoencoder()
optimizer = tf.optimizers.Adam(learning_rate = 0.0001)
autoencoder.compile(optimizer=optimizer, loss=keras.losses.binary_crossentropy)

Train model

In [None]:
autoencoder.fit(data_gen,
    epochs=epochs,
    shuffle=True,
    batch_size=batch_size,
)

Save model

In [None]:
autoencoder.save("model_joined_generator")

Test model

In [None]:
test_image = keras.utils.load_img("../input/bsds500-custom/dataset/images/test/100007.jpg", target_size=(256, 256))
test_image = keras.utils.img_to_array(test_image)
test_image = tf.expand_dims(test_image, 0)
predictions = autoencoder.predict(test_image)
predictions = np.reshape(predictions, (256, 256)) * 255
print(predictions)
data = im.fromarray(predictions.astype(np.uint8))
data.save('energy_map.png')