## 1 Image-based Style Transfer

Loading the dataset of choice.

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import tensorflow as tf
import keras

# List all physical GPU devices
gpus = tf.config.list_physical_devices('GPU')

In [2]:
print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.19.0


In [4]:
print("CUDA Available:", tf.config.list_physical_devices('GPU'))
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

CUDA Available: []
Num GPUs Available:  0


In [5]:
cpu_devices = tf.config.list_physical_devices('CPU')
print("Available CPUs:", cpu_devices)

Available CPUs: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]


In [6]:
GPU_in_use: int = 0
CPU_in_use: int = 0

In [7]:
from helper_functions.device_helper import get_device

In [8]:
import tensorflow as tf

gpus = tf.config.list_physical_devices('GPU')
if gpus:
    print(gpus[GPU_in_use].name)
else:
    print("No GPU found")

No GPU found


In [9]:
import tensorflow as tf
import keras

base_image_path = "../demo_images/san.png"
style_reference_image_paths = ["../demo_images/starry_night.png"]
style_reference_path = style_reference_image_paths[0]

In [10]:
original_width, original_height = keras.utils.load_img(base_image_path).size
img_height = 400
img_width = round(original_width * img_height / original_height) 

In [14]:
from helper_functions.helper import  preprocess_image, deprocess_image
from helper_functions.loss_functions import style_loss, content_loss, total_variation_loss

In [None]:
total_variation_weight = 1e-6
single_style_weight = 1e-6
single_content_weight = 2.5e-8

In [None]:
from shared_utils.gatys_network import get_content_layer_names,get_style_layer_names,get_style_weights, get_content_weights

In [None]:
chosen_loss_network : str = "mobilenet"

In [None]:
style_layer_names = [
 "block1_conv1",
 "block2_conv1",
 "block3_conv1",
 "block4_conv1",
 "block5_conv1",
]
content_layer_names = ["block5_conv2"]
style_weights = {'block1_conv1': 1.,
                 'block2_conv1': 0.8,
                 'block3_conv1': 0.5,
                 'block4_conv1': 0.3,
                 'block5_conv1': 0.1}
content_weights = {'block5_conv2': 1e-6}
use_custom : bool = True
if use_custom:
    style_layer_names = get_style_layer_names(chosen_loss_network)
    content_layer_names = get_content_layer_names(chosen_loss_network)
    style_weights = get_style_weights(chosen_loss_network)
    content_weights = get_content_weights(chosen_loss_network)

In [None]:
from shared_utils.helper import create_dir

In [None]:
from shared_utils.network import get_model_for_loss_net

In [None]:
def get_model(model_name : str = "vgg19",img_width : int = 224,img_height : int = 224):
  """ Creates our model with access to intermediate layers. 
  
  This function will load the VGG19 model and access the intermediate layers. 
  These layers will then be used to create a new model that will take input image
  and return the outputs from these intermediate layers from the VGG model. 
  
  Returns:
    returns a keras model that takes image inputs and outputs the style and 
      content intermediate layers. 
  """
  # Load our model. We load pretrained VGG, trained on imagenet data (weights=’imagenet’)
  vgg = get_model_for_loss_net(model_name,image_size=(img_width,img_height))
  vgg.trainable = False
  use_model_layers = False
  # Get output layers corresponding to style and content layers 
  if use_model_layers:
     model_outputs = dict([(layer.name, layer.output) for layer in vgg.layers])
  else:
    style_outputs = [vgg.get_layer(name).output for name in style_layer_names]
    content_outputs = [vgg.get_layer(name).output for name in content_layer_names]
    model_outputs = style_outputs + content_outputs
  # Build model 

  return keras.Model(vgg.input, model_outputs)

In [None]:
feature_extractor = get_model(chosen_loss_network,img_width,img_height)

In [None]:
def get_feature_representations():
    # Get the style and content feature representations
    base_image = preprocess_image(base_image_path, img_height, img_width)
    style_reference_images = [preprocess_image(img, img_height, img_width) for img in style_reference_image_paths]
    
    # Compute the feature representations for the base image
    base_image_features = feature_extractor(base_image)
    
    # Compute the feature representations for the style reference images
    style_reference_features = [feature_extractor(style_reference_image) for style_reference_image in style_reference_images]
    
    return base_image_features, style_reference_features

