In [1]:
import numpy as np
from PIL import Image
import time
import tensorflow as tf
import numpy as np
import imageio
from skimage import color
from abc import ABC, abstractmethod
import tensorflow_probability as tfp
#from google.colab import files

In [25]:
#Training Parameters
TOLERANCE = 1e-5
EPOCHS = 500

#Weights
LOSS_TYPE = 1
STYLE_WEIGHTS = [1e0]
CONTENT_WEIGHT = 0.05
TOTAL_VARIATION_WEIGHT = 8.5e-5

#Image
MAX_IMG_SIZE = 512
CONTENT_PATH = 'tubingen.jpg'
STYLE_PATHS = ['starry-night.jpg']
OUTPUT_PATH = './'

#Layers
STYLE_LAYERS = ['block1_conv1', 'block1_conv2', 'block2_conv1', 'block2_conv2', 'block3_conv1', 'block3_conv2', 'block3_conv3', 'block3_conv4',
                    'block4_conv1', 'block4_conv2', 'block4_conv3', 'block4_conv4', 'block5_conv1', 'block5_conv2', 'block5_conv3', 'block5_conv4']

CONTENT_LAYERS = ['block5_conv2']

In [3]:
def imresize(img, size, interp='bilinear'):
  """
  Used to resize images to a specific size

  Args:
      img (Image): image to resize
      size (tuple): size
      interp (str, optional): interpolation style. Defaults to 'bilinear'.

  Returns:
      _type_: _description_
  """
  if interp == 'bilinear':
      interpolation = Image.BILINEAR
  elif interp == 'bicubic':
      interpolation = Image.BICUBIC
  else:
      interpolation = Image.NEAREST

  size = (size[1], size[0])

  if type(img) != Image:
      img = Image.fromarray(img, mode='RGB')

  img = np.array(img.resize(size, interpolation))
  return img
    
def imsave(path, img):
  """
  Save an image

  Args:
      path (string): path where the image will be saved
      img (Image): image to be saved
  """
  imageio.imwrite(path, img)
  return

def load_img(path_to_img):
  """
  Load an image from a path, resize it to match the maximum dim and format it
  for training

  Args:
      path_to_img (string): path to the image

  Returns:
      numpy array: image as a np array
  """
  max_dim = MAX_IMG_SIZE
  img = tf.io.read_file(path_to_img)
  img = tf.image.decode_image(img, channels=3)
  img = tf.image.convert_image_dtype(img, tf.float32)
  
  img *= 255

  shape = tf.cast(tf.shape(img)[:-1], tf.float32)
  long_dim = max(shape)
  scale = max_dim / long_dim

  new_shape = tf.cast(shape * scale, tf.int32)

  img = tf.image.resize(img, new_shape)
  img = img[tf.newaxis, :]
  img = img.numpy()
  
  return img

def load_and_process_img(path_to_img):
  """
  Load and preprocess the image so it can be fed into the model

  Args:
      path_to_img (path): path to the image

  Returns:
      np array: preprocessed image
  """
  img = load_img(path_to_img)
  img = tf.keras.applications.vgg19.preprocess_input(img)
  return img

def deprocess_img(processed_img):
  """
  Undo the preprocessing on the image

  Args:
      processed_img (np array): image to be deprocessed

  Raises:
      ValueError: Invalid input to deprocessing image

  Returns:
      np array: deprocessed image
  """
  if len(processed_img.shape) == 4:
    processed_img = np.squeeze(processed_img, 0)
  assert len(processed_img.shape) == 3, ("Input to deprocess image must be an image of "
                             "dimension [1, height, width, channel] or [height, width, channel]")
  if len(processed_img.shape) != 3:
    raise ValueError("Invalid input to deprocessing image")
  
  # perform the inverse of the preprocessing step
  processed_img[:, :, 0] += 103.939
  processed_img[:, :, 1] += 116.779
  processed_img[:, :, 2] += 123.68
  processed_img = processed_img[:, :, ::-1]

  processed_img = np.clip(processed_img, 0, 255).astype('uint8')
  return processed_img

