In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import json
import sys
import random
import os
import h5py
from sklearn.metrics import  confusion_matrix, accuracy_score, roc_curve, auc
from matplotlib import pyplot as plt
import matplotlib
from statsmodels.stats.proportion import proportion_confint
import matplotlib.ticker as mticker
import warnings
warnings.filterwarnings("ignore")

# Tensorflow imports
import tensorflow as tf
from tensorflow.keras import callbacks
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import models
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model

# Get the important functions from the other files
from paired_image_generator import PairedImageGenerator
from SRNet_model import create_SRNet

# Experiment constants

These are practically all the constants that govern the experiment. Most of them correspond to parameters of the Keras **flow_from_directory** function ([documentation of the function](https://keras.io/api/preprocessing/image/)):

- **EXPERIMENT_NAME:** Name of this experiment. This is the name that will be used to create the output files like the model results folder or the different figures.


- **NUM_EPOCHS:** Number of times all the images of the train set are used in the network to train.


- **BATCH_SIZE**: Number of images to train in each minibatch.


- **CLASS_MODE:** It has two possible values: **'categorical'** and **'binary'**. This only affects the validation generator.
    - **'categorical':** The labels are encoded with One Hot Encoding.
    - **'binary':** The labels are encoded with 0 and 1.


- **CLASSES:** The list of classes used during the training process. Its order matters because the position of each class in the list determines the label index assigned to it.


- **INPUT_IMAGE_SIZE:** Size that all the images will be rescaled to just before going into the neural network.


- **COLOR_MODE:** This variable sets the number of channels used for the input images. The possible values are: 'rgb', 'grayscale' and 'rgba'. **We will use 'grayscale', which has just one channel.**


- **THRESHOLD:** Threshold that will be used to build the confusion matrix and the Confidence Interval for the validation accuracy.


- **SEED:** This seed tries to make the experiments reproducible. However, as far as I know, TensorFlow is not fully deterministic even with fixed seeds, so experiments may not be replicated exactly.

In [None]:
# --- Output and dataset locations ---
MODELS_DIR = 'trained_models'
BASE_DIR = 'dataset/SRNet-Dataset-0.4'

# One sub-folder per split, all living directly under BASE_DIR.
TRAIN_DIR, VAL_DIR, TEST_DIR = (
    os.path.join(BASE_DIR, split_name) for split_name in ('train', 'val', 'test')
)

# --- Experiment settings ---
EXPERIMENT_NAME = 'SRNet-experiment'
NUM_EPOCHS = 200
BATCH_SIZE = 16
CLASS_MODE = 'binary'
CLASSES = ['0', '1']  # 0 -> Cover, 1 -> Stego
INPUT_IMAGE_SIZE = (256, 256, 1)
COLOR_MODE = 'grayscale'
THRESHOLD = 0.5
SEED = 483

# --- Dataset sizes ---
# Every file found anywhere below each split folder is counted.
NUM_TRAIN, NUM_VAL, NUM_TEST = (
    sum(len(filenames) for _root, _subdirs, filenames in os.walk(split_dir))
    for split_dir in (TRAIN_DIR, VAL_DIR, TEST_DIR)
)

# Leave this as None for real runs. Setting an integer forces every epoch
# (training and validation) to that many steps, which is only useful to
# smoke-test the pipeline quickly.
STEPS_PER_EPOCH = None

In [None]:
# Feed the same SEED to every source of randomness used in this notebook.
# PYTHONHASHSEED is only read when an interpreter starts, so exporting it here
# affects processes launched from this kernel, not the kernel itself.
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Ask TensorFlow for the GPUs it can see.
gpus = tf.config.experimental.list_physical_devices('GPU')

# Switch on memory growth for each detected GPU.
for gpu_device in gpus:
    tf.config.experimental.set_memory_growth(gpu_device, True)

# Show the detected devices (an empty list means no GPU was found).
print(gpus)

# Image generator creation

I've decided to use image generators because that's a good way to manage a lot of images easily.

- **ImageDataGenerator:** When calling this function we can specify different changes to be made to the images. For example, we can normalize all the images. [Here](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/image/ImageDataGenerator) we can see all the different modifications that are available for data augmentation.


- **flow_from_directory:** Function that is used to get the generator itself. It has several parameters that are useful for the training process; the most important ones are described in the first text cell above.

In [None]:
# Validation images are only rescaled: every pixel value is multiplied by 1/255.
val_datagen = ImageDataGenerator(rescale=1./255)

# Validation generator reading from VAL_DIR.
# shuffle=False keeps the files in a fixed order, so the predictions computed
# later line up with `val_generator.classes`.
val_generator = val_datagen.flow_from_directory(VAL_DIR,
                                                target_size=INPUT_IMAGE_SIZE[0:2],
                                                batch_size=BATCH_SIZE,
                                                class_mode=CLASS_MODE,
                                                classes=CLASSES,
                                                color_mode=COLOR_MODE,
                                                shuffle=False,
                                                seed=SEED)

# Training uses the custom generator from paired_image_generator.py, which
# also takes care of the data augmentation.
# The image size and the cover/stego sub-folders are derived from
# INPUT_IMAGE_SIZE and CLASSES (CLASSES[0] -> cover, CLASSES[1] -> stego)
# instead of being hard-coded, so a change in the configuration cell cannot
# leave the two generators out of sync.
train_generator = PairedImageGenerator(dim = INPUT_IMAGE_SIZE[0:2],
                                       n_channels = INPUT_IMAGE_SIZE[2],
                                       batch_size = BATCH_SIZE,
                                       images_path_cover = os.path.join(TRAIN_DIR, CLASSES[0]),
                                       images_path_stego = os.path.join(TRAIN_DIR, CLASSES[1]),
                                       shuffle = True,
                                       augment = True,
                                       seed = SEED)

In the following cell, a batch is retrieved from the generator and four images of it are shown.

In [None]:
# Fetch one training batch by index.
# NOTE(review): index 1242 is hard-coded and presumably requires the training
# set to contain at least that many batches - confirm against
# PairedImageGenerator.
batch = train_generator[1242]

# batch[0] holds the images; keep the first four of them.
first_images = batch[0][:4]

# Show each image exactly as it will be fed to the network.
for preview_image in first_images:
    image_2d = np.reshape(preview_image, (256, 256))
    plt.imshow(image_2d, interpolation='none', cmap='gray')
    plt.show()

# Model creation

The architecture follows the guidelines given in ["Deep Residual Network for Steganalysis of Digital Images" by Mehdi Boroumand et al.](https://ieeexplore.ieee.org/document/8470101). The only difference between this implementation and the one introduced in the paper is the output activation function. This implementation uses a single Sigmoid output so that the decision threshold can be analysed, whereas the paper uses a Softmax output activation function with two output neurons.

The `create_SRNet` function comes from the `SRNet_model.py` file.

In [None]:
# Build the network for the configured input shape.
model = create_SRNet(INPUT_IMAGE_SIZE)

# Binary cross-entropy on probabilities (from_logits=False) ...
bce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

# ... optimised with Adamax at a learning rate of 0.001.
adamax_optimizer = optimizers.Adamax(learning_rate=0.001)

# Compile with accuracy as the only tracked metric.
model.compile(optimizer=adamax_optimizer,
              loss=bce_loss,
              metrics=['accuracy'])

In [None]:
# Create <MODELS_DIR>/<EXPERIMENT_NAME>/checkpoints in a single call.
# os.makedirs also creates the missing parents (MODELS_DIR and the experiment
# folder) and exist_ok=True makes re-running the cell harmless. Any other
# failure (e.g. missing permissions) is raised here instead of being silently
# ignored and surfacing later when the callbacks try to write their files.
os.makedirs(os.path.join(MODELS_DIR, EXPERIMENT_NAME, 'checkpoints'), exist_ok=True)

In [None]:
%%time

# Callback to stop the algorithm when it doesn't improve.
early_stopping = callbacks.EarlyStopping(monitor='val_accuracy', 
                                        min_delta=0, 
                                        patience=100, 
                                        verbose=0, 
                                        mode='max', 
                                        baseline=None, 
                                        restore_best_weights=True)

# Callback to continuously save the best model after every epoch.
model_checkpoint = callbacks.ModelCheckpoint(os.path.join(MODELS_DIR, EXPERIMENT_NAME, 'checkpoints', 'best_model.h5'), 
                                             monitor='val_accuracy', 
                                             verbose=0, 
                                             save_best_only=False,
                                             save_weights_only=False, 
                                             mode='max', 
                                             save_freq='epoch')

# Callback to change the learning rate after 150 epochs
def lr_schedule(epoch):
    if epoch <= 149:
        return 0.001
    else:
        return 0.0001

learning_rate_scheduler = callbacks.LearningRateScheduler(lr_schedule, verbose=0)

# This callback saves the whole train history as a csv file
csv_logger = callbacks.CSVLogger(os.path.join(MODELS_DIR, EXPERIMENT_NAME, 'training_log.csv'), append=True, separator=';')


# Execute the training with all the callbacks
trainHistory = model.fit(train_generator,
                         steps_per_epoch=NUM_TRAIN//BATCH_SIZE if STEPS_PER_EPOCH == None else STEPS_PER_EPOCH,
                         epochs=NUM_EPOCHS, 
                         validation_data=val_generator,
                         validation_steps=NUM_VAL//BATCH_SIZE if STEPS_PER_EPOCH == None else STEPS_PER_EPOCH,
                         callbacks=[csv_logger, early_stopping, model_checkpoint, learning_rate_scheduler])

# We load the best model obtained during validation
best_model = load_model(os.path.join(MODELS_DIR, EXPERIMENT_NAME, 'checkpoints', 'best_model.h5'))

# Evaluation graphs

In [None]:
def gen_line_plot(df_training_log, metric_name, ax, tick_size):
    """Draw the train and validation curves of one metric on ``ax``.

    Args:
        df_training_log: Object indexable by metric name (a DataFrame with one
            row per epoch, or a Keras ``History.history`` dict) that contains
            ``metric_name`` and ``'val_' + metric_name``.
        metric_name: Name of the metric to plot, e.g. ``'accuracy'``.
        ax: Matplotlib axes to draw on.
        tick_size: Font size of the y tick labels; the y axis label uses
            ``tick_size + 1``.
    """
    train_values = df_training_log[metric_name]
    val_values = df_training_log[f'val_{metric_name}']

    # One x position per epoch, numbered from 1. The length is taken from the
    # metric series itself: len() of a history dict is its number of keys, not
    # its number of epochs, which made x and y disagree in size.
    epochs = range(1, len(train_values) + 1)

    # Train curve, then validation curve. No `palette` is passed: it has no
    # effect without a `hue` variable, so each call simply takes the next
    # colour of the axes' colour cycle.
    sns.lineplot(x=epochs, y=train_values, ax=ax, label="Train")
    sns.lineplot(x=epochs, y=val_values, ax=ax, label="Validation")

    # Add the legend
    ax.legend(fontsize=14)

    # Change axis labels
    ax.set_ylabel(metric_name.capitalize(), fontdict={'fontsize': tick_size + 1})

    # Freeze the current y ticks before relabelling them, so the three-decimal
    # labels stay attached to the positions they were computed from.
    ticks_loc = ax.get_yticks().tolist()
    ax.yaxis.set_major_locator(mticker.FixedLocator(ticks_loc))
    ax.set_yticklabels(['{:.3f}'.format(value) for value in ticks_loc], fontsize=tick_size)
    

def gen_trend_plot(df_training_log):
    """Plot the accuracy and loss trends of a training run and save them as a PDF.

    Two stacked panels sharing the x axis are drawn (accuracy on top, loss
    below), each with its train and validation curve. The figure is saved to
    ``<MODELS_DIR>/<EXPERIMENT_NAME>/<EXPERIMENT_NAME>_training_trend.pdf``.

    Args:
        df_training_log: Training log indexable by ``'accuracy'``,
            ``'val_accuracy'``, ``'loss'`` and ``'val_loss'``; it is handed
            unchanged to ``gen_line_plot``.
    """
    sns.set(style="darkgrid")

    # Font size
    tick_size = 17

    # Generate the general plot
    fig, axes = plt.subplots(2, 1,
                             figsize=(10, 8),
                             sharex=True)
    accuracy_ax, loss_ax = axes
    fig.suptitle(EXPERIMENT_NAME + " training trends", fontsize=21)

    # Generate both subplots
    gen_line_plot(df_training_log, 'accuracy', accuracy_ax, tick_size)
    gen_line_plot(df_training_log, 'loss', loss_ax, tick_size)

    # The x axis is shared, so only the bottom panel carries the label and
    # the resized x ticks.
    loss_ax.set_xlabel('Epoch', fontdict={'fontsize': tick_size + 1})
    loss_ax.tick_params(axis='x', which='major', labelsize=tick_size)

    # Adjust the layout only now that the curves, labels and tick labels
    # exist; calling it on the empty figure could not account for them.
    fig.tight_layout()

    # Save the trend plot
    fig.savefig(os.path.join(MODELS_DIR, EXPERIMENT_NAME, EXPERIMENT_NAME + '_training_trend.pdf'),
                bbox_inches='tight')
    

def get_confidence_interval(success_rate, n, alpha):
    """Return the Wilson confidence interval of a proportion, rounded to 4 decimals.

    Args:
        success_rate: Observed proportion of successes.
        n: Number of observations; ``n * success_rate`` is passed as the
            success count.
        alpha: Significance level forwarded to ``proportion_confint``.

    Returns:
        Tuple with the limits returned by ``proportion_confint``, each one
        formatted to four decimal places and converted back to ``float``.
    """
    raw_limits = proportion_confint(n*success_rate, n, method='wilson', alpha=alpha)

    # Formatting to text and parsing back trims each limit to four decimals.
    return tuple(float(f"{bound:.4f}") for bound in raw_limits)


def gen_confusion_matrix(predictions, generator, num_images, threshold=0.5, alpha=0.05):
    """Plot and save the confusion matrix of thresholded predictions.

    The figure is saved to
    ``<MODELS_DIR>/<EXPERIMENT_NAME>/<EXPERIMENT_NAME>_confusion_matrix.pdf``.
    Its title shows the accuracy and the full confidence interval of that
    accuracy (lower and upper limit).

    Args:
        predictions: Array-like with one continuous score per sample, in the
            same order as ``generator.classes``.
        generator: Object exposing ``classes`` (true label of every sample)
            and ``class_indices`` (class name -> label index).
        num_images: Not used by this function; kept so existing calls keep
            working.
        threshold: Scores strictly greater than this value become label 1,
            every other score becomes label 0.
        alpha: Significance level handed to ``get_confidence_interval``.
    """
    sns.set_style("darkgrid")
    matplotlib.rc('xtick', labelsize=15)
    matplotlib.rc('ytick', labelsize=15)

    # Turn the continuous scores into 0/1 labels with an explicit comparison.
    # (Rounding `predictions - threshold + 0.5` could yield the invalid label 2
    # for threshold=0 and a score of exactly 1.0.)
    discrete_predicted_labels = (np.asarray(predictions) > threshold).astype(int)
    real_labels = generator.classes

    # Label indices in the order given by CLASSES, so rows/columns match it
    label_indices = [generator.class_indices[class_name] for class_name in CLASSES]

    # Get the raw confusion matrix in numpy format
    conf_matrix = confusion_matrix(real_labels,
                                   discrete_predicted_labels,
                                   labels=label_indices)

    # Transform the confusion matrix into a DataFrame
    conf_matrix = pd.DataFrame(conf_matrix, columns=CLASSES, index=CLASSES)

    # Get the accuracy from the predictions
    accuracy = accuracy_score(real_labels, discrete_predicted_labels)

    # The interval is computed over the samples counted in the matrix
    num_samples = int(conf_matrix.sum().sum())
    ci_low, ci_high = get_confidence_interval(accuracy, num_samples, alpha)

    # Generate the confusion matrix
    fig, ax = plt.subplots(figsize=(5, 4))
    sns.set(font_scale=2)
    sns.heatmap(conf_matrix, annot=True, cmap='Blues', fmt='g', ax=ax, cbar=False)

    # Labels, title and ticks
    label_font = {'size': '16'}
    ax.set_xlabel('Predicted value', fontdict=label_font)
    ax.set_ylabel('True value', fontdict=label_font)

    # Title: show both limits of the interval, not only the lower one
    title_font = {'size': '17'}
    ax.set_title(EXPERIMENT_NAME +
                 ' Confusion matrix' +
                 '\nValidation accuracy: {0:.3f}'.format(accuracy) +
                 f'\nConfidence Interval ({alpha}): ({ci_low}, {ci_high})',
                 fontdict=title_font)

    plt.savefig(os.path.join(MODELS_DIR, EXPERIMENT_NAME, EXPERIMENT_NAME + '_confusion_matrix.pdf'), bbox_inches='tight')

In [None]:
# Continuous model outputs for the whole validation set, flattened to 1-D
predicted_labels = best_model.predict(val_generator, verbose = 0).ravel()

# `trainHistory.history` is a plain dict of per-epoch lists. The plotting
# helper is written for a table with one row per epoch (it uses the length of
# its argument as the number of epochs), so the dict is converted first.
training_log = pd.DataFrame(trainHistory.history)
gen_trend_plot(training_log)

gen_confusion_matrix(predicted_labels, val_generator, num_images=NUM_VAL, threshold=THRESHOLD, alpha=0.05)

# Save the prediction in the validation set
with open(os.path.join(MODELS_DIR, EXPERIMENT_NAME, f'{EXPERIMENT_NAME}_raw_prediction.npy'), 'wb') as f:
    np.save(f, predicted_labels)