In [None]:
from helper_functions.loss_functions import ssim_loss, psnr_loss

In [None]:
from typing import Optional
import lpips
def compute_custom_losses(combination_image, base_image,custom_losses : bool = True, loss_net = "alex", includes : list[str] = ["ssim", "psnr", "lpips"],weights : dict = {}):
    if custom_losses:    
        
        ssim_weight = weights.get("ssim", 1.0)
        psnr_weight = weights.get("psnr", 1.0)
        lpips_weight = weights.get("lpips", 1.0)
        loss = 0.0
        if "ssim" in includes:
            ssim_loss_value = ssim_loss(combination_image, base_image)
            loss += ssim_loss_value * ssim_weight
        if "psnr" in includes:
            psnr_loss_value = psnr_loss(combination_image, base_image)
            loss += psnr_loss_value * psnr_weight 
        if "lpips" in includes:
            lpips_loss_fn = lpips.LPIPS(net=loss_net)
            lpips_loss = lpips_loss_fn(base_image, combination_image)
            loss += lpips_loss * lpips_weight
        return loss
    return 0.0

In [None]:

def compute_loss(combination_image, base_image, style_reference_image,use_l2=False):
  input_tensor = tf.concat(
  [base_image, style_reference_image, combination_image], axis=0)
  features = feature_extractor(input_tensor)
  loss = tf.zeros(shape=())

  content_weight_per_layer : float = single_content_weight / len(content_layer_names)
  for layer_name in content_layer_names:
    layer_features = features[layer_name]
    base_image_features = layer_features[0, :, :, :]
    combination_features = layer_features[2, :, :, :]
    loss += content_weight_per_layer * content_loss(
        base_image_features, combination_features
    )
  style_weight_per_layer : float = single_style_weight / len(style_layer_names)
  for layer_name in style_layer_names:
    layer_features = features[layer_name]
    style_reference_features = layer_features[1, :, :, :]
    combination_features = layer_features[2, :, :, :]
    style_loss_value = style_loss(
    style_reference_features, combination_features, img_height, img_width)
    loss += style_weight_per_layer * style_loss_value

  custom_loss_weights = {
      "ssim": 1.0,
      "psnr": 1.0,
      "lpips": 1.0,}
  loss += compute_custom_losses(combination_image, base_image,weights=custom_loss_weights)


  loss += total_variation_weight * total_variation_loss(combination_image,use_l2=use_l2)
  return loss

In [None]:
from tensorflow.keras import layers

Set the policy.

In [None]:
from tensorflow.keras.mixed_precision import set_global_policy
def control_policy(enable_mixed_precision: bool = False):
    if enable_mixed_precision:
        print("Enabled mixed_float16 policy")
        set_global_policy('mixed_float16')

Call the policy function.

In [None]:
control_policy(enable_mixed_precision=False)

In [None]:
def normalization_grads(grads, strength= None) -> list:
    norm = tf.linalg.global_norm(grads)
    if strength:
        norm_grads : list = [g * (strength / (norm + 1e-8)) for g in grads]
    else:
        norm_grads : list = [g / (norm + 1e-8) for g in grads]
    return norm_grads

In [None]:
import tensorflow as tf

@tf.function
def compute_loss_and_grads(combination_image, base_image, style_reference_images,apply_normalization=False,strength=None, use_multi_NST = False):
    with get_device(GPU_in_use, CPU_in_use):  
        with tf.GradientTape() as tape:
            
            loss = tf.zeros(shape=())
            num : int = len(style_reference_images)
            style_cal = single_style_weight / num
            # iterate through the style images
            for i, style_reference_image in enumerate(style_reference_images):
                style_loss_value = compute_loss(
                    combination_image, base_image, style_reference_image
                )
                loss += style_cal * style_loss_value
        
        grads = tape.gradient(loss, combination_image)
        if apply_normalization:
            grads = normalization_grads(grads)
        return loss, grads