def clip_image(image):
  return tf.clip_by_value(image, clip_value_min=-128, clip_value_max=128.0)

In [18]:
# Ported from https://pychao.com/2019/11/02/optimize-tensorflow-keras-models-with-l-bfgs-from-tensorflow-probability/
class AbstractTFPOptimizer(ABC):

    def __init__(self, trace_function=False):
        super(AbstractTFPOptimizer, self).__init__()
        self.trace_function = trace_function
        self.callback_list = None

    def _function_wrapper(self, loss_func, model):
        """A factory to create a function required by tfp.optimizer.lbfgs_minimize.
        Args:
            loss_func: a function with signature loss_value = loss(model).
            model: an instance of `tf.keras.Model` or its subclasses.
        Returns:
            A function that has a signature of:
                loss_value, gradients = f(model_parameters).
        """

        # obtain the shapes of all trainable parameters in the model
        shapes = tf.shape_n(model.trainable_variables)
        n_tensors = len(shapes)

        # we'll use tf.dynamic_stitch and tf.dynamic_partition later, so we need to
        # prepare required information first
        count = 0
        idx = []  # stitch indices
        part = []  # partition indices

        for i, shape in enumerate(shapes):
            n = np.product(shape)
            idx.append(tf.reshape(tf.range(count, count + n, dtype=tf.int32), shape))
            part.extend([i] * n)
            count += n

        part = tf.constant(part)

        @tf.function
        def assign_new_model_parameters(params_1d):
            """A function updating the model's parameters with a 1D tf.Tensor.
            Args:
                params_1d [in]: a 1D tf.Tensor representing the model's trainable parameters.
            """

            params = tf.dynamic_partition(params_1d, part, n_tensors)
            for i, (shape, param) in enumerate(zip(shapes, params)):
                model.trainable_variables[i].assign(tf.reshape(param, shape))

        # now create a function that will be returned by this factory
        def f(params_1d):
            """A function that can be used by tfp.optimizer.lbfgs_minimize.
            This function is created by function_factory.
            Args:
               params_1d [in]: a 1D tf.Tensor.
            Returns:
                A scalar loss and the gradients w.r.t. the `params_1d`.
            """

            # use GradientTape so that we can calculate the gradient of loss w.r.t. parameters
            with tf.GradientTape() as tape:
                # update the parameters in the model
                assign_new_model_parameters(params_1d)
                # calculate the loss
                loss_value = loss_func(model)

            # calculate gradients and convert to 1D tf.Tensor
            grads = tape.gradient(loss_value, model.trainable_variables)
            grads = tf.dynamic_stitch(idx, grads)

            # print out iteration & loss
            f.iter.assign_add(1)
            tf.print("Iter:", f.iter, "loss:", loss_value)

            if self.callback_list is not None:
                info_dict = {
                    'iter': f.iter,
                    'loss': loss_value,
                    'grad': grads,
                }

                for callback in self.callback_list:
                    callback(model, info_dict=info_dict)

            return loss_value, grads

        if self.trace_function:
            f = tf.function(f)

        # store these information as members so we can use them outside the scope
        f.iter = tf.Variable(0, trainable=False)
        f.idx = idx
        f.part = part
        f.shapes = shapes
        f.assign_new_model_parameters = assign_new_model_parameters

        return f

    def register_callback(self, callable):
        """
        Accepts a callable with signature `callback(model, info_dict=None)`.
        Callable should not return anything, it will not be dealt with.
        `info_dict` will contain the following information:
            - Optimizer iteration number (key = 'iter')
            - Loss value (key = 'loss')
            - Grad value (key = 'grad')
        Args:
            callable: A callable function with the signature `callable(model, info_dict=None)`.
            See above for what info_dict can contain.
        """

        if self.callback_list is None:
            self.callback_list = []

        self.callback_list.append(callable)

    @abstractmethod
    def minimize(self, loss_func, model):
        pass


