In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import keras
import os
import pickle
import re


from PIL import Image

from typing import Union, List, Callable
from keras.initializers import RandomNormal
from keras.optimizers import Adam
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, Conv2DTranspose, LeakyReLU, Activation
from keras.layers import BatchNormalization, Concatenate, Dropout

2023-12-21 02:33:08.959712: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-12-21 02:33:09.177815: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-12-21 02:33:09.177855: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-12-21 02:33:09.246968: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-12-21 02:33:10.714740: W tensorflow/stream_executor/platform/de

In [None]:
! pip install rembg
from rembg import remove

Collecting rembg
  Downloading rembg-2.0.53-py3-none-any.whl (32 kB)
Collecting scikit-image
  Downloading scikit_image-0.22.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.7/14.7 MB[0m [31m42.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pooch
  Downloading pooch-1.8.0-py3-none-any.whl (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.7/62.7 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
Collecting onnxruntime
  Downloading onnxruntime-1.16.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.4/6.4 MB[0m [31m52.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pymatting
  Downloading PyMatting-1.1.12-py3-none-any.whl (52 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.0/53.0 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting opencv-python-headless
  Downl

In [None]:
data_path_ser = 'Data71/'
data_path = 'ImageData71'
image_resolution = (256, 256, 1)
step = 3

In [None]:
class Batch:
    def __init__(self, ident: str):
        self.input_images = list()
        self.expected_images = list()
        self.ident = ident

    def __str__(self) -> str:
       return f"Batch_{self.ident}: images:{self.images(len)}, expected:{self.expected(len)}"
    
    # images and expected functions return element of array from index or length of array
    def images(self, n: Union[int, Callable]) -> Union[tf.Tensor, int]:
      return len(self.input_images) if isinstance(n, type(len)) else self.input_images[min(abs(n), len(self.input_images) - 1) * (-1) ** (n < 0)]

    def expected(self, n: Union[int, Callable]) -> Union[tf.Tensor, int]:
      return len(self.expected_images) if isinstance(n, type(len)) else self.expected_images[min(abs(n), len(self.expected_images) - 1) * (-1) ** (n < 0)]

def load_image(image_source: str, remove_bg = False) -> tf.Tensor:
    if re.match("^(http|https):\/\/", image_source):
        image = Image.open(BytesIO(requests.get(image_source).content))
    else:
        image = Image.open(image_source)
        
    if remove_bg:
        image = remove(image)
        jpg_image = Image.new("RGB", image.size, (255, 255, 255))
        jpg_image.paste(image, (0, 0), image)
        image = jpg_image
    image = tf.convert_to_tensor(np.array(image))
    image = tf.image.resize(image, image_resolution[:2])
    image = tf.image.rgb_to_grayscale(image)
    image.set_shape(image_resolution)
    image /= 255.0
    return image

def load_real_dataset() -> list[Batch]:
    """
    Example of file name: ident_type_number
    Where Ident is unic Identifier of Batch
    Type is i (input) / e (expected)
    """

    batches = {}
    first_file_ident = lambda x: x.split("_")[0]

    def get_ident_with_batch(x: str) -> int:
        k = first_file_ident(x)
        if str(k) not in batches:
            batches[str(k)] = Batch(str(k))
        return k

    def get_number(x: str) -> int:
      return int(x.split("_")[2].split(".")[0])

    sorted_list = sorted(os.listdir(data_path),  key=lambda x: (get_ident_with_batch(x), get_number(x)))
    for filename in sorted_list:
        ident = first_file_ident(filename)
        batch = batches[str(ident)]
        if filename.startswith(f"{ident}_i"):
            batch.input_images.append(load_image(filename, remove_bg=True))
        elif filename.startswith(f"{ident}_e"):
            batch.expected_images.append(load_image(filename))
        batches[str(ident)] = batch
    return list(batches.values())

def load_batches(load_base=True) -> list[Batch]:
    if not os.path.exists(data_path_ser) and load_base:
        os.makedirs(data_path_ser)
        dataset = load_real_dataset()
        for i in range(len(dataset)):
            with open(f'{data_path_ser}serialized_batch{i}.txt', 'wb') as file:
                file.write(pickle.dumps(dataset[i]))

    batches = []
    for filename in os.listdir(data_path_ser):
        with open(data_path_ser+filename, 'rb') as file:
            loaded_class = pickle.loads(file.read())
            batches.append(loaded_class)
    return batches

batches = load_batches(False)


In [None]:
class GANContainer:
    def __init__(self, step, dataset):
        self._img_shape = (256, 256, 1)
        self._step = step
        self.dataset = dataset
        self.conv2d_kernel_init = RandomNormal(stddev=0.02)
        
        self._gen = self.__built_generator()
        self._dis = self.__built_discriminator()
        self._gan = self.__built_gan()

        self._dis.summary()
        
        opt = Adam(lr=0.0002, beta_1=0.5)
        losses = ['binary_crossentropy', 'mae']
        self._dis.compile(loss=losses[0], optimizer=opt, loss_weights=[0.5])
        self._gan.compile(loss=losses, optimizer=opt, loss_weights=[1, 100])
        
    def __built_discriminator(self):
        in_src = Input(shape=self._img_shape)
        in_target = Input(shape=self._img_shape)
        merged = Concatenate()([in_src, in_target])

        d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=self.conv2d_kernel_init)(merged)
        d = LeakyReLU(alpha=0.2)(d)

        features_strides = [(128, 2), (256, 2), (512, 2), (512, 1)]
        for feature, stride in features_strides:
            d = Conv2D(feature, 4, stride, padding='same', kernel_initializer=self.conv2d_kernel_init)(d)
            d = BatchNormalization()(d)
            d = LeakyReLU(alpha=0.2)(d)

        d = Conv2D(1, (4,4), padding='same', kernel_initializer=self.conv2d_kernel_init)(d)
        patch_out = Activation('sigmoid')(d)

        model = Model([in_src, in_target], patch_out)
        return model

    def __encoder_block(self, layer_in, n_filters, batchnorm=True):
        g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=self.conv2d_kernel_init)(layer_in)
        if batchnorm:
            g = BatchNormalization()(g, training=True)
        g = LeakyReLU(alpha=0.2)(g)
        return g

    def __decoder_block(self, layer_in, skip_in, n_filters, dropout=True):
        g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=self.conv2d_kernel_init)(layer_in)
        g = BatchNormalization()(g, training=True)
        if dropout:
            g = Dropout(0.5)(g, training=True)
        g = Concatenate()([g, skip_in])
        g = Activation('relu')(g)
        return g

    def __built_generator(self):
        in_image = Input(shape=self._img_shape)

        e1 = self.__encoder_block(in_image, 64, batchnorm=False)
        e2 = self.__encoder_block(e1, 128)
        e3 = self.__encoder_block(e2, 256)
        e4 = self.__encoder_block(e3, 512)
        e5 = self.__encoder_block(e4, 512)
        e6 = self.__encoder_block(e5, 512)
        e7 = self.__encoder_block(e6, 512)

        b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=self.conv2d_kernel_init)(e7)
        b = Activation('relu')(b)

        d1 = self.__decoder_block(b, e7, 512)
        d2 = self.__decoder_block(d1, e6, 512)
        d3 = self.__decoder_block(d2, e5, 512)
        d4 = self.__decoder_block(d3, e4, 512, dropout=False)
        d5 = self.__decoder_block(d4, e3, 256, dropout=False)
        d6 = self.__decoder_block(d5, e2, 128, dropout=False)
        d7 = self.__decoder_block(d6, e1, 64, dropout=False)

        g = Conv2DTranspose(self._img_shape[2], (4,4), strides=(2,2), padding='same', kernel_initializer=self.conv2d_kernel_init)(d7) #Modified 
        out_image = Activation('tanh')(g)
        model = Model(in_image, out_image)
        return model

    def __built_gan(self):
        for layer in self._dis.layers:
            if not isinstance(layer, BatchNormalization):
                layer.trainable = False 

        in_src = Input(shape=self._img_shape)
        gen_out = self._gen(in_src)
        dis_out = self._dis([in_src, gen_out])
        model = Model(in_src, [dis_out, gen_out])
        return model

    def __save_actual_result(self, tensor, name, folder='result'):
        keras.utils.array_to_img(tensor).save(f'{folder}/{self._step}s{name}e.jpg')
            
    def train(self, epochs=501, save_interval=50):
        patch_shape = self._dis.output_shape[1]
        valid = np.ones((1, patch_shape, patch_shape, 1))
        fake = np.zeros((1, patch_shape, patch_shape, 1))
        for epoch in range(epochs):
            batch = np.random.choice(self.dataset)
            
            img_in = tf.expand_dims(batch.images(0), axis=0) 
            img_exp = tf.expand_dims(batch.expected(self._step), axis=0) 

            gen_img = self._gen.predict(img_in)
 
            d_loss_gen_img = self._dis.train_on_batch([img_in, gen_img], fake)
            d_loss_img_in = self._dis.train_on_batch([img_in, img_in], fake)
            d_loss_img_exp = self._dis.train_on_batch([img_in, img_exp], valid)
            d_loss_in_out = self._dis.train_on_batch([img_in, img_in * img_exp], fake)

            g_loss, _, _ = self._gan.train_on_batch(img_in, [valid, img_exp])
        
            print(f"[{epoch}/{epochs}], G_loss:{g_loss}, D_g_img:{d_loss_gen_img}, D_g_in:{d_loss_img_in}, D_exp:{d_loss_img_exp}, D_mult:{d_loss_in_out}")

            if epoch % save_interval == 0:
                self.__save_actual_result(gen_img[0,:,:,:], str(epoch))
                
    def save_models(self, folder=None):
        if folder is None:
            folder=str(self._step)+'/'
        self._gen.save(f'{folder}gen')
        self._dis.save(f'{folder}dis')

In [None]:
gan_cont = GANContainer(step, batches)

Model: "model_4"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_6 (InputLayer)           [(None, 256, 256, 1  0           []                               
                                )]                                                                
                                                                                                  
 input_7 (InputLayer)           [(None, 256, 256, 1  0           []                               
                                )]                                                                
                                                                                                  
 concatenate_15 (Concatenate)   (None, 256, 256, 2)  0           ['input_6[0][0]',                
                                                                  'input_7[0][0]']          

In [None]:
gan_cont.train(epochs=11, save_interval=5)
gan_cont.save_models()

[0/11], G_loss:90.68773651123047, D_g_img:0.534294605255127, D_g_in:0.5527071952819824, D_exp:0.2990652918815613, D_mult:0.5459594130516052


FileNotFoundError: [Errno 2] No such file or directory: 'result/3s0e.jpg'

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=e1e341d8-6db6-4ef5-9a6a-7226f58f39f3' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>