In [None]:
def preprocess_style_image(style_reference_image_paths):
    images = []
    for path in style_reference_image_paths:
        img = preprocess_image(path, img_height, img_width)
        images.append(img)
    return tf.concat(images, axis=0)
  

In [None]:
def add_noise_to_image(image,noise_strength : float =0.1):
    noise = tf.random.normal(shape=tf.shape(image), mean=0.0, stddev=noise_strength, dtype=image.dtype)
    noisy_image = image + noise
    return tf.clip_by_value(noisy_image, 0.0, 255.0)

In [None]:
def preprocess_NST_images(base_image_path : str, style_reference_image_path : str):
    with get_device(GPU_in_use, CPU_in_use):
        base_image = preprocess_image(base_image_path, img_height, img_width)
        style_reference_images = preprocess_image(style_reference_image_path, img_height, img_width)
        initial_combination_image = add_noise_to_image(base_image)
        combination_image = tf.Variable(initial_combination_image)
    return base_image, style_reference_images, combination_image


In [None]:
import math
import os
import time
import numpy as np

In [None]:
import pyJoules
import GPUtil
import psutil
from datetime import datetime
from shared_utils.device import get_gpu_usage

In [None]:
from shared_utils.optimizer import get_optimizer

In [None]:
from helper_functions.checkPointManager import checkPointManager

In [None]:
def result_save(content_name : str,style_name: str,iterations : int, img: np.ndarray,verbose: int = 0):
    now = datetime.now()
    time_format = "%Y%m%d_%H%M%S"
    now = now.strftime(time_format)
    fname = f"images/{content_name}_{style_name}_{now}_combination_image_at_iteration_{iterations}.png"
    keras.utils.save_img(fname, img) 
    if verbose > 0:
        print("Image saved at iteration {}".format(iterations))

In [None]:
def clip_0_1(image, min : float = 0.0, max : float = 1.0):
  return tf.clip_by_value(image, clip_value_min=min, clip_value_max=max)

In [None]:
@tf.function
def apply_style_transfer_step(combination_image, base_image, style_reference_image, optimizer, clip_image : bool = True):

    with get_device(GPU_in_use, CPU_in_use):
        loss, grads = compute_loss_and_grads(
            combination_image, base_image, style_reference_image
        )
    optimizer.apply_gradients([(grads, combination_image)])
    if clip_image:
        combination_image.assign(clip_0_1(combination_image))
    return loss, grads

In [None]:

from psutil import disk_usage