class BFGSOptimizer(AbstractTFPOptimizer):

    def __init__(self, max_iterations=50, tolerance=1e-8, bfgs_kwargs=None, trace_function=False):
        super(BFGSOptimizer, self).__init__(trace_function=trace_function)

        self.max_iterations = max_iterations
        self.tolerance = tolerance

        bfgs_kwargs = bfgs_kwargs or {}

        if 'max_iterations' in bfgs_kwargs.keys():
            del bfgs_kwargs['max_iterations']

        if 'tolerance' in bfgs_kwargs.keys():
            keys = [key for key in bfgs_kwargs.keys()
                    if 'tolerance' in key]
            for key in keys:
                del bfgs_kwargs[key]

        self.bfgs_kwargs = bfgs_kwargs

    def minimize(self, loss_func, model):
        optim_func = self._function_wrapper(loss_func, model)

        # convert initial model parameters to a 1D tf.Tensor
        init_params = tf.dynamic_stitch(optim_func.idx, model.trainable_variables)

        # train the model with BFGS solver
        results = tfp.optimizer.bfgs_minimize(
            value_and_gradients_function=optim_func, initial_position=init_params,
            max_iterations=self.max_iterations,
            tolerance=self.tolerance,
            x_tolerance=self.tolerance,
            f_relative_tolerance=self.tolerance,
            **self.bfgs_kwargs)

        # after training, the final optimized parameters are still in results.position
        # so we have to manually put them back to the model
        optim_func.assign_new_model_parameters(results.position)

        print("BFGS complete, and parameters updated !")
        return model


class LBFGSOptimizer(AbstractTFPOptimizer):

    def __init__(self, max_iterations=50, tolerance=1e-8, lbfgs_kwargs=None, trace_function=False):
        super(LBFGSOptimizer, self).__init__(trace_function=trace_function)

        self.max_iterations = max_iterations
        self.tolerance = tolerance

        lbfgs_kwargs = lbfgs_kwargs or {}

        if 'max_iterations' in lbfgs_kwargs.keys():
            del lbfgs_kwargs['max_iterations']

        if 'tolerance' in lbfgs_kwargs.keys():
            keys = [key for key in lbfgs_kwargs.keys()
                    if 'tolerance' in key]
            for key in keys:
                del lbfgs_kwargs[key]

        self.lbfgs_kwargs = lbfgs_kwargs

    def minimize(self, loss_func, model):
        optim_func = self._function_wrapper(loss_func, model)

        # convert initial model parameters to a 1D tf.Tensor
        init_params = tf.dynamic_stitch(optim_func.idx, model.trainable_variables)

        # train the model with L-BFGS solver
        results = tfp.optimizer.lbfgs_minimize(
            value_and_gradients_function=optim_func, initial_position=init_params,
            max_iterations=self.max_iterations,
            tolerance=self.tolerance,
            x_tolerance=self.tolerance,
            f_relative_tolerance=self.tolerance,
            **self.lbfgs_kwargs)

        # after training, the final optimized parameters are still in results.position
        # so we have to manually put them back to the model
        optim_func.assign_new_model_parameters(results.position)

        print("L-BFGS complete, and parameters updated !")
        return model

In [5]:
def gram_matrix(x):
    """
    the gram matrix of an image tensor (feature-wise outer product) using shifted activations

    Args:
        x (np array): image to apply gram matrix on

    Returns:
        np array: transformed image
    """
    gram = tf.linalg.einsum('bijc,bijd->bcd', x - 1, x - 1)
    return gram

