In [2]:
import os
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import UpSampling2D
from tensorflow.keras.optimizers import RMSprop, Adam
from tensorflow import keras
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

2024-06-07 17:44:17.072635: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-06-07 17:44:17.195802: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-07 17:44:17.195878: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-07 17:44:17.199688: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-06-07 17:44:17.224970: I tensorflow/core/platform/cpu_feature_guar

In [3]:
# Working resolution in pixels; both images are resized to this square size.
IMG_WIDTH = 500
IMG_HEIGHT = IMG_WIDTH

def load_and_preprocess_image(image_path, target_size=None):
    """Load an image file as a float32 RGB tensor with a leading batch axis.

    No mean subtraction, channel reordering or rescaling is applied here; the
    pixel values are returned exactly as decoded (uint8 values cast to float).

    Args:
        image_path: Path to an image file that PIL can open.
        target_size: Optional ``(height, width)`` to resize to. When omitted,
            the notebook-level ``(IMG_HEIGHT, IMG_WIDTH)`` is used.

    Returns:
        A ``tf.float32`` tensor of shape ``(1, height, width, 3)``.
    """
    if target_size is None:
        target_size = (IMG_HEIGHT, IMG_WIDTH)
    height, width = target_size

    # The context manager releases the file handle deterministically;
    # convert() yields an in-memory image that remains valid afterwards.
    with Image.open(image_path) as source:
        rgb = source.convert('RGB')

    # PIL's resize takes (width, height), the reverse of array (row, col) order.
    rgb = rgb.resize((width, height))
    pixels = np.array(rgb)
    tensor = tf.convert_to_tensor(pixels, dtype=tf.float32)
    return tf.expand_dims(tensor, 0)  # add the batch dimension

# Locations of the content photograph and the style reference painting.
content_image_path = 'contents/content-image-library.jpeg'
style_image_path = 'styles/starry_night.jpg'

# Both images go through the same loader, yielding batched float32 tensors.
content_image, style_image = (
    load_and_preprocess_image(path)
    for path in (content_image_path, style_image_path)
)

2024-06-07 17:44:20.275729: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:887] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-06-07 17:44:20.532104: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:887] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-06-07 17:44:20.532152: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:887] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-06-07 17:44:20.536099: I external/local_xla/xla/stream_executor/cuda/cuda_executor.cc:887] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-06-07 17:44:20.536155: I external/local_xla/xla/stream_executor

In [4]:
class AdaIN(layers.Layer):
    """Adaptive Instance Normalization.

    Normalizes the content feature map per channel over axes 1 and 2 and then
    rescales/shifts it with the per-channel statistics of the style feature
    map, i.e. ``std(y) * (x - mean(x)) / std(x) + mean(y)``.

    Args:
        epsilon: Small constant added to the variances before the square root
            to avoid division by zero.
        **kwargs: Standard Keras layer arguments (e.g. ``name``).
    """

    def __init__(self, epsilon=1e-5, **kwargs):
        # Forward kwargs so callers can set standard layer options like `name`.
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def call(self, inputs):
        """Apply AdaIN to ``inputs = [content_features, style_features]``.

        Returns a tensor with the shape of the content features.
        """
        x = inputs[0]  # content
        y = inputs[1]  # style
        # Statistics are taken over axes 1 and 2 (the spatial axes of a
        # channels-last feature map), separately per sample and channel.
        mean_x, var_x = tf.nn.moments(x, axes=(1, 2), keepdims=True)
        mean_y, var_y = tf.nn.moments(y, axes=(1, 2), keepdims=True)
        std_x = tf.sqrt(var_x + self.epsilon)
        std_y = tf.sqrt(var_y + self.epsilon)
        return std_y * (x - mean_x) / std_x + mean_y

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({'epsilon': self.epsilon})
        return config
    