def training_loop(base_image, style_reference_image, combination_image,content_name : str,style_name: str,verbose : int = 0,include_checkpoints : bool = False, chosen_optimizer : list[str] | str = "adam", learning_rate : float = 0.01, improvement_threshold : float = 0.5, image_width : int = 400, image_height : int = 400):
    
    if isinstance(chosen_optimizer, str):
        optimizer = get_optimizer(chosen_optimizer, learning_rate=learning_rate)
    else:
        raise ValueError("Invalid passed in optimizer type. Should be a string or a list of strings.")
    checkpoint = None
    generated_images = []
    start_step : int = 1
    iterations = 1000
    check_step: int = 100
    folder_path = "images"
    best_cost = math.inf
    best_image = None
    checkpoint_dir = "./checkpoints"
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    if include_checkpoints:
        checkpoint = tf.train.Checkpoint(optimizer=optimizer, combination_image=combination_image)
        create_dir(folder_path)
        create_dir(checkpoint_dir)
       
        checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    total_wall_time = time.time() 
    total_time_cpu = time.process_time()
    start_time_cpu = time.process_time()
    start_time_wall = time.time()

    gpu_usage_list = []
    ram_usage_list = []
    disk_usage_list = []
    cpu_usage_list = []
    cpu_duration_logs = []
    wall_duration_logs = []
    loss_logs = []
    iterations_logs = []

    
    if start_step > iterations:
        print(f"Start step ({start_step}) is greater than the specified iterations ({iterations}). No training will be performed.")
        return generated_images, best_image, best_cost, ram_usage_list,gpu_usage_list
    for i in range(start_step, iterations + 1):
        
        loss, grads = apply_style_transfer_step(combination_image, base_image, style_reference_image, optimizer)
        
        if i % check_step == 0:
            
            gpu = get_gpu_usage()
            if gpu is not None:
                gpu_usage_list.append((i, gpu))
            ram = psutil.virtual_memory().percent
            cpu = psutil.cpu_percent(interval=1)
            disk = disk_usage('/').percent
            
            # append the current usage statistics
            ram_usage_list.append( ram)
            gpu_usage_list.append(gpu)
            disk_usage_list.append(disk)
            cpu_usage_list.append( cpu)
            loss_logs.append( loss.numpy())
            iterations_logs.append(i)
            if verbose > 0:
                print(f"Iteration {i}: loss={loss:.2f}")
            img = deprocess_image(combination_image.numpy(), img_height, img_width)
           
            end_time_cpu = time.process_time()  
            end_time_wall = time.time()  
            cpu_time = end_time_cpu - start_time_cpu  
            wall_time = end_time_wall - start_time_wall

            cpu_duration_logs.append((i, cpu_time))
            wall_duration_logs.append((i, wall_time))
            if loss < best_cost:
                best_cost = loss
                best_image = img
            if verbose > 0:
                print("CPU times in seconds: {:.2f}".format(cpu_time))
                print("Wall time in seconds: {:.2f}".format(wall_time))
            if include_checkpoints and checkpoint is not None:
                checkpoint.save(file_prefix=checkpoint_prefix)
            if verbose > 0:
                print("Iteration :{}".format(i))
                print('Total Loss {:e}.'.format(loss))
            generated_images.append(img)
            result_save(content_name, style_name, i, img)
            start_time_cpu = time.process_time()
            start_time_wall = time.time()
    end_time_wall = time.time()
    end_time_cpu = time.process_time()
    end_total_wall_time = end_time_wall - total_wall_time
    end_total_time_cpu = end_time_cpu - total_time_cpu
    if verbose > 0:
        print("Total wall time: {:.2f} seconds".format(end_total_wall_time))
        print("Total CPU time: {:.2f} seconds".format(end_total_time_cpu))
    return generated_images, best_image, best_cost,ram_usage_list,gpu_usage_list, disk_usage_list, cpu_usage_list, cpu_duration_logs, wall_duration_logs,loss_logs

Define the hyperparameter space.

In [None]:
list_of_optimizers = ["adam", "sgd", "rmsprop", "adagrad", "adamax"]
list_of_loss_networks = ["vgg19", "vgg16", "mobilenet", "resnet50", "inception_v3"]
enable_clip = [True, False]
use_l2 = [True, False]
list_of_image_sizes = [(32,32),(64,64),(128,128),(256, 400), (512, 600), (1024, 800), (2048, 1200)]
list_of_content_weights = [1e-6, 2.5e-8, 1e-4]
list_of_style_weights = [1e-6, 0.8e-6, 0.5e-6, 0.3e-6, 0.1e-6, 0.1e-7]
list_of_total_variation_weights = [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1, ]
list_of_learning_rates = [0.1,0.01, 0.001, 0.0001, 0.00001, 0.000001, 0.0000001]
list_of_iterations = [100, 200, 300, 400, 500]
list_of_check_steps = [10, 20, 50, 100]
list_of_noise_strengths = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
list_of_improvement_thresholds = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
list_of_beta_1s = [0.9, 0.99, 0.999]
list_of_epsilons = [1e-1, 1e-2, 1e-3]
list_of_decay_steps = [100, 200, 300]
list_of_decay_rates = [0.96, 0.98, 0.99]
list_of_weight_decays = [1e-4, 1e-5, 1e-6]
lpips_loss_nets = ["alex", "vgg", "squeeze"]

In [None]:
%%skip
from itertools import product



Sample the hyperspace.

In [None]:
%%skip
hyperparameter_space = prepare_hyperparameter_space()

In [None]:
%%skip
import random
sampled_space_configs = random.sample(hyperparameter_space, k=50)

