In [None]:
!pip install mtcnn

Collecting mtcnn
  Downloading mtcnn-0.1.1-py3-none-any.whl (2.3 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/2.3 MB[0m [31m3.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.7/2.3 MB[0m [31m9.6 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━[0m [32m1.4/2.3 MB[0m [31m13.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.3 MB[0m [31m16.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mtcnn
Successfully installed mtcnn-0.1.1


In [None]:
import os
import pathlib

import cv2
import numpy as np
import tensorflow as tf
from google.colab import drive
from google.colab.patches import cv2_imshow
from keras import layers, models, regularizers
from keras.models import load_model
from mtcnn import MTCNN
from tqdm.notebook import tqdm


In [None]:
# Mount Google Drive at /content/drive; every dataset and model path used
# further down lives under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root of the DFGC dataset on the mounted Drive. Keeping it in one place means
# relocating the data only requires editing this line.
dfgc_root = pathlib.Path("/content/drive/MyDrive/ST456_project_team_afi/DFGC")

# One sub-directory per image source.
real_dir = dfgc_root / "Real"
facemorph_dir = dfgc_root / "Facemorph"
faceshift_dir = dfgc_root / "Faceshift"
baseline_dir = dfgc_root / "Baseline"
adversarial_dir = dfgc_root / "Adversarial Attack"

## Cropping

In [None]:
class Preprocessor():
  """
  Crop images to a fixed-size window, centred on the detected face if possible.

  Every image is run through MTCNN. When exactly one face is detected the crop
  window is centred on that face; otherwise the centre of the image is cropped.

  Args:
    height: height of the crop window in pixels.
    width: width of the crop window in pixels.
  """

  def __init__(self, height=112, width=112):
    self.height = height
    self.width = width
    self.mtcnn = MTCNN()

  @staticmethod
  def _window(center, size, limit):
    """
    Compute a 1-D crop window that always lies inside [0, limit].
    Args:
      center: desired centre of the window.
      size: requested window length.
      limit: length of the image along this axis.
    Returns:
      (start, stop) with stop - start == min(size, limit).
    """
    size = min(size, limit)
    # Clamp on both sides: shifting the window back inside the image keeps the
    # crop at full size even when the centre sits close to an edge.
    start = min(max(0, center - size // 2), limit - size)
    return start, start + size

  def _cropAround(self, img, center_x, center_y):
    """
    Crop a (height, width) window centred on the given point.
    Args:
      img: image to crop, indexed as img[row, column].
      center_x: column of the desired window centre.
      center_y: row of the desired window centre.
    Returns:
      cropped image.
    """
    top, bottom = self._window(center_y, self.height, img.shape[0])
    left, right = self._window(center_x, self.width, img.shape[1])
    return img[top:bottom, left:right]

  def centerCrop(self, img):
    """
    Crop the center of the image.
    Args:
      img: image to crop.
    Returns:
      cropped image; an axis shorter than the crop window is returned whole.
    """
    return self._cropAround(img, img.shape[1] // 2, img.shape[0] // 2)

  def __call__(self, paths, output_dir, verbose=False, remove=False):
    """
    Crop every image in `paths` and write the result to `output_dir`.

    The i-th input is written as "{output_dir}/{i}.png".

    NOTE: when `output_dir` is also the input directory and the inputs already
    have numeric names, an output can overwrite an input that has not been
    processed yet. Prefer a separate output directory in that situation.

    Args:
      paths: iterable of image file paths.
      output_dir: directory the cropped images are written to.
      verbose: if True, display every crop and report centre-crop fallbacks.
      remove: if True, delete each source file after its crop has been written.
    Raises:
      OSError: if a cropped image cannot be written.
    """
    # enumerate(tqdm(...)) rather than tqdm(enumerate(...)) so that the
    # progress bar can read the total from `paths`.
    for i, path in enumerate(tqdm(paths)):
      path = str(path)
      img = cv2.imread(path)
      if img is None:
        # cv2.imread reports failure by returning None instead of raising.
        print(f"skipping unreadable image: {path}")
        continue
      rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

      # perform face detection using mtcnn
      faces = self.mtcnn.detect_faces(rgb)

      if len(faces) == 1:
        # crop image using the detected face
        x, y, w, h = faces[0]["box"]
        cropped_img = self._cropAround(img, x + w // 2, y + h // 2)
      else:
        # if the number of faces detected is not 1, perform centre crop
        if verbose: print("centre crop")
        cropped_img = self.centerCrop(img)

      if verbose: cv2_imshow(cropped_img)

      out_path = f"{output_dir}/{i}.png"
      # cv2.imwrite returns False on failure; never delete the source unless
      # the crop is safely on disk.
      if not cv2.imwrite(out_path, cropped_img):
        raise OSError(f"could not write cropped image to {out_path}")
      # When the output landed on the source file itself, removing the source
      # would delete the crop that was just written.
      if remove and os.path.abspath(out_path) != os.path.abspath(path):
        os.remove(path)

# Shared instance with the default 112x112 crop window; constructing it also
# builds the MTCNN face detector.
preprocessor = Preprocessor()

In [None]:
# Crop the real images in place: results are written back into real_dir and
# each source file is deleted after it has been processed (remove=True).
real_paths = np.array(sorted(map(str, real_dir.glob('*.png'))))
preprocessor(real_paths, real_dir, remove=True)

In [None]:
# Crop the baseline images in place: results are written back into
# baseline_dir and each source file is deleted afterwards (remove=True).
baseline_paths = np.array(sorted(map(str, baseline_dir.glob('*.png'))))
preprocessor(baseline_paths, baseline_dir, remove=True)

In [None]:
# Crop the adversarial-attack images in place: results are written back into
# adversarial_dir and each source file is deleted afterwards (remove=True).
adversarial_paths = np.array(sorted(map(str, adversarial_dir.glob('*.png'))))
preprocessor(adversarial_paths, adversarial_dir, remove=True)

In [None]:
# The Faceshift images sit in a sub-folder; the crops are written one level up
# into faceshift_dir and the sources are deleted afterwards (remove=True).
# Named faceshift_src_dir rather than `dir` so the builtin dir() is not shadowed.
faceshift_src_dir = faceshift_dir / "miaotao_853000"
faceshift_paths = np.array(sorted([str(path) for path in faceshift_src_dir.glob('*.png')]))
preprocessor(faceshift_paths, faceshift_dir, remove=True)

## Super Resolving

In [None]:
# Timestep encoding that lets the U-Net use the temporal aspect of noise
# application in its denoising process.
# Similar to positional encoding in transformers.
class GammaEncoding(layers.Layer):
    """Sinusoidal encoding of the noise level followed by Dense + LeakyReLU.

    Args:
        dim: size of the embedding produced by the dense layer; dim // 2
            frequencies are used for the sine and cosine halves.
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim
        self.dense = layers.Dense(dim)
        self.act = layers.LeakyReLU()

    def call(self, noise_level):
        """Embed `noise_level`; three trailing axes are appended before encoding."""
        level = tf.cast(noise_level, tf.float32)
        # Append three singleton axes so the level broadcasts over the frequencies.
        for _ in range(3):
            level = tf.expand_dims(level, -1)

        half_dim = self.dim // 2
        fractions = tf.range(half_dim, dtype=tf.float32) / half_dim
        # Frequencies grow geometrically from 1 towards 1e4.
        frequencies = tf.exp(tf.math.log(1e4) * fractions[tf.newaxis, :])
        angles = level * frequencies

        features = tf.concat([tf.math.sin(angles), tf.math.cos(angles)], axis=-1)
        return self.act(self.dense(features))

# Convolutional block, see Figure 1
class ConvBlock(layers.Layer):
    """Two Conv -> BatchNorm -> activation -> Dropout stages plus a time embedding.

    Args:
        in_c: accepted for signature symmetry; not used by this block.
        out_c: number of filters of both convolutions and size of the embedding.
        time_steps: accepted for signature symmetry; not used by this block.
        activation: "relu" selects ReLU, any other value selects SiLU.
        dropout_rate: rate of both dropout layers.
        reg_lambda: L2 regularisation factor of both convolution kernels.
    """

    def __init__(self, in_c, out_c, time_steps=1000, activation="relu", dropout_rate=0.01, reg_lambda=0.01):
        super().__init__()

        def make_activation():
            if activation == "relu":
                return layers.ReLU()
            return layers.Activation('silu')

        # Sub-layers are created in a fixed order (stage 1, stage 2, embedding).
        self.conv1 = layers.Conv2D(out_c, (3, 3), padding="same", kernel_regularizer=regularizers.l2(reg_lambda))
        self.bn1 = layers.BatchNormalization()
        self.act1 = make_activation()
        self.dropout1 = layers.Dropout(dropout_rate)

        self.conv2 = layers.Conv2D(out_c, (3, 3), padding="same", kernel_regularizer=regularizers.l2(reg_lambda))
        self.bn2 = layers.BatchNormalization()
        self.act2 = make_activation()
        self.dropout2 = layers.Dropout(dropout_rate)

        self.embedding = GammaEncoding(out_c)

    def call(self, inputs, time, training=False):
        """Apply both stages to `inputs`, then add the embedding of `time`."""
        # Reshaped to (-1, 1, 1, dim) so it broadcasts over the spatial axes.
        time_embedding = tf.reshape(self.embedding(time), (-1, 1, 1, self.embedding.dim))

        stages = (
            (self.conv1, self.bn1, self.act1, self.dropout1),
            (self.conv2, self.bn2, self.act2, self.dropout2),
        )
        features = inputs
        for conv, norm, act, drop in stages:
            features = conv(features)
            features = norm(features, training=training)
            features = act(features)
            features = drop(features, training=training)

        return features + time_embedding

# Encoder block built on the convolutional block, see Figure 2
class EncoderBlock(layers.Layer):
    """ConvBlock followed by 2x2 max pooling."""

    def __init__(self, in_c, out_c, time_steps, activation="relu"):
        super().__init__()
        self.conv_block = ConvBlock(in_c, out_c, time_steps, activation)
        self.pool = layers.MaxPooling2D((2, 2))

    def call(self, inputs, time):
        """Return (features before pooling, pooled features)."""
        features = self.conv_block(inputs, time)
        return features, self.pool(features)

# Decoder block built on the convolutional block, see Figure 2
class DecoderBlock(layers.Layer):
    """Stride-2 transposed convolution, skip concatenation, then a ConvBlock."""

    def __init__(self, in_c, out_c, time_steps, activation="relu"):
        super().__init__()
        self.up = layers.Conv2DTranspose(in_c, (2, 2), strides=(2, 2))
        self.conv_block = ConvBlock(in_c + out_c, out_c, time_steps, activation)

    def call(self, inputs, skip, time):
        """Upsample `inputs`, join it with `skip` on the last axis and convolve."""
        upsampled = self.up(inputs)
        merged = layers.concatenate([upsampled, skip], axis=-1)
        return self.conv_block(merged, time)


# U-Net architecture, see Figure 2
# @tf.keras.saving.register_keras_serializable()
class UNet(models.Model):
    """Four encoder blocks, a bottleneck, four decoder blocks and a 1x1 output conv.

    Args:
        input_channels: stored and passed to the first encoder block.
        output_channels: number of filters of the final 1x1 convolution.
        time_steps: stored and forwarded to every block.
    """

    def __init__(self, input_channels, output_channels, time_steps):
        super().__init__()
        self.input_channels = input_channels
        self.output_channels = output_channels
        self.time_steps = time_steps

        steps = self.time_steps

        # Contracting path.
        self.e1 = EncoderBlock(self.input_channels, 64, time_steps=steps)
        self.e2 = EncoderBlock(64, 128, time_steps=steps)
        self.e3 = EncoderBlock(128, 256, time_steps=steps)
        self.e4 = EncoderBlock(256, 512, time_steps=steps)

        # Bottleneck.
        self.b = ConvBlock(512, 1024, time_steps=steps)

        # Expanding path.
        self.d1 = DecoderBlock(1024, 512, time_steps=steps)
        self.d2 = DecoderBlock(512, 256, time_steps=steps)
        self.d3 = DecoderBlock(256, 128, time_steps=steps)
        self.d4 = DecoderBlock(128, 64, time_steps=steps)

        # NOTE(review): `outputs` may also be an attribute name Keras models use
        # internally - confirm there is no clash before renaming or relying on it.
        self.outputs = layers.Conv2D(self.output_channels, kernel_size=1, padding="same")

    def call(self, inputs, time):
        """Run the U-Net on `inputs` = (upsampled low-res image, current noisy image)."""
        upsampled_low_res_image, current_noisy_image = inputs
        x = tf.concat([upsampled_low_res_image, current_noisy_image], axis=-1)

        # Encoders: keep each pre-pooling feature map for the matching decoder.
        skips = []
        for encoder in (self.e1, self.e2, self.e3, self.e4):
            skip, x = encoder(x, time)
            skips.append(skip)

        x = self.b(x, time)

        # Decoders consume the skips in reverse order (deepest first).
        for decoder in (self.d1, self.d2, self.d3, self.d4):
            x = decoder(x, skips.pop(), time)

        return self.outputs(x)

In [None]:
def load_sr3_from_weights(path):
  """
  Build the SR3 diffusion model and load its weights from `path`.

  The model is compiled and called once on dummy data before loading, so that
  its variables exist when load_weights runs.

  Args:
    path: location of the saved weights file.
  Returns:
    the DiffusionModel with the weights loaded.
  """
  image_dims = (224, 224, 3)
  batch_shape = (1,) + image_dims

  model = DiffusionModel(time_steps=1000, beta_start=1e-4, beta_end=0.02, image_dims=image_dims)
  model.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
      loss='mean_squared_error',
  )

  # One forward pass on random data of the right shape builds the model.
  low_res = np.random.random(batch_shape).astype(np.float32)
  high_res = np.random.random(batch_shape).astype(np.float32)
  step = np.array([0], dtype=np.int32)
  noisy, _ = model.add_noise(high_res, step)
  model([low_res, noisy], step, training=False)

  model.load_weights(path)
  return model

In [None]:
def perform_inference(model, low_res_batch, time_steps):
    """Run the reverse diffusion process conditioned on a low-resolution batch.

    Starting from random noise, the image is refined for t = time_steps - 1
    down to 0 with the update

        x_{t-1} = (x_t - (1 - alpha_t) / sqrt(1 - gamma_t) * eps) / sqrt(alpha_t)
                  + sqrt(1 - alpha_t) * z

    where eps is the noise predicted by the model and z is fresh noise (zero on
    the last step).

    Assumes `model.alphas` holds 1 - beta_t and `model.alpha_hats` their
    cumulative product - TODO confirm against the DiffusionModel definition.

    Args:
        model: diffusion model exposing `alphas` and `alpha_hats` and callable
            as model([low_res, noisy], timesteps, training=False).
        low_res_batch: conditioning batch; its shape is used for the noise
            and therefore for the result.
        time_steps: number of reverse steps to run.

    Returns:
        uint8 tensor with the shape of `low_res_batch`, values in [0, 255].
    """
    noise_shape = low_res_batch.shape
    noise_std = np.sqrt(1/255)

    # Initialize a random image with normal noise
    current_image = tf.random.normal(noise_shape, stddev=noise_std)
    alphas = model.alphas.numpy()
    gammas = model.alpha_hats.numpy()

    # Reverse process of the diffusion model
    for t in reversed(range(time_steps)):
        # No fresh noise is injected on the final step.
        if t > 0:
            z = tf.random.normal(noise_shape, stddev=noise_std)
        else:
            z = tf.zeros(noise_shape, dtype=tf.float32)

        ts_tensor = tf.fill([current_image.shape[0]], t)
        ts_tensor = tf.cast(ts_tensor, tf.int32)

        predicted_noise = model([low_res_batch, current_image], ts_tensor, training=False)

        # Update the current image. The step is applied for every t, including
        # t == 0 (that is why z is zeroed there), and the mean is scaled by
        # 1 / sqrt(alpha_t), not 1 / alpha_t.
        alpha_t = alphas[t]
        noise_coeff = (1 - alpha_t) / np.sqrt(1 - gammas[t])
        denoised_mean = (current_image - noise_coeff * predicted_noise) / np.sqrt(alpha_t)
        current_image = denoised_mean + np.sqrt(1 - alpha_t) * z

    # Convert the output image to the proper range [0, 255]
    final_image = (current_image + 1) * 127.5
    final_image = tf.clip_by_value(final_image, 0, 255)
    final_image = tf.cast(final_image, tf.uint8)

    return final_image

In [None]:
# Load the three trained super-resolution models from Drive.
srcnn = load_model("/content/drive/MyDrive/ST456_project_team_afi/0_SRCNN_final/SRCNN.keras")
sr3 = load_sr3_from_weights('/content/drive/MyDrive/ST456_project_team_afi/0_SR3_final/SR3_weights.h5')
srgan_generator = load_model("/content/drive/MyDrive/ST456_project_team_afi/0_SRGAN_final/generator.keras")

# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
srcnn.summary()
sr3.summary()
srgan_generator.summary()



Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 224, 224, 128)     31232     
                                                                 
 dropout (Dropout)           (None, 224, 224, 128)     0         
                                                                 
 conv2d_1 (Conv2D)           (None, 224, 224, 64)      204864    
                                                                 
 dropout_1 (Dropout)         (None, 224, 224, 64)      0         
                                                                 
 conv2d_2 (Conv2D)           (None, 224, 224, 3)       4803      
                                                                 
Total params: 240899 (941.01 KB)
Trainable params: 240899 (94

In [None]:
def super_resolve(input_dir, output_dir, model="bicubic", hr=[224, 224], max_imgs=1000):
  """
  Super-resolve the PNG images of `input_dir` and write them to `output_dir`.

  Each result keeps the file name of its source image.

  Args:
    input_dir: pathlib.Path of the directory holding the low-resolution PNGs.
    output_dir: directory for the results; created if it does not exist.
    model: "bicubic" for plain bicubic resizing, or one of the module-level
      models srgan_generator, srcnn or sr3.
    hr: target [height, width] for the bicubic resize (unused by SRGAN).
    max_imgs: only the first `max_imgs` images (sorted by path) are processed.
  Raises:
    ValueError: if `model` is none of the supported options.
    OSError: if a result cannot be written.
  """
  # cv2.imwrite does not create missing directories - it just returns False -
  # so make sure the target exists before processing anything.
  pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

  img_paths = sorted(str(path) for path in input_dir.glob('*.png'))
  for path in tqdm(img_paths[:max_imgs]):
    img = cv2.imread(path)
    if img is None:
      # cv2.imread reports failure by returning None instead of raising.
      print(f"skipping unreadable image: {path}")
      continue
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Models are matched by identity; `==` on model objects is not a reliable
    # way to tell them apart.
    if isinstance(model, str) and model == "bicubic":
      img = tf.image.resize(img, hr, method=tf.image.ResizeMethod.BICUBIC)
    elif model is srgan_generator:
      img = tf.cast(img, dtype=tf.float32) / 127.5 - 1.0
      img = tf.expand_dims(img, axis=0)
      img = model(img)[0]
      img = (img + 1.0) * 127.5
    elif model is srcnn:
      img = tf.image.resize(img, hr, method=tf.image.ResizeMethod.BICUBIC)
      img = tf.cast(img, dtype=tf.float32) / 127.5 - 1.0
      img = tf.expand_dims(img, axis=0)
      img = model(img)[0]
      img = (img + 1.0) * 127.5
    elif model is sr3:
      img = tf.image.resize(img, hr, method=tf.image.ResizeMethod.BICUBIC)
      img = tf.cast(img, dtype=tf.float32) / 127.5 - 1.0
      img = tf.expand_dims(img, axis=0)
      img = perform_inference(model, img, 400)[0]
    else:
      raise ValueError(f"unsupported super-resolution model: {model!r}")

    # Bicubic resizing and the networks can overshoot [0, 255]; clip and
    # convert explicitly instead of relying on an implicit conversion in imwrite.
    img = tf.clip_by_value(tf.round(tf.cast(img, tf.float32)), 0, 255)
    img = tf.cast(img, tf.uint8).numpy()

    out_path = os.path.join(str(output_dir), os.path.basename(path))
    # The image is RGB at this point and OpenCV expects BGR when writing.
    if not cv2.imwrite(out_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR)):
      raise OSError(f"could not write super-resolved image to {out_path}")


In [None]:
# Super-resolve every dataset with each method; results go into one sub-folder
# per method. SR3 is limited to 100 images per dataset via max_imgs.
# The loop variable is named dataset_dir so the builtin dir() is not shadowed.
for dataset_dir in tqdm([
                real_dir,
                baseline_dir,
                adversarial_dir,
                faceshift_dir
                 ]):
  super_resolve(dataset_dir, dataset_dir / "Bicubic")
  super_resolve(dataset_dir, dataset_dir / "SRGAN", srgan_generator)
  super_resolve(dataset_dir, dataset_dir / "SRCNN", srcnn)
  super_resolve(dataset_dir, dataset_dir / "SR3", sr3, max_imgs=100)



  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]