class Conv2D(layers.Layer):
    """Stride-1 2-D convolution with reflection padding and optional ReLU.

    The input is reflect-padded on axes 1 and 2 so that the 'VALID'
    convolution returns a feature map with the same spatial size as the input.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of filters (output channels).
        kernel: Side length of the square kernel.
        use_relu: Whether to apply ReLU after the bias add.
        **kwargs: Standard Keras layer arguments (e.g. ``name``).
    """

    def __init__(self, in_channels, out_channels, kernel=3, use_relu=True, **kwargs):
        super().__init__(**kwargs)
        self.kernel = kernel
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_relu = use_relu

    def build(self, input_shape):
        """Create the kernel (Glorot-normal) and the zero-initialized bias."""
        self.w = self.add_weight(shape=[self.kernel,
                                        self.kernel,
                                        self.in_channels,
                                        self.out_channels],
                                 initializer='glorot_normal',
                                 trainable=True, name='kernel')
        self.b = self.add_weight(shape=(self.out_channels,),
                                 initializer='zeros',
                                 trainable=True, name='bias')

    @tf.function
    def call(self, inputs):
        """Pad, convolve, add bias and optionally apply ReLU."""
        # A stride-1 'VALID' convolution shrinks each spatial axis by
        # (kernel - 1); pad by exactly that amount, split as evenly as
        # possible. For the 3x3 kernels used in this notebook this is the
        # original 1-pixel border on every side.
        total_pad = self.kernel - 1
        pad_before = total_pad // 2
        pad_after = total_pad - pad_before
        padded = tf.pad(inputs,
                        [[0, 0],
                         [pad_before, pad_after],
                         [pad_before, pad_after],
                         [0, 0]],
                        mode='REFLECT')
        output = tf.nn.conv2d(padded, self.w, strides=1, padding="VALID") + self.b
        if self.use_relu:
            output = tf.nn.relu(output)
        return output

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            'in_channels': self.in_channels,
            'out_channels': self.out_channels,
            'kernel': self.kernel,
            'use_relu': self.use_relu,
        })
        return config
    