In [None]:
%%skip
for config in sampled_space_configs:
    pass

In [None]:
content_folder = "content"
style_folder = "style"

In [None]:
def get_image_files(folder_path : str,image_file_types=('.png', '.jpg', '.jpeg')):
    return [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.lower().endswith(image_file_types)]

In [None]:
content_images = get_image_files(content_folder)[0:1]
style_images = get_image_files(style_folder)[0:1]

NameError: name 'content_folder' is not defined

In [None]:
content_images

In [None]:
style_images

In [None]:
%%skip
def update_model(loss_network: str, img_width: int, img_height: int):
    feature_extractor = get_model(loss_network, img_width, img_height)
    style_layer_names = get_style_layer_names(loss_network)
    content_layer_names = get_content_layer_names(loss_network)
    style_weights = get_style_weights(loss_network)
    content_weights = get_content_weights(loss_network)
    return feature_extractor, style_layer_names, content_layer_names, style_weights, content_weights

In [None]:
def loop_through_images(content_images, style_images):
    image_set = []
    best_image_set = []
    best_cost_set = []
    loss_logs_set = []
    ram_usage_set = []
    gpu_usage_set = []
    disk_usage_set = []
    cpu_usage_set = []
    cpu_duration_logs_set = []
    wall_duration_logs_set = []
    for content_path in content_images:
        content_name = os.path.basename(content_path)
        for style_path in style_images:
            style_name = os.path.basename(style_path)
            base_image, style_reference_image, combination_image = preprocess_NST_images(
                content_path, style_path)
            generated_images, best_image, best_cost,ram_usage_list,gpu_usage_list, disk_usage_list, cpu_usage_list, cpu_duration_logs,wall_duration_logs,loss_logs  = training_loop(base_image, style_reference_image,combination_image,content_name,style_name )
            image_set.append(generated_images)
            best_image_set.append(best_image)
            best_cost_set.append(best_cost)
            loss_logs_set.append(loss_logs)
            # log hardware usage
            ram_usage_set.append(ram_usage_list)
            gpu_usage_set.append(gpu_usage_list)
            disk_usage_set.append(disk_usage_list)
            cpu_usage_set.append(cpu_usage_list)
            cpu_duration_logs_set.append(cpu_duration_logs)
            wall_duration_logs_set.append(wall_duration_logs)
    return image_set, best_image_set, best_cost_set, loss_logs_set, ram_usage_set, gpu_usage_set, disk_usage_set, cpu_usage_set, cpu_duration_logs_set, wall_duration_logs_set

Expected: ['keras_tensor']
Received: inputs=Tensor(shape=(3, 400, 535, 3))