class StyleContentModel(tf.keras.Model):
    """
    Allows to have the output of each layer for a specific  input
    """

    def __init__(self, style_layers, content_layers):
        super(StyleContentModel, self).__init__()

        # get the symbolic outputs of each "key" layer (we gave them unique names).
        transferL = tf.keras.applications.VGG19(include_top=False, weights='imagenet')
        transferL.trainable = False
        
        outputs_dict = dict([(layer.name, layer.output) for layer in transferL.layers])

        style_activations = [outputs_dict[layer_name] for layer_name in style_layers]
        content_activations = [outputs_dict[layer_name] for layer_name in content_layers]

        activations = style_activations + content_activations

        self.vgg = tf.keras.Model(transferL.input, activations)

        self.style_layer_names = style_layers
        self.content_layer_names = content_layers

        self.num_style_layers = len(style_layers)
        self.num_content_layers = len(content_layers)

    def call(self, inputs):

        outputs = self.vgg(inputs)
        style_outputs, content_outputs = (outputs[:self.num_style_layers],
                                          outputs[self.num_style_layers:])

        style_outputs = [gram_matrix(style_output)
                         for style_output in style_outputs]

        content_dict = {content_name: value
                        for content_name, value
                        in zip(self.content_layer_names, content_outputs)}

        style_dict = {style_name: value
                      for style_name, value
                      in zip(self.style_layer_names, style_outputs)}

        return {'content': content_dict, 'style': style_dict}
    
def style_loss(style, combination, size):
    """
    Compute the style loss

    Args:
        style (np array): target image
        combination (np array): trained image
        size (tuple): size of the image

    Returns:
        np array: loss value
    """
    channels = 3
    return tf.reduce_sum(tf.square(style - combination)) / (4. * (channels ** 2) * (size ** 2))

def content_loss(base, combination, size):
    """
    Compute content loss

    Args:
        base (np array): target image
        combination (np array): trained image
        size (tuple): size of the image

    Returns:
        np array: loss value
    """
    channels = 3
    if LOSS_TYPE == 1:
        multiplier = 1. / (2. * (channels ** 0.5) * (size ** 0.5))
    elif LOSS_TYPE == 2:
        multiplier = 1. / (channels * size)
    else:
        multiplier = 1.

    return multiplier * tf.reduce_sum(tf.square(combination - base))

# 
def total_variation_loss(x):
    """
    The total variation loss is designed to keep the generated image locally coherent
    by reducing high frequency artifacts

    Args:
        x (np array): input image

    Returns:
        np array: loss value
    """
    a = tf.square(
        x[:, :-1, :-1, :] - x[:, 1:, :-1, :]
    )
    b = tf.square(
        x[:, :-1, :-1, :] - x[:, :-1, 1:, :]
    )
    return tf.reduce_sum(tf.pow(a + b, 1.25))




def compute_loss(input, outputs, content_target, style_targets):
    """
    Compute the overall loss of the image

    Args:
        input (np array): trained image
        outputs (dict): outputs of each layer of the model with this input
        content_target (np array): self explanatory
        style_targets (np array): self explanatory

    Returns:
        np array: overall loss value
    """
    style_combined_outputs = outputs['style']
    content_combined_outputs = outputs['content']
    h,w,c = input.shape[1:]
    size = h*w

    # Content losses
    content_losses = CONTENT_WEIGHT * tf.add_n([content_loss(content_target[name], content_combined_outputs[name], size)
                                                            for name in content_combined_outputs.keys()])

    num_style_layers = len(STYLE_LAYERS)
    num_style_references = len(style_targets)

    # Style losses (Cross layer loss)
    style_losses = []
    for style_img_id in range(num_style_references):
        style_features = style_targets[style_img_id]

        sl_i = 0.
        for feature_layer_id in range(num_style_layers - 1):
            target_feature_layer = style_features[STYLE_LAYERS[feature_layer_id]]
            style_output = style_combined_outputs[STYLE_LAYERS[feature_layer_id]]

            sl1 = style_loss(target_feature_layer, style_output, size)

            target_feature_layer = style_features[STYLE_LAYERS[feature_layer_id + 1]]
            style_output = style_combined_outputs[STYLE_LAYERS[feature_layer_id + 1]]

            sl2 = style_loss(target_feature_layer, style_output, size)

            # Geometric loss scaling
            sl_i = sl_i + (sl1 - sl2) * (STYLE_WEIGHTS[style_img_id] / (2 ** (num_style_layers - 1 - (feature_layer_id + 1))))

        style_losses.append(sl_i)

    style_losses = tf.add_n(style_losses)

    # Total Variation Losses
    tv_losses = TOTAL_VARIATION_WEIGHT * total_variation_loss(input)

    return content_losses, style_losses, tv_losses

