# Visualisation 

## Import libraries

In [10]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # don't show all the tensorflow startup messages
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import models
from keras.utils import to_categorical
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
import pandas as pd
from sys import path
import h5py

import json
import cv2
path.append(f'{os.getcwd()}/training')
path.append(f'{os.getcwd()}/utils')
path.append(f'{os.getcwd()}/noiseprint2')
from dct_train import main as train
from preprocessor import Preprocessor

In [None]:
class Input:
    """Preprocessing settings together with the values derived from them.

    Stored as given: ``downscale_factor``, ``dct_rep``, ``patch_size``,
    ``bands`` and ``his_range``. The three ``sf_*`` arguments are collected
    into ``sf_range`` in the order ``[sf_lo, sf_mid, sf_hi]``.

    Derived on construction:
        colour_space: OpenCV conversion code chosen from ``grayscale``.
        sf_num: result of ``num_of_sf()``.
        dset_name: result of ``get_dset_name(grayscale)``.
        his_size: result of ``get_his_range()``.
    """

    def __init__(self, downscale_factor, grayscale, dct_rep, patch_size, bands, sf_lo, sf_mid, sf_hi, his_range):
        # Plain settings, stored unchanged.
        self.downscale_factor = downscale_factor
        self.dct_rep = dct_rep
        self.patch_size = patch_size
        self.bands = bands
        self.his_range = his_range
        self.sf_range = [sf_lo, sf_mid, sf_hi]

        # Derived values. sf_num has to exist before the dataset name and
        # the histogram size are computed, because both read it.
        self.colour_space = self.get_colour_space(grayscale)
        self.sf_num = self.num_of_sf()
        self.dset_name = self.get_dset_name(grayscale)
        self.his_size = self.get_his_range()

    def num_of_sf(self):
        """Return the width of the selected range(s) in ``sf_range``.

        ``bands == 3`` selects all entries and sums their widths; any other
        value is used as an index into ``sf_range``.
        """
        if self.bands != 3:
            chosen = self.sf_range[self.bands]
            return chosen[1] - chosen[0]
        return sum(band[1] - band[0] for band in self.sf_range)

    def get_dset_name(self, grayscale):
        """Build the dataset identifier string from the current settings."""
        return f'g:{grayscale}p:{self.patch_size}_his:{self.his_range[0]},{self.his_range[1]}_sf_num:{self.sf_num}_subbands:{self.bands}'

    def get_colour_space(self, grayscale):
        """Return the OpenCV BGR->gray code if ``grayscale`` is truthy, else BGR->RGB."""
        if grayscale:
            return cv2.COLOR_BGR2GRAY
        return cv2.COLOR_BGR2RGB

    def get_his_range(self):
        """Return (number of integers in ``his_range`` as a half-open range, plus one) times ``sf_num``."""
        lower = self.his_range[0]
        upper = self.his_range[1]
        values_per_sf = len(range(lower, upper)) + 1
        return values_per_sf * self.sf_num

## Functions

In [7]:
# get the length for the dataset, used for generator function and to calculate steps_per_epoch
def get_dset_len(path, dset):
    """Return the size of the first axis of dataset ``dset`` in the HDF5 file at ``path``.

    The file is opened read-only and closed again before returning.
    """
    with h5py.File(path, 'r') as h5_file:
        first_axis = h5_file[dset].shape[0]
    return first_axis