In [5]:
class ArbitraryStyleTransfer():
    """AdaIN style transfer: a VGG19 feature encoder plus a trainable decoder.

    The content and style images are encoded with VGG19, the deepest content
    features are re-normalized with the style statistics (AdaIN), and the
    decoder maps the result back to an image. Only the decoder is optimized.

    Attributes set by the constructor:
        encoder: Keras model returning the four VGG19 feature maps listed in
            ``encoder_layers``.
        decoder: Sequential decoder mapping AdaIN features to an image.
        stn: Inference model ``[content, style] -> stylized image``.
        training_model: Model returning
            ``[adain_output, stylized_image_features, style_features]``.
        optimizer: Adam optimizer with an exponentially decaying learning rate.
        ckpt / ckpt_manager: Checkpointing of the decoder weights under
            ``./checkpoints/arbitrary``.
    """

    def __init__(self, image_shape, load_path=''):
        """Build encoder, decoder, the inference and training graphs.

        Args:
            image_shape: Shape of one input image, ``(height, width, 3)``.
            load_path: Optional checkpoint path; when non-empty the decoder
                weights are restored from it.
        """
        self.image_shape = image_shape

        # train_step is traced once by tf.function, so a plain Python float
        # would be frozen into the graph on first use. Keeping the weight in a
        # variable lets later train() calls change it without retracing.
        self.style_weight = 1e4
        self._style_weight_var = tf.Variable(
            self.style_weight, trainable=False, dtype=tf.float32, name='style_weight')

        content_image_input = layers.Input(shape=image_shape, name='content_image')
        style_image_input = layers.Input(shape=image_shape, name='style_image')

        self.encoder = self.build_encoder(name='encoder')
        self.decoder = self.build_decoder()

        content_image = self.preprocess(content_image_input)
        style_image = self.preprocess(style_image_input)

        self.content_target = self.encoder(content_image)
        self.style_target = self.encoder(style_image)

        # AdaIN is applied on the deepest encoder feature map only.
        adain_output = AdaIN()([self.content_target[-1], self.style_target[-1]])

        self.stylized_image = self.postprocess(self.decoder(adain_output))

        self.stn = Model([content_image_input, style_image_input], self.stylized_image)

        # Re-encode the stylized image so the losses are computed in feature space.
        output_features = self.encoder(self.preprocess(self.stylized_image))

        self.training_model = Model([content_image_input, style_image_input],
                                    [adain_output, output_features, self.style_target])

        initial_lr = 1e-4
        lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_lr, decay_steps=200, decay_rate=0.96, staircase=True
        )

        self.optimizer = Adam(learning_rate=lr_schedule)
        checkpoint_path = "./checkpoints/arbitrary"
        self.ckpt = tf.train.Checkpoint(decoder=self.decoder)
        self.ckpt_manager = tf.train.CheckpointManager(self.ckpt, checkpoint_path, max_to_keep=40)
        if load_path:
            self.load_checkpoint(load_path)

    def deprocess_image(self, tensor):
        """Convert a single-image batch tensor to an ``(H, W, C)`` NumPy array.

        Only the batch axis is removed; pixel values are returned unchanged.

        Raises:
            ValueError: If the batch axis does not have length 1.
        """
        return np.squeeze(tensor.numpy(), axis=0)

    def save_result(self, generated_image, name):
        """Write a single-image batch tensor to the file path ``name``."""
        img = self.deprocess_image(generated_image)
        tf.keras.preprocessing.image.save_img(name, img)

    def load_checkpoint(self, ckpt_path):
        """Restore the decoder weights from ``ckpt_path``."""
        self.ckpt.restore(ckpt_path)

    def preprocess(self, image):
        """Prepare an RGB image in the 0-255 range for the VGG19 encoder.

        ``vgg19.preprocess_input`` already converts RGB to BGR and subtracts
        the ImageNet channel means, so the channels must not be reversed
        beforehand: doing both feeds VGG19 RGB data with mismatched means.
        NOTE: decoder checkpoints trained with the earlier double reversal saw
        different encoder features and should be retrained.
        """
        return tf.keras.applications.vgg19.preprocess_input(image)

    def postprocess(self, image):
        """Clamp the decoder output to the valid 0-255 pixel range."""
        return tf.clip_by_value(image, 0., 255.)

    def build_encoder(self, name='encoder'):
        """Return a model exposing four intermediate VGG19 feature maps.

        Also stores the selected layer names in ``self.encoder_layers``.
        """
        self.encoder_layers = ['block1_conv1', 'block2_conv1', 'block3_conv1', 'block4_conv1']

        vgg = tf.keras.applications.VGG19(include_top=False, weights='imagenet')
        # The encoder is a fixed feature extractor; only the decoder variables
        # are ever handed to the optimizer. Mark it non-trainable explicitly.
        vgg.trainable = False
        layer_outputs = [vgg.get_layer(layer_name).output for layer_name in self.encoder_layers]

        return Model(vgg.input, layer_outputs, name=name)

    def build_decoder(self):
        """Return the decoder: 512-channel features -> 3-channel image.

        Three 2x upsampling stages undo the three poolings that precede
        ``block4_conv1`` in VGG19. The last convolution has no ReLU so the
        output is unconstrained before ``postprocess`` clips it.
        """
        return tf.keras.Sequential([
            Conv2D(512, 256, 3),
            UpSampling2D((2, 2)),
            Conv2D(256, 256, 3),
            Conv2D(256, 256, 3),
            Conv2D(256, 256, 3),
            Conv2D(256, 128, 3),
            UpSampling2D((2, 2)),
            Conv2D(128, 128, 3),
            Conv2D(128, 64, 3),
            UpSampling2D((2, 2)),
            Conv2D(64, 64, 3),
            Conv2D(64, 3, 3, use_relu=False),
        ], name='decoder')

    def calc_style_loss(self, y_true, y_pred):
        """Style loss: squared differences of per-channel means and stds.

        Args:
            y_true: Sequence of style feature maps, one per encoder layer.
            y_pred: Sequence of stylized-image feature maps, same length.

        Returns:
            Scalar tensor: the per-layer losses averaged over the layers.
        """
        epsilon = 1e-5
        per_layer = []

        for true_features, pred_features in zip(y_true, y_pred):
            mean_true, var_true = tf.nn.moments(true_features, axes=(1, 2), keepdims=True)
            mean_pred, var_pred = tf.nn.moments(pred_features, axes=(1, 2), keepdims=True)
            std_true = tf.sqrt(var_true + epsilon)
            std_pred = tf.sqrt(var_pred + epsilon)
            mean_loss = tf.reduce_sum(tf.square(mean_true - mean_pred))
            std_loss = tf.reduce_sum(tf.square(std_true - std_pred))
            per_layer.append(mean_loss + std_loss)

        return tf.reduce_mean(per_layer)

    @tf.function
    def train_step(self, train_data):
        """Run one optimization step on the decoder.

        Args:
            train_data: ``[content_batch, style_batch]``.

        Returns:
            Tuple ``(content_loss, style_loss, total_loss)`` of scalar tensors.
        """
        with tf.GradientTape() as tape:
            adain_output, output_features, style_target = self.training_model(train_data)

            # Content loss: the stylized image's deepest features should
            # reproduce the AdaIN target.
            content_loss = tf.reduce_sum((output_features[-1] - adain_output) ** 2)
            style_loss = self._style_weight_var * self.calc_style_loss(style_target, output_features)
            loss = content_loss + style_loss

        # Differentiate and update outside the tape context so the tape does
        # not needlessly record the gradient computation itself.
        decoder_variables = self.decoder.trainable_variables
        gradients = tape.gradient(loss, decoder_variables)
        self.optimizer.apply_gradients(zip(gradients, decoder_variables))

        return content_loss, style_loss, loss

    def train(self, steps, interval=100, style_weight=1e4,
              save_dir='./generated_images/library-starry', content=None, style=None):
        """Train the decoder on a single content/style pair.

        Args:
            steps: Number of optimization steps.
            interval: Save a checkpoint and a stylized snapshot every
                ``interval`` steps; ``0`` or ``None`` disables snapshots.
            style_weight: Multiplier applied to the style loss.
            save_dir: Directory for the snapshot images (created if missing).
            content: Content image batch; defaults to the notebook-level
                ``content_image``.
            style: Style image batch; defaults to the notebook-level
                ``style_image``.
        """
        self.style_weight = style_weight
        self._style_weight_var.assign(style_weight)

        if content is None:
            content = content_image
        if style is None:
            style = style_image
        train_data = [content, style]

        os.makedirs(save_dir, exist_ok=True)

        for i in range(steps):
            content_loss, style_loss, loss = self.train_step(train_data)
            print(f"iter: {i}, content_loss: {content_loss:.4f}, style_loss: {style_loss:.4f}, loss: {loss:.4f}")

            if interval and i % interval == 0:
                self.ckpt_manager.save()
                snapshot = self.stn(train_data)
                save_path = os.path.join(save_dir, f'stylized_image_step_{i}.png')
                self.save_result(snapshot, save_path)

        # Render with the final weights so the plot reflects the last step
        # rather than the most recent snapshot; also well-defined for steps == 0.
        stylized_images = self.stn(train_data)
        self.plot_images(content, style, stylized_images,
                         step=max(steps - 1, 0), save_dir=save_dir)

    def plot_images(self, contents, styles, stylized, step, save_dir):
        """Show style, content and stylized images side by side.

        Args:
            contents, styles, stylized: Single-image batches in the 0-255 range.
            step: Training step shown in the stylized panel's title.
            save_dir: Accepted for interface compatibility; not used.
        """
        fig, axes = plt.subplots(1, 3, figsize=(18, 6))

        panels = (
            (styles, 'Style'),
            (contents, 'Content'),
            (stylized, f'Stylized (step {step})'),
        )
        for ax, (image, title) in zip(axes, panels):
            ax.imshow(tf.squeeze(image) / 255.)
            ax.set_title(title)
            ax.axis('off')

        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)



