## VP project - Gianfranco Di Marco - 1962292
\
**RRU-Net: The Ringed Residual U-Net for Image Splicing Forgery Detection**
*   X. Bi, Y. Wei, B. Xiao and W. Li, "RRU-Net: *The Ringed Residual U-Net for Image Splicing Forgery Detection*" 2019 IEEE/CVF Conference on Computer Vision and Pattern Recognition Workshops (CVPRW), 2019, pp. 30-39, doi: 10.1109/CVPRW.2019.00010.


\
This project is an implementation in Tensorflow 2.0 of the above paper with a new Dataset: **Splicing COCO Dataset** (named by the author "Spliced Nist"); this can be found at the following link:

> https://github.com/jawadbappy/forgery_localization_HLED/tree/master/synthetic_data

\
The images of this dataset have been obtained from the pristine images of the MFC18 challenge, spliced with an object from the COCO Dataset. In particular, for this project, I used 180 images in total: 125 for the training set, 10 for the validation set and 45 for the test set. Here are the main citations for this data:

*   Bappy, Jawadul H and Simons, Cody and Nataraj, Lakshmanan and Manjunath, BS and Roy-Chowdhury, Amit K, "*Hybrid LSTM and Encoder-Decoder Architecture for Detection of Image Forgeries*", IEEE Transactions on Image Processing, 2019







## Requirements

**Repository**