TypeError: in user code:

    File "C:\Users\Layo\AppData\Local\Temp\ipykernel_19984\189228123.py", line 7, in compute_loss_and_grads  *
        loss = compute_loss(
    File "C:\Users\Layo\AppData\Local\Temp\ipykernel_19984\2033682352.py", line 6, in compute_loss  *
        layer_features = features[content_layer_name]

    TypeError: unhashable type: 'list'


In [None]:
image_set, best_image_set, best_cost_set, loss_logs_set, ram_usage_set, gpu_usage_set, disk_usage_set, cpu_usage_set, cpu_duration_logs_set, wall_duration_logs_set = loop_through_images(content_images, style_images)

In [None]:
ram_usage_list = ram_usage_set[0]
best_cost_set = best_cost_set[0]
generated_images = image_set[0]
best_image = best_image_set[0]
loss_logs = loss_logs_set[0]

Get the number of iterations.

In [None]:
number_of_iterations : list[int] = [i for i in range(len(ram_usage_list))]

In [None]:
import matplotlib.pyplot as plt

def display_image(img):
    plt.imshow(img)
    plt.axis("off")

In [None]:
plt.figure(figsize=(12, 12))
start_index = 0
num = len(generated_images)
for i in range(num):
    plt.subplot(4, 3, i + 1)
    display_image(generated_images[i + start_index])  # Adjust indices based on your data
plt.show()


plt.figure(figsize=(8, 8))
display_image(best_image)
plt.title("Best Image")
plt.show()

Place results into a table

In [None]:
import pandas as pd

df = pd.DataFrame({
    "Iteration": number_of_iterations,
    "RAM Usage": [ram for ram in ram_usage_set],
    "GPU Usage ": [gpu for gpu in gpu_usage_set],
    "Disk Usage ": [disk for  disk in disk_usage_set],
    "CPU Usage": [cpu for  cpu in cpu_usage_set],
    "CPU Duration ": [cpu for  cpu in cpu_duration_logs_set],
    "Wall Duration ": [wall for wall in wall_duration_logs_set]
    })

Convert this to a table

In [None]:
df.to_csv("usage_statistics.csv", index=False)

End the notebook at this point.

In [None]:
import sys
sys.exit("Execution stopped here.")

# 2 Video Style Transfer 

Doing this with video.

In [None]:
def process_frame_or_batch(base_frame_tensor, style_reference_image, img_width,img_height, optimizer):
    style_image = preprocess_image(style_reference_image,img_height, img_width)
    combination_frame_tensor = tf.Variable(base_frame_tensor)
    loss, grads = apply_style_transfer_step(combination_frame_tensor,base_frame_tensor, style_image, optimizer)

    return loss, combination_frame_tensor


In [None]:
# Video file path
video_path = "videos/coast.mp4"
output_camera_path = "output_video.mp4"

img_height = 400
img_width = 400

Define functions for processing the video.

In [None]:
from shared_utils.video import get_cam,load_the_video, image_read,prepare_video_writer,release_video_writer,video_end

In [None]:
import cv2
def process_camera_frame(frame, style_image_path, img_width, img_height, optimizer):
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame_tensor = image_read(frame_rgb) 
    frame_tensor_resized = tf.image.resize(frame_tensor, (img_height, img_width))
    loss, processed_frame = process_frame_or_batch(frame_tensor_resized, style_image_path,img_width,img_height, optimizer)
    frame_output = deprocess_image(processed_frame.numpy(),img_height, img_width)  
    frame_color_output = cv2.cvtColor(frame_output, cv2.COLOR_RGB2BGR)
    return frame_color_output

Neural style transfer for camera.

In [None]:
from typing import Optional, Tuple
import tensorflow as tf
import numpy as np
def apply_camera(output_path : str = "output_video.mp4",style_image_path : str = "../images/starry_night.png", config = {}, video_path : Optional[str]= None,verbose : int = 0):
    cam, frame_width, frame_height, fps = get_cam(video_path,video_path != None)
    lr = config.get("learning_rate", 0.01)
    img_size = config.get("img_size", (400, 400))
    optimizer = get_optimizer(config.get("optimizer","adam"), learning_rate=lr)
    out = prepare_video_writer(output_path, frame_width, frame_height, fps)
    if not cam.isOpened() or out is None:
        print("Error: Could not open camera.")
        release_video_writer(cam,out)
        return
    title = "Camera Style Transfer" if video_path is None else "Video Style Transfer"
    start_time = time.time()
    if verbose > 0:
        print("Video path:", video_path)
        print("Output path:", output_path)
        print("Starting video processing...")
    while True:
        ret, frame = cam.read()
        if not ret:
            break  
        frame_color_output = process_camera_frame(frame, style_image_path, img_size[0], img_size[1], optimizer)
        out.write(frame_color_output)
        cv2.imshow(title, frame_color_output)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    video_end(start_time)
    release_video_writer(cam,out)
    return output_path

Do it for camera.

In [None]:
output_camera_path = apply_camera(output_path=output_camera_path)

Do it for video.

In [None]:
video_output_path : str = "output_video"

Prepare the configuration for the video.

In [None]:
config = {
    "optimizer": "adam",
    "lr": 0.01,
    "img": (img_width, img_height),
}

In [None]:
output_path = apply_camera(output_path=video_output_path,video_path=video_path, style_image_path="../images/starry_night.png", config=config, verbose=1)

In [None]:
if output_path:
    frames = load_the_video(output_path)