# generator function
def generator(dataset_name, batch_size, num_examples):
    """Yield batches from the ``'DCT'`` dataset of the test file, cycling forever.

    Reads ``processed/DCT_test_{dataset_name}.h5`` (relative to the current
    working directory) and yields consecutive slices of at most
    ``batch_size`` rows covering rows ``0 .. num_examples - 1``. After the
    last slice it starts again from row 0. The HDF5 file stays open for as
    long as the generator is alive.

    Args:
        dataset_name: Suffix identifying the processed test file.
        batch_size: Maximum number of rows per yielded batch; must be >= 1.
        num_examples: Number of leading rows of the dataset to serve.

    Yields:
        The slice ``X[start:stop]`` of the ``'DCT'`` dataset.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # A zero step makes range() fail and a negative one produces an empty
        # range, which would turn the loop below into an endless busy spin.
        raise ValueError(f'batch_size must be a positive integer, got {batch_size}')
    if num_examples < 1:
        # Nothing to serve: finish immediately instead of spinning forever
        # inside ``while True`` without ever yielding.
        return

    with h5py.File(f'processed/DCT_test_{dataset_name}.h5', 'r') as f:
        features = f['DCT']
        while True:
            for start in range(0, num_examples, batch_size):
                stop = min(start + batch_size, num_examples)
                yield features[start:stop]


def get_labels(dataset_name):
    """Load the full ``'labels'`` dataset of the test label file as a NumPy array.

    The file read is ``processed/labels_test_{dataset_name}.h5``, relative to
    the current working directory.
    """
    labels_file = f'processed/labels_test_{dataset_name}.h5'
    with h5py.File(labels_file, 'r') as h5_file:
        # ``[()]`` reads the whole dataset into memory while the file is open.
        stored = h5_file['labels'][()]
        return np.array(stored)


# get predictions and convert numerical values to class name
def get_predictions(dataset_name, model, num_examples):
    """Predict a class name for every example of the test set.

    The model is fed by ``generator`` in batches of 32. The highest-scoring
    output index of each example (argmax over axis 1) is then translated to
    its class name.

    Args:
        dataset_name: Suffix identifying the processed test file, forwarded
            to ``generator``.
        model: Object exposing a Keras-style ``predict(data, steps=...)``.
        num_examples: Number of examples to predict.

    Returns:
        Array with one entry per example holding the class name for indices
        0-7. Any other index falls through to the ``np.select`` default,
        which is the index itself.
    """
    batch_size = 32
    # The step count must be an integer; np.ceil on its own returns a float.
    steps = int(np.ceil(num_examples / batch_size))

    scores = model.predict(
        generator(dataset_name, batch_size, num_examples), steps=steps)
    predictions = np.argmax(scores, axis=1)

    # Position in this list is the class index produced by the model.
    class_names = [
        'facebook',
        'flickr',
        'google+',
        'instagram',
        'original',
        'telegram',
        'twitter',
        'whatsapp',
    ]

    return np.select(
        [predictions == index for index in range(len(class_names))],
        class_names,
        predictions,
    )


# get accuracy at patch level
def patch_truth(labels, predictions, classes):
    """Print a classification report computed over individual patches.

    Args:
        labels: Iterable of byte strings. Each is decoded as UTF-8 and only
            the text before the first ``'.'`` is used as the true class.
        predictions: Predicted class per patch, in the same order as ``labels``.
        classes: Display names passed to the report as ``target_names``.
    """
    true_classes = []
    for raw_label in labels:
        decoded = raw_label.decode('UTF-8')
        true_classes.append(decoded.split('.')[0])

    report = classification_report(true_classes, predictions, target_names=classes)
    print(report)


# get accuracy at image level
def image_truth(labels, predictions, classes):
    """Print a classification report after aggregating predictions per label.

    Predictions that share the same full label string are reduced to their
    mode. The true class of each group is the part of the label before the
    first ``'.'``. Presumably the full label identifies one image, e.g.
    ``<class>.<image id>`` -- confirm against the code that writes the labels.

    Args:
        labels: Iterable of byte strings, decoded as UTF-8.
        predictions: Predicted class per patch, in the same order as ``labels``.
        classes: Display names passed to the report as ``target_names``.
    """
    decoded_labels = [raw_label.decode('UTF-8') for raw_label in labels]

    # Two rows (truth, prediction) transposed into two columns.
    table = pd.DataFrame(
        [decoded_labels, predictions],
        index=['truth', 'prediction'],
    ).T

    # One row per distinct label string, holding the mode of its predictions.
    per_label = table.groupby('truth', as_index=False)['prediction'].agg(
        pd.Series.mode)

    # Reduce the full label to the class part in front of the first dot.
    per_label['truth'] = per_label['truth'].str.split('.').str[0]

    # Keep only rows whose aggregated prediction is a single string
    # (presumably this drops groups with tied modes -- confirm).
    has_single_prediction = per_label['prediction'].apply(
        lambda value: isinstance(value, str))
    per_label = per_label[has_single_prediction]

    true_classes = per_label['truth'].tolist()
    predicted_classes = per_label['prediction'].tolist()

    report = classification_report(
        true_classes, predicted_classes, target_names=classes)
    print(report)
    
def main(name, dataset_name):
    """Load a saved model and print patch-level and per-label reports for the test set.

    Args:
        name: Model identifier; the model is loaded from ``models/cnn_{name}``.
        dataset_name: Suffix identifying the processed test files.
    """
    class_names = ['facebook', 'flickr', 'google+', 'instagram',
                   'original', 'telegram', 'twitter', 'whatsapp']

    trained_model = models.load_model(f'models/cnn_{name}')

    # ``path`` is ``sys.path`` (imported at the top of the file); its first
    # entry is used as the base directory of the processed data.
    features_file = f'{path[0]}/processed/DCT_test_{dataset_name}.h5'
    # Number of test examples, needed by the generator and for the step count.
    example_count = get_dset_len(features_file, 'DCT')

    # Predicted class name for every example.
    predicted = get_predictions(dataset_name, trained_model, example_count)

    # True labels as stored in the label file (byte strings).
    true_labels = get_labels(dataset_name)

    # Patch-level report first, then the aggregated one.
    for print_report in (patch_truth, image_truth):
        print_report(true_labels, predicted, class_names)

## Run code and obtain results

In [8]:
# Load the preprocessing settings. The keys of config.json are passed as
# keyword arguments, so they must match the parameters of Input.__init__.
with open('config.json') as f:
    args = json.load(f)

# NOTE: this name shadows the builtin ``input``; it is kept unchanged in case
# other cells refer to it.
input = Input(**args)

# Training settings that make up the name of the saved model.
epochs = 10
batch_size = 32
architecture = 'dct_cnn_2017'

name = f'{architecture}_e:{epochs}_b:{batch_size}'
# ``main`` is the evaluation entry point defined in this file.
main(name, input.dset_name)