In [None]:
!git clone https://github.com/Gianfranco-98/RRU-Net_Tensorflow2.git
!mv RRU-Net_Tensorflow2/* /content
!rm LICENSE
!rm README.md   
!rm -r RRU-Net_Tensorflow2

**Libraries**

In [None]:
!pip install tensorflow-addons
!pip install --upgrade tensorflow
!pip install --upgrade albumentations
!apt-get update
!apt install imagemagick

In [None]:
# Parameters
from configuration import *

# Dataset
from dataset import *

# Math
import numpy as np
from random import choice

# Images
import cv2
import skimage
from skimage import io
import matplotlib.pyplot as plt

# Learning
from net import *
import tensorflow as tf
import tensorflow_addons as tfa

# File management
import os
from google.colab import drive
from google.colab import output

# Time management
import time

# Other tools
from functools import partial
from tqdm import tqdm

## Dataset

**Generation**

In [None]:
if DATASET_NAME == "Realistic_Tampering_Dataset":

    # 0. Initialization
    DATA_DIR = "./data"
    PRISTINE_FOLDER = "Pristine"
    FORGERED_FOLDER = "Forgered"
    GROUND_TRUTH_FOLDER = "Ground_Truth"
    DATASET_DIR = '/content/drive/MyDrive/VP_Project/data/' + DATASET_NAME
    drive.mount("/content/drive", force_remount=True)
    start_time = time.time()

    # 1. Directory creation
    %cd /content
    !mkdir data

    # 2. Extraction
    %cd $DATASET_DIR
    print("Dataset extraction...")
    ZIPFILE = DATASET_NAME + '.zip'
    !unzip $ZIPFILE -d /content/data

    # 3. Conversion TIFF (PNG) -> JPEG with 100% quality
    %cd /content/data
    print("Conversion to JPEG with 100% quality")
    %cd $DATASET_NAME
    %cd $PRISTINE_FOLDER
    !for f in *.TIF; do  echo "Converting $f"; convert -quality 100 "$f"  "$(basename "$f" .TIF).jpg"; done
    !rm *.TIF
    %cd ..
    %cd $FORGERED_FOLDER
    !for f in *.TIF; do  echo "Converting $f"; convert -quality 100 "$f"  "$(basename "$f" .TIF).jpg"; done
    !rm *.TIF
    %cd ..
    %cd $GROUND_TRUTH_FOLDER
    !for f in *.PNG; do  echo "Converting $f"; convert -quality 100 "$f"  "$(basename "$f" .PNG).jpg"; done
    !rm *.PNG

    # Time computation
    print("Dataset generation completed in %fs" % (time.time() - start_time))

    %cd /content

In [None]:
if DATASET_NAME == "CASIA":

    # 1. Download Dataset
    start_time = time.time()
    %cd /content
    !mkdir data
    %cd data
    !wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1IDUgcoUeonBxx2rASX-_QwV9fhbtqdY8' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1IDUgcoUeonBxx2rASX-_QwV9fhbtqdY8" -O 'CASIA2.0_revised.zip' && rm -rf /tmp/cookies.txt
    output.clear()

    # 2. Extract Dataset
    print("Dataset extraction...")
    ZIPFILE = 'CASIA2.0_revised' + '.zip'
    !unzip -q $ZIPFILE
    !mv CASIA2.0_revised CASIA
    !rm CASIA2.0_revised.zip
    output.clear()

    # 3. Generate GT
    print("Generating Ground Truth...")
    %cd CASIA
    !git clone https://github.com/namtpham/casia2groundtruth.git
    !mv /content/data/CASIA/casia2groundtruth/CASIA2.0_Groundtruth.zip /content/data/CASIA/
    !rm -r casia2groundtruth
    ZIPFILE = 'CASIA2.0_Groundtruth' + '.zip'
    !unzip -q $ZIPFILE
    !mv CASIA2.0_Groundtruth Gt
    !rm CASIA2.0_Groundtruth.zip
    output.clear()
 
    # 4. Select randomly 760 images
    %cd Tp       
    print("Selecting 760 images and removing the others...")
    files = os.listdir('.')
    indices = random.sample(range(len(files)), k=DATASET_SIZE)
    for i, f in enumerate(tqdm(files)):
        if i not in indices:
            gt_f = '../Gt/' + f.replace(f[-4:], '_gt.png')
            !rm $f
            !rm $gt_f
    output.clear()

    # Time computation
    print("Dataset generation completed in %fs" % (time.time() - start_time))

    %cd /content

In [None]:
if DATASET_NAME == "Spliced_COCO":
    # 1. Download Dataset
    start_time = time.time()
    %cd /content
    !mkdir data
    %cd data
    !wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=11cp_lomeTQUqRCJslMtXJbSHqCz8ir5P' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=11cp_lomeTQUqRCJslMtXJbSHqCz8ir5P" -O 'Spliced_COCO.zip' && rm -rf /tmp/cookies.txt
    output.clear()

    # 2. Extract Dataset
    print("Dataset extraction...")
    ZIPFILE = 'Spliced_COCO' + '.zip'
    !unzip -q $ZIPFILE
    !rm Spliced_COCO.zip
    %cd /content
    output.clear()

    # Time computation
    print("Dataset generation completed in %fs" % (time.time() - start_time))

**Data Augmentation**

In [None]:
path_img = DATA_DIR + DATASET_NAME + '/' + FORGERED_FOLDER
path_mask = DATA_DIR + DATASET_NAME + '/' + GROUND_TRUTH_FOLDER
files = os.listdir(path_img)
pngs =  [f[:-4] for f in files if f[-3:] == 'png']
jpegs = [f[:-4] for f in files if f[-3:] == 'jpg']
tiffs = [f[:-4] for f in files if f[-3:] == 'tif']
print(len(files), "files")
print(len(pngs), "pngs")
print(len(tiffs), "tiffs")
print(len(jpegs), "jpegs")

In [None]:
from albumentations.augmentations.transforms import (
    VerticalFlip, HorizontalFlip, JpegCompression, GaussNoise)
import os
import shutil

ids = [f[:-4] for f in os.listdir(path_img)]
random.shuffle(ids)
completedata = {
    'train': ids[:TRAIN_SIZE], 
    'val': ids[TRAIN_SIZE:TRAIN_SIZE+VAL_SIZE], 
    'test': ids[TRAIN_SIZE+VAL_SIZE:]
}

train_set = [f for f in os.listdir(path_img) if f[:-4] in completedata['train']]
val_set = [f for f in os.listdir(path_img) if f[:-4] in completedata['val']]
test_set = [f for f in os.listdir(path_img) if f[:-4] in completedata['test']]

In [None]:
# Convert all files possible to jpg
for i in tqdm(files):               
    name = i.split('.')[0]
    img = os.path.join(path_img, i)
    image = io.imread(img) 
    if '.jpg' not in i: 
        if image.shape[-1] == 4: 
            image = skimage.color.rgba2rgb(image) 
        io.imsave(os.path.join(path_img, '{}.jpg'.format(name)), image, quality=100)

# Remove all png files
if DATASET_NAME == 'CASIA':
    !rm /content/data/CASIA/Tp/*.tif
elif DATASET_NAME == 'Spliced_COCO':
    !rm /content/data/Spliced_COCO/Tp/*.png

# Convert unconvertible tiffs (PTIFFS)
if len(tiffs) > 0:
    %cd /content/data/CASIA/Tp
    for f in tiffs:
        if f not in jpegs:
            f = f + '.tif'
            print("Converting", f)
            !convert -quality 100 "$f"  "$(basename "$f" .tif).jpg"

    ## Remove bad files
    print("\n\nRemoving or replacing bad files...\n")
    for filename in tqdm(os.listdir('.')):
        if '-0' in filename:
            newname = filename.replace('-0', '')
            !mv $filename $newname
        elif '-' in filename:
            !rm $filename
    !rm /content/data/CASIA/Tp/*.tif

output.clear()

In [None]:
# See issue https://github.com/yelusaleng/RRU-Net/issues/9
# N.B: Don't execute if you want to test Noise and Compression attacks

if DATASET_NAME == 'CASIA':
    train_set = [f.replace('.tif', '.jpg') for f in train_set]
elif DATASET_NAME == 'Spliced_COCO':
    train_set = [f.replace('.png', '.jpg') for f in train_set]

for i in tqdm(train_set):
    name = i.split('.')[0]
    img = os.path.join(path_img, i) 
    mask = os.path.join(path_mask, '{}_gt.png'.format(name)) 
    image = io.imread(img) 
    mask = io.imread(mask)

    list_quality = [50, 60, 70, 80, 90]
    quality = choice(list_quality)
    io.imsave(os.path.join(path_img, '{}_q.jpg'.format(name)), image, quality=quality)
    io.imsave(os.path.join(path_mask, '{}_q_gt.png'.format(name)), mask)

    whatever_data = "my name"

    augmentation = VerticalFlip(p=1.0)
    data = {"image": image, "mask": mask, "whatever_data": whatever_data, "additional": "hello"}
    augmented = augmentation(**data)
    image_ver, mask_ver, whatever_data, additional = augmented["image"], augmented["mask"], augmented["whatever_data"], augmented["additional"]
    io.imsave(os.path.join(path_img, '{}_ver.jpg'.format(name)), image_ver, quality=100)
    io.imsave(os.path.join(path_mask, '{}_ver_gt.png'.format(name)), mask_ver)

    augmentation = HorizontalFlip(p=1.0)
    data = {"image": image, "mask": mask, "whatever_data": whatever_data, "additional": "hello"}
    augmented = augmentation(**data)
    image_hor, mask_hor, whatever_data, additional = augmented["image"], augmented["mask"], augmented["whatever_data"], augmented["additional"]
    io.imsave(os.path.join(path_img, '{}_hor.jpg'.format(name)), image_hor, quality=100)
    io.imsave(os.path.join(path_mask, '{}_hor_gt.png'.format(name)), mask_hor)

    augmentation = GaussNoise(var_limit=(10, 50), p=1.0)
    data = {"image": image, "mask": mask, "whatever_data": whatever_data, "additional": "hello"}
    augmented = augmentation(**data)
    image_g, mask_g, whatever_data, additional = augmented["image"], augmented["mask"], augmented["whatever_data"], augmented["additional"]
    io.imsave(os.path.join(path_img, '{}_G.jpg'.format(name)), image_g, quality=100)
    io.imsave(os.path.join(path_mask, '{}_G_gt.png'.format(name)), mask_g)

## Main program

**Dataset initialization**

In [None]:
path_img = DATA_DIR + DATASET_NAME + '/' + FORGERED_FOLDER
ids = [f[:-4] for f in os.listdir(path_img)]
val_ids = [id for id in ids if (id[-1]!='q' and id[-1]!='r' and id[-1]!='G')]
random.shuffle(val_ids)
val_ids = val_ids[:VAL_SIZE+TEST_SIZE]
train_ids = [id for id in ids if id not in val_ids]
test_ids = val_ids[VAL_SIZE:]
val_ids = val_ids[:VAL_SIZE]
random.shuffle(train_ids)
completedata = {'train': train_ids, 'val': val_ids, 'test': test_ids}

In [None]:
print(len(completedata['train']))
print(len(completedata['val']))
print(len(completedata['test']))

In [None]:
def train_dataset_regenerate(ids):
    random.shuffle(ids)
    train_generator = partial(Forgery_Detection_Dataset(ids).generate, mode='train')
    train_dataset = tf.data.Dataset.from_generator(
        train_generator,
        output_signature=(
            tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
            tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32)
        )
    )
    train_dataset = train_dataset.batch(BATCH_SIZE)
    return train_dataset

In [None]:
## Train dataset --------------------------------------------------------------------------
print("Train dataset generation ...")
train_generator = partial(Forgery_Detection_Dataset(train_ids).generate, mode='train')
train_dataset = tf.data.Dataset.from_generator(
    train_generator,
    output_signature=(
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32)
    )
)
train_dataset = train_dataset.batch(BATCH_SIZE)

## Validation dataset ---------------------------------------------------------------------
print("Validation dataset generation ...")
val_generator = partial(Forgery_Detection_Dataset(val_ids).generate, mode='val')
val_dataset = tf.data.Dataset.from_generator(
    val_generator,
    output_signature=(
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32)
    )
)
val_dataset = val_dataset.batch(BATCH_SIZE)

## Test dataset ----------------------------------------------------------------------------
print("Test dataset generation ...")
test_generator = partial(Forgery_Detection_Dataset(test_ids).generate, mode='test')
test_dataset = tf.data.Dataset.from_generator(
    test_generator,
    output_signature=(
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
    )
)
test_dataset = test_dataset.batch(BATCH_SIZE)

**Network initialization**

In [None]:
LOAD_MODEL = True
CHECKPOINT_PERIOD = 10
CHECKPOINT_FILEPATH = '/content/drive/MyDrive/RRU_Checkpoints/3/'
drive.mount("/content/drive", force_remount=True)

In [None]:
rru_net = RRU_Net(n_channels=3, n_classes=1)
boundaries = [50*len(train_ids)/BATCH_SIZE, 100*len(train_ids)/BATCH_SIZE]
values = [LEARNING_RATE, LEARNING_RATE*0.1, LEARNING_RATE*0.01]
learning_rate_fn = tf.keras.optimizers.schedules.PiecewiseConstantDecay(boundaries, values)
rru_optimizer = tfa.optimizers.SGDW(WEIGHT_DECAY, learning_rate_fn, MOMENTUM)
loss = tf.keras.losses.BinaryCrossentropy()
checkpoint = tf.train.Checkpoint(rru_net)

if LOAD_MODEL:
    latest = tf.train.latest_checkpoint(CHECKPOINT_FILEPATH)
    rru_net.load_weights(latest)    
    # last_epoch = SET IF YOU NEED TO CONTINUE OLD TRAINING
    # N.B: FOR THE GRAPHS, DON'T FORGET TO RESTORE ALSO THE FILES TRAIN_LOSSES, MAX_SIGMOIDED AND F1_SCORES
else:
    last_epoch = 0

**Train**

In [None]:
threshold = 0.5
epoches = []
train_losses = []
max_sigmoided = []
f1_scores = []

for epoch in range(last_epoch, EPOCHES):
    print("_____ Train Epoch %d _____" % epoch)
    start = time.time()
    epoch_loss = 0.
    max_sigm_epoch = 0.
    train_dataset = train_dataset_regenerate(train_ids)

    # ----------------- Train ----------------- #
    for i, train_batch in enumerate(train_dataset):

        # Get images
        train_forgery, train_gt = train_batch
        train_gt = tf.image.rgb_to_grayscale(train_gt)
        
        # Learn
        with tf.GradientTape() as rru_tape:
            out = rru_net(train_forgery)
            out_probs = tf.keras.activations.sigmoid(out)
            out_probs_flatten = tf.reshape(out_probs, [-1])
            train_gt_flatten = tf.reshape(train_gt, [-1])
            train_loss = loss(train_gt_flatten, out_probs_flatten)
        rru_gradients = rru_tape.gradient(train_loss, rru_net.trainable_variables)
        rru_optimizer.apply_gradients(zip(rru_gradients, rru_net.trainable_variables))
        max_mask_value = tf.math.reduce_max(out_probs).numpy()

        print("Epoch %d | Batch %d - Train loss = %f - Max mask value = %f" % (epoch, i, train_loss, max_mask_value))
        max_sigm_epoch += max_mask_value
        epoch_loss += train_loss.numpy()


    # ---------------- Evaluate ---------------- #
    tot_F_measure = 0.
    epoches.append(epoch)
    train_losses.append(epoch_loss / i)
    max_sigmoided.append(max_sigm_epoch / i)
    print("Evaluation ...")    

    for j, val_batch in enumerate(val_dataset):

        # Get images
        val_forgery, val_gt = val_batch
        val_gt = tf.squeeze(tf.image.rgb_to_grayscale(val_gt))

        for v_forg, v_gt in zip(val_forgery, val_gt):

            # Compute rru output mask
            v_forg = tf.expand_dims(v_forg, 0)
            out = rru_net(v_forg)
            out_gray = tf.squeeze(out)
            out_mask = tf.cast(tf.keras.activations.sigmoid(out_gray) > threshold, dtype=tf.float32)

            # Compute F-measure
            v_gt = tf.expand_dims(v_gt, 0)
            intersection = tf.math.reduce_sum(tf.math.multiply(out_mask, v_gt))
            union = tf.math.reduce_sum(out_mask) + tf.math.reduce_sum(v_gt) + 1e-5
            tot_F_measure += 2 * intersection.numpy() / union.numpy()
    
    f1_scores.append(tot_F_measure / VAL_SIZE)
    print("Epoch %d completed in %f seconds | medium loss = %f | F1-score = %f" % (epoch, time.time() - start, train_losses[epoch], f1_scores[epoch]))
    
    # Plot train process
    fig = plt.figure()

    plt.title('Training Process')
    plt.xlabel('epoch')
    plt.ylabel('value')
    l1, = plt.plot(epoches, train_losses, c='red')
    l2, = plt.plot(epoches, f1_scores, c='blue')
    l3, = plt.plot(epoches, max_sigmoided, c='green')

    plt.legend(handles=[l1, l2, l3], labels=['Train Loss', 'F1-score', 'Max sigmoided values'], loc='best')
    plt.show()

    if epoch % CHECKPOINT_PERIOD == 0 and epoch > 0:
        print(" - Saving Model...")
        save_path = checkpoint.save(CHECKPOINT_FILEPATH)

**Evaluation**

In [None]:
tot_F_measure = 0.
threshold = 0.5
print("Evaluation ...")    

for j, val_batch in enumerate(val_dataset):
    # Get images
    val_forgery, val_gt = val_batch
    val_gt = tf.squeeze(tf.image.rgb_to_grayscale(val_gt))

    for v_forg, v_gt in zip(val_forgery, val_gt):

        # Compute rru output mask
        v_forg = tf.expand_dims(v_forg, 0)
        out = rru_net(v_forg)
        out_gray = tf.squeeze(out)
        out_mask = tf.cast(tf.keras.activations.sigmoid(out_gray) > threshold, dtype=tf.float32)

        # Show imgs
        plt.imshow(tf.squeeze(v_forg))
        plt.show()
        plt.imshow(v_gt, cmap='gray')
        plt.show()
        plt.imshow(out_mask, cmap='gray')
        plt.show()

        # Compute F-measure
        v_gt = tf.expand_dims(v_gt, 0)
        intersection = tf.math.reduce_sum(tf.math.multiply(out_mask, v_gt))
        union = tf.math.reduce_sum(out_mask) + tf.math.reduce_sum(v_gt) + 1e-5
        tot_F_measure += 2 * intersection.numpy() / union.numpy()

print("F1-score =", tot_F_measure / VAL_SIZE)

## Noise and Compression attacks

In [None]:
from albumentations.augmentations.transforms import (
    VerticalFlip, HorizontalFlip, JpegCompression, GaussNoise)
import os
import shutil

In [None]:
attack = 'Gaussian_Noise'
#attack = 'Jpeg_Compression'

In [None]:
path_img = DATA_DIR + DATASET_NAME + '/' + FORGERED_FOLDER
ids = [f[:-4] for f in os.listdir(path_img)]
val_ids = [id for id in ids if (id[-1]!='q' and id[-1]!='r' and id[-1]!='G')]
random.shuffle(val_ids)
val_ids = val_ids[:VAL_SIZE+TEST_SIZE]
train_ids = [id for id in ids if id not in val_ids]
test_ids = val_ids[VAL_SIZE:]
val_ids = val_ids[:VAL_SIZE]
random.shuffle(train_ids)
completedata = {'train': train_ids, 'val': val_ids, 'test': test_ids}
test_set = [f for f in os.listdir(path_img) if f[:-4] in completedata['test']]

In [None]:
# Test set initialization
if DATASET_NAME == 'CASIA':
    test_set = [f.replace('.tif', '.jpg') for f in test_set]
elif DATASET_NAME == 'Spliced_COCO':
    test_set = [f.replace('.png', '.jpg') for f in test_set]
trial_index = 5
path_img = DATA_DIR + DATASET_NAME + '/' + FORGERED_FOLDER
path_mask = DATA_DIR + DATASET_NAME + '/' + GROUND_TRUTH_FOLDER

**Noise**

In [None]:
# Noisy data generation
if attack == 'Gaussian_Noise':
  print("\nGaussian Noise attack with variance =", NOISE_VARIANCES[trial_index], "\n")
  for i in tqdm(test_set):
      name = i.split('.')[0]
      img = os.path.join(path_img, i) 
      mask = os.path.join(path_mask, '{}_gt.png'.format(name)) 
      image = io.imread(img) 
      mask = io.imread(mask)

      whatever_data = "my name"

      #augmentation = GaussNoise(var_limit=NOISE_VARIANCES[trial_index], p=1.0)
      augmentation = GaussNoise(var_limit=0.5, p=1.0)
      data = {"image": image, "mask": mask, "whatever_data": whatever_data, "additional": "hello"}
      augmented = augmentation(**data)
      image_g, mask_g, whatever_data, additional = augmented["image"], augmented["mask"], augmented["whatever_data"], augmented["additional"]
      io.imsave(os.path.join(path_img, '{}_G.jpg'.format(name)), image_g, quality=100)
      io.imsave(os.path.join(path_mask, '{}_G_gt.png'.format(name)), mask_g)

**Compression**

In [None]:
# Compressed data generation
if attack == 'Jpeg_Compression':
  print("\nJPEG Compression attack with quality =", COMPRESSION_QUALITIES[trial_index], "\n")
  for i in tqdm(test_set):
      name = i.split('.')[0]
      img = os.path.join(path_img, i) 
      mask = os.path.join(path_mask, '{}_gt.png'.format(name)) 
      image = io.imread(img) 
      mask = io.imread(mask)

      quality = COMPRESSION_QUALITIES[trial_index]
      io.imsave(os.path.join(path_img, '{}_q.jpg'.format(name)), image, quality=quality)
      io.imsave(os.path.join(path_mask, '{}_q_gt.png'.format(name)), mask)

**Evaluation on the corrupted test set**

In [None]:
test_ids = [f[:-4] for f in os.listdir(path_img) if f[:-6] in completedata['test']]

## Test dataset ----------------------------------------------------------------------------
print("Test dataset generation ...")
test_generator = partial(Forgery_Detection_Dataset(test_ids).generate, mode='test')
test_dataset = tf.data.Dataset.from_generator(
    test_generator,
    output_signature=(
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
        tf.TensorSpec(shape=IMG_SHAPE[::-1]+(IMG_CHANNELS,), dtype=tf.float32),
    )
)
test_dataset = test_dataset.batch(BATCH_SIZE)

In [None]:
tot_F_measure = 0.
threshold = 0.5
print("Evaluation ...")    

for j, test_batch in enumerate(test_dataset):
    # Get images
    test_forgery, test_gt = test_batch
    test_gt = tf.squeeze(tf.image.rgb_to_grayscale(test_gt))

    for t_forg, t_gt in zip(test_forgery, test_gt):

        # Compute rru output mask
        t_forg = tf.expand_dims(t_forg, 0)
        out = rru_net(t_forg)
        out_gray = tf.squeeze(out)
        out_mask = tf.cast(tf.keras.activations.sigmoid(out_gray) > threshold, dtype=tf.float32)

        # Show images
        plt.imshow(tf.squeeze(t_forg))
        plt.show()
        plt.imshow(t_gt, cmap='gray')
        plt.show()
        plt.imshow(out_mask, cmap='gray')
        plt.show()

        # Compute F-measure
        t_gt = tf.expand_dims(t_gt, 0)
        intersection = tf.math.reduce_sum(tf.math.multiply(out_mask, t_gt))
        union = tf.math.reduce_sum(out_mask) + tf.math.reduce_sum(t_gt) + 1e-5
        tot_F_measure += 2 * intersection.numpy() / union.numpy()

print("F1-score =", tot_F_measure / TEST_SIZE)

In [None]:
# Remove augmented images
%cd $path_img
for f in os.listdir(path_img):
    id = f.split('.')[0]
    if id[-1] == 'q':
      !rm $f
%cd $path_mask
for f in os.listdir(path_mask):
    id = f.split('.')[0]
    if id[-4] == 'q':
      !rm $f