# Keras input shapes are (height, width, channels); the loader produces arrays
# in that order, so pass height first (this only matters for non-square sizes).
model = ArbitraryStyleTransfer((IMG_HEIGHT, IMG_WIDTH, 3))

2024-06-07 17:44:24.971391: I external/local_tsl/tsl/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory


In [6]:
# Optimize the decoder on the content/style pair; a checkpoint and a stylized
# snapshot are written every `interval` steps.
model.train(
    steps=3000,
    interval=100,
    style_weight=1e4,
)

2024-06-07 17:44:29.904055: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:454] Loaded cuDNN version 8907
2024-06-07 17:44:32.295884: I external/local_tsl/tsl/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2024-06-07 17:44:37.523198: W external/local_tsl/tsl/framework/bfc_allocator.cc:296] Allocator (GPU_0_bfc) ran out of memory trying to allocate 4.16GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2024-06-07 17:44:37.814458: W external/local_tsl/tsl/framework/bfc_allocator.cc:296] Allocator (GPU_0_bfc) ran out of memory trying to allocate 4.16GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2024-06-07 17:44:39.027149: W external/local_tsl/tsl/framework/bfc_allocator.cc:296] Allocator (GPU_0_bfc) ran o

iter: 0, content_loss: 2458418675712.0000, style_loss: 615853588480.0000, loss: 3074272264192.0000
iter: 1, content_loss: 2150617972736.0000, style_loss: 623698247680.0000, loss: 2774316089344.0000
iter: 2, content_loss: 1906575933440.0000, style_loss: 725494464512.0000, loss: 2632070463488.0000
iter: 3, content_loss: 1963600904192.0000, style_loss: 641717043200.0000, loss: 2605317881856.0000
iter: 4, content_loss: 2009641517056.0000, style_loss: 631962206208.0000, loss: 2641603592192.0000
iter: 5, content_loss: 1907191316480.0000, style_loss: 636521676800.0000, loss: 2543712993280.0000
iter: 6, content_loss: 1812051132416.0000, style_loss: 646345261056.0000, loss: 2458396393472.0000
iter: 7, content_loss: 1791647547392.0000, style_loss: 678275186688.0000, loss: 2469922865152.0000
iter: 8, content_loss: 1758311088128.0000, style_loss: 633238388736.0000, loss: 2391549411328.0000
iter: 9, content_loss: 1758766301184.0000, style_loss: 614933790720.0000, loss: 2373700026368.0000
iter: 10, 