def get_feature_representations(model, content_path, style_paths):
  """Helper function to compute our content and style feature representations.

  This function will simply load and preprocess both the content and style 
  images from their path. Then it will feed them through the network to obtain
  the outputs of the intermediate layers. 
  
  Arguments:
    model: The model that we are using.
    content_path: The path to the content image.
    style_path: The path to the style image
    
  Returns:
    returns the style features and the content features. 
  """
  
  content_image = load_and_process_img(content_path)
  content_features = model(content_image)['content']
  
  style_features = []
  for path in style_paths: 
    style_image = load_and_process_img(path)
    style_features.append(model(style_image)['style'])

  return style_features, content_features

In [None]:
extractor = StyleContentModel(style_layers=STYLE_LAYERS, content_layers=CONTENT_LAYERS)

base_image = load_and_process_img(CONTENT_PATH)

style_targets_list, content_target = get_feature_representations(extractor, CONTENT_PATH, STYLE_PATHS)

x = tf.Variable(base_image, trainable=True)

class InputWrapper(tf.keras.Model):
    def __init__(self, x: tf.Variable):
        super(InputWrapper, self).__init__()

        self.x = x

        self.vgg = extractor
        self.vgg.trainable = False

    def call(self, inputs, training=None):
        outputs = self.vgg(self.x)
        return outputs

x_wrapper = InputWrapper(x)

@tf.function  # (tracing will be done by LBFGS for us)
def loss_wrapper(model):
    outputs = model(x)
    content_losses, style_losses, tv_losses = compute_loss(x, outputs, content_target, style_targets_list)
    loss = content_losses + style_losses + tv_losses
    return loss

prev_min_val = -1
start_time = time.time()


def save_image_callback(model, info_dict=None):
    """
    Callback function to get infos while training
    """
    global prev_min_val, start_time

    info_dict = info_dict or {}
    loss_value = info_dict.get('loss', None)
    i = info_dict.get('iter', -1)

    if loss_value is not None:
        loss_val = loss_value.numpy()

        if prev_min_val == -1:
            prev_min_val = loss_val

        improvement = (prev_min_val - loss_val) / prev_min_val * 100

        print("Current loss value:", loss_val, " Improvement : %0.3f" % improvement, "%")
        prev_min_val = loss_val

    last_save = info_dict.get('last_save', False)
    if (i + 1) % 100 == 0 or last_save:
        img = model.x.numpy()
        # save current generated image
        img = deprocess_img(img)

        if not last_save:
            fname = OUTPUT_PATH + "out" + "_at_iteration_%d.png" % (i + 1)
        else:
            fname = OUTPUT_PATH + "out" + "_final.png"

        imsave(fname, img)
        end_time = time.time()
        print("Image saved as", fname)
        print("Iteration %d completed in %ds" % (i + 1, end_time - start_time))


optimizer = LBFGSOptimizer(max_iterations=EPOCHS, tolerance=TOLERANCE)

# Add a save image callback to this
optimizer.register_callback(save_image_callback)

optimizer.minimize(loss_wrapper, x_wrapper)

save_image_callback(x_wrapper, info_dict={'last_save': True})