In [None]:
import os
import cv2
import numpy as np
# import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras import models, layers
from keras.layers import DepthwiseConv2D,SeparableConv2D,Conv2D,MaxPool2D,Dense,GlobalMaxPool2D,Flatten,Input,Add,BatchNormalization,GlobalAveragePooling2D,ReLU,Dropout,AveragePooling2D
import warnings
from sklearn import metrics
warnings.filterwarnings("ignore")
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import train_test_split
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping,ModelCheckpoint
from keras.utils import to_categorical

In [None]:
# plt.imshow(cv2.imread("/kaggle/input/alaska2-image-steganalysis/Cover/00001.jpg"))

In [None]:
# Configure Strategy. Assume TPU...if not set default for GPU/CPU
# The resulting `strategy` is what every later `with strategy.scope():` block uses.
tpu = None
try:
    # A ValueError raised by any of these calls is treated as "no TPU available".
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    # NOTE(review): newer TensorFlow releases expose this class as tf.distribute.TPUStrategy — confirm against the installed version.
    strategy = tf.distribute.experimental.TPUStrategy(tpu)
except ValueError:
    # Enable XLA
    tf.config.optimizer.set_jit(enabled = "autoclustering")
    # Fall back to whatever strategy is currently active (the default one unless changed).
    strategy = tf.distribute.get_strategy()
    
# Set Auto Tune
AUTOTUNE = tf.data.experimental.AUTOTUNE 
# Global batch size: 16 samples for each replica the strategy keeps in sync.
BATCH_SIZE = 16 * strategy.num_replicas_in_sync

In [None]:
# Root directory of the dataset.
# NOTE(review): the cells below spell this path out literally instead of reading `data_dir`.
data_dir="/kaggle/input/alaska2-image-steganalysis/"

# Dataset Creation

In [None]:
def append_path(pre, base_dir="/kaggle/input/alaska2-image-steganalysis/"):
    """Return a vectorised function that turns file names into full paths.

    Args:
        pre: Sub-folder of the dataset (e.g. 'Cover', 'JMiPOD', 'Test').
        base_dir: Dataset root. Defaults to the Kaggle location used throughout
            this notebook, so existing one-argument calls behave as before.

    Returns:
        An ``np.vectorize`` wrapper; calling it with an array of file names
        yields an array of ``base_dir/pre/<file name>`` paths.
    """
    def _join(file):
        return os.path.join(base_dir, pre, file)

    return np.vectorize(_join)

In [None]:
# File names are listed from the Cover folder only; the same names are then
# looked up in the stego folders (the code assumes they exist there too).
train_filenames = np.asarray(os.listdir("/kaggle/input/alaska2-image-steganalysis/Cover"))

# Two independently shuffled copies of the name list, drawn from a seeded RNG.
np.random.seed(0)
positives, negatives = train_filenames.copy(), train_filenames.copy()
np.random.shuffle(positives)
np.random.shuffle(negatives)

# 5000 stego images per embedding algorithm, taken from consecutive slices of `positives`.
jmipod, juniward, uerd = (
    append_path(folder)(positives[begin:begin + 5000])
    for folder, begin in (('JMiPOD', 10000), ('JUNIWARD', 15000), ('UERD', 20000))
)
# 15000 cover images from the start of `negatives`.
neg_path = append_path('Cover')(negatives[:15000])

pos_path = np.concatenate((jmipod, juniward, uerd))
np.random.shuffle(pos_path)
train_paths = np.concatenate((neg_path, pos_path))

# test_paths=np.concatenate([append_path('JMiPOD')(positives[100:200]),append_path('JUNIWARD')(positives[200:300])],append_path('UERD')(positives[300:400]),append_path('Cover')(positives[400:500]))

In [None]:
# Shuffle in place so cover and stego paths are interleaved (uses the global NumPy RNG).
np.random.shuffle(train_paths)

In [None]:
# Show the shuffled path array as the cell output.
train_paths

In [None]:
# Label 0 when the path contains "Cover", 1 for everything else (the stego folders).
train_labels = [0 if "Cover" in path else 1 for path in train_paths]

In [None]:
# Sanity check: both lengths should match, one label per path.
len(train_labels),len(train_paths)

In [None]:
def data_augment(image,label=None):
    """Randomly mirror an image horizontally, then vertically.

    Returns the flipped image alone when `label` is None, otherwise the
    `(image, label)` pair with the label passed through untouched.
    """
    flipped = tf.image.random_flip_up_down(tf.image.random_flip_left_right(image))
    return flipped if label is None else (flipped, label)
    
def decode_image(filename, label=None, image_size=(256,256), augment=True):
    """Read a JPEG from disk and turn it into a scaled grayscale float image.

    Steps: read the file, decode as 3-channel JPEG, convert to grayscale,
    cast to float32 scaled by 1/255, then resize to `image_size`.

    Args:
        filename: Path of the JPEG file.
        label: Optional label that is returned alongside the image.
        image_size: Target (height, width) passed to `tf.image.resize`.
        augment: When True (the default, matching the previous behaviour) the
            image goes through `data_augment`. Pass False for validation or
            inference pipelines that should see the image without random flips.

    Returns:
        The image, or an `(image, label)` pair when `label` is given.
    """
    bits = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(bits, channels=3)
    image = tf.image.rgb_to_grayscale(image)
    image = tf.cast(image, tf.float32) / 255.0
    image = tf.image.resize(image, image_size)

    if augment:
        return data_augment(image) if label is None else data_augment(image, label)
    return image if label is None else (image, label)

In [None]:
# Evaluation paths: cover images from negatives[50000:51000], followed by UERD images from positives[51000:52000].
test_dataset=np.concatenate([append_path('Cover')(negatives[50000:51000]),append_path('UERD')(positives[51000:52000])])

In [None]:
# Ground truth in the same order as `test_dataset`: zeros for the cover paths, then ones for the UERD paths.
cover_count = len(append_path('Cover')(negatives[50000:51000]))
stego_count = len(append_path('UERD')(positives[51000:52000]))
test_label = np.concatenate([np.zeros(cover_count, dtype=int), np.ones(stego_count, dtype=int)])

In [None]:
# Show the evaluation labels as the cell output.
test_label

In [None]:
# Hold out 15% of the (path, label) pairs for validation; the fixed random_state keeps the split repeatable.
x_train, x_validation, y_train, y_validation = train_test_split(train_paths, train_labels, test_size=0.15, random_state=1000)

# def decode_image(filename, label=None, image_size=(256,256)):
#     bits=tf.io.read_file(filename)
#     image=tf.image.decode_jpeg(bits,channels=3)
# #     image = tf.image.rgb_to_grayscale(image)
#     image=tf.cast(image,tf.float32) / 255.0#image to tf.float32 data type
#     image=tf.image.resize(image,image_size)
    
#     if label is None:
#         return image
#     else:
#         return image,label
    
    
# Training pipeline.
#  * cache() stores the decoded images after the first pass. decode_image() has
#    already applied its random flips by then, so data_augment is mapped again
#    after the cache; every epoch then sees freshly drawn flips instead of the
#    cached ones.
#  * shuffle() comes before repeat() so each pass over the data is shuffled on
#    its own rather than mixing samples from neighbouring epochs.
train_dataset = (tf.data.Dataset
                 .from_tensor_slices((x_train,y_train))
                 .map(decode_image, num_parallel_calls=AUTOTUNE)
                 .cache()
                 .shuffle(1024)
                 .repeat()
                 .map(data_augment, num_parallel_calls=AUTOTUNE)
                 .batch(BATCH_SIZE)
                 .prefetch(AUTOTUNE)
                )
# Validation pipeline: a single ordered pass, no shuffling or repetition.
# NOTE(review): decode_image() applies random flips here as well.
valid_dataset= (tf.data.Dataset
                .from_tensor_slices((x_validation,y_validation))
                .map(decode_image, num_parallel_calls=AUTOTUNE)
                .batch(BATCH_SIZE)
                .prefetch(AUTOTUNE)
               )
# Evaluation pipeline over the unlabeled path array; order matches `test_label`.
test= (tf.data.Dataset
                .from_tensor_slices(test_dataset)
                .map(decode_image, num_parallel_calls=AUTOTUNE)
                .batch(BATCH_SIZE)
                .prefetch(AUTOTUNE)
               )

# def load_image(filename,label):
#     img=cv2.imread(filename)
#     img=cv2.resize(img,(256,256))
#     img=cv2.cvtColor(img,cv2.COLOR_RGB2GRAY)
#     img=img/255.0
#     return img,label

# test_dataset= (tf.data.Dataset
#                .from_tensor_slices(test_paths)
#                .map(decode_image, num_parallel_calls=AUTO)
#                .batch(BATCH_SIZE)
#               )

class CustomImageDataset(tf.keras.utils.Sequence):
    """Keras Sequence that loads image batches from file paths on demand.

    Each item is a `(images, labels)` pair where the images are decoded,
    converted to grayscale, scaled, resized and randomly flipped at access time.
    """

    def __init__(self, file_paths, labels, batch_size):
        """Store the paths, their labels and the batch size.

        Args:
            file_paths: Sliceable collection of image paths.
            labels: Sliceable collection of labels aligned with `file_paths`.
            batch_size: Number of samples per batch.
        """
        # Initialise the Sequence base class before adding state.
        # NOTE(review): newer Keras releases warn when a subclass skips this call — confirm for the installed version.
        super().__init__()
        self.file_paths = file_paths
        self.labels = labels
        self.batch_size = batch_size
        # Sample indices; not read by the other methods of this class.
        self.indexes = np.arange(len(self.file_paths))

    def __len__(self):
        """Number of batches, counting a final partial batch."""
        return int(np.ceil(len(self.file_paths) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number `index` as `(images, labels)` NumPy arrays."""
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        batch_file_paths = self.file_paths[start:end]
        batch_labels = self.labels[start:end]

        images = [self.decode_image(file_path) for file_path in batch_file_paths]
        images = np.array(images)

        return images, np.array(batch_labels)

    def decode_image(self,filename, label=None, image_size=(256,256)):
        """Read a JPEG, convert it to a scaled grayscale float image and augment it.

        Returns the image, or `(image, label)` when `label` is given.
        """
        bits=tf.io.read_file(filename)
        image=tf.image.decode_jpeg(bits,channels=3)
        image = tf.image.rgb_to_grayscale(image)
        image=tf.cast(image,tf.float32) / 255.0
        image=tf.image.resize(image,image_size)

        if label is None:
            return self.data_augment(image)
        else:
            return self.data_augment(image,label)

    def data_augment(self,image,label=None):
        """Randomly flip the image left/right and up/down; pass the label through."""
        image=tf.image.random_flip_left_right(image)
        image=tf.image.random_flip_up_down(image)

        if label is None:
            return image
        else:
            return image,label

# NOTE(review): `batch_size` (lower-case) is a separate constant from BATCH_SIZE; the two loaders below are built with BATCH_SIZE.
batch_size = 32
# NOTE(review): `target_size` is not referenced again in the visible cells.
target_size = (512,512,3)


# Sequence-based loaders over the same train/validation split as the tf.data pipelines.
train = CustomImageDataset(x_train, y_train, batch_size=BATCH_SIZE)
validation=CustomImageDataset(x_validation,y_validation,batch_size=BATCH_SIZE)


# EfficientNetB7

In [None]:
with strategy.scope():
    # ImageNet-pretrained EfficientNetB7 backbone without its classification head.
    # NOTE(review): decode_image() converts images to grayscale (1 channel) while this
    # input expects 3 channels — confirm which pipeline feeds this model.
    base_model=keras.applications.EfficientNetB7(input_shape=(256,256,3),weights='imagenet',include_top=False)
    # Freeze only the backbone, and do it before compile(): a `trainable` change made
    # after compile() is not picked up until the model is compiled again, and freezing
    # the whole model would leave the new head untrainable too.
    base_model.trainable=False
    # Binary classification head: flatten -> 512 -> 128 -> sigmoid.
    x=Flatten()(base_model.output)
    x=Dense(512,activation='relu')(x)
    x=Dense(128,activation="relu")(x)
    output=Dense(1, activation='sigmoid')(x)
    model=models.Model(inputs=base_model.input,outputs=output)
    model.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])
    model.summary()


In [None]:
# Stop when val_loss has not improved for 4 epochs and restore the best weights.
# val_loss improves by decreasing, so the comparison mode must be 'min'; with 'max'
# the callback would treat a rising loss as progress.
early_stopping = EarlyStopping(monitor='val_loss',
                               min_delta=0,
                               patience=4,
                               verbose=0,
                               mode='min',
                               baseline=None,
                               restore_best_weights=True)

# Callback to save the best model: checked after every epoch, written only when
# val_accuracy reaches a new maximum. With save_best_only=False the file would be
# overwritten every epoch and `monitor`/`mode` would have no effect.
model_checkpoint = ModelCheckpoint("efficient_model.h5",
                                   monitor='val_accuracy',
                                   verbose=0,
                                   save_best_only=True,
                                   save_weights_only=False,
                                   mode='max',
                                   save_freq='epoch')


In [None]:
# The training pipeline repeats forever, so one "epoch" is defined here as
# len(train_labels) // BATCH_SIZE batches.
STEPS_PER_EPOCH = len(train_labels) // BATCH_SIZE
model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=5,
    steps_per_epoch=STEPS_PER_EPOCH,
    callbacks=[model_checkpoint, early_stopping],
)

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
# Scores for the evaluation pipeline `test`, in the same order as `test_label`.
predict=model.predict(test)


In [None]:
# accuracy_score takes (y_true, y_pred). Scores are rounded to 0/1 and flattened
# so both arguments are 1-D.
accuracy_score(test_label, np.round(predict).ravel())

## ConvNeXtLarge

In [None]:
with strategy.scope():
    # ImageNet-pretrained ConvNeXtLarge backbone without its classification head.
    # NOTE(review): decode_image() converts images to grayscale (1 channel) while this
    # input expects 3 channels — confirm which pipeline feeds this model.
    base_model=keras.applications.ConvNeXtLarge(input_shape=(256,256,3),weights='imagenet',include_top=False)
    # Freeze only the backbone, and do it before compile(): a `trainable` change made
    # after compile() is not picked up until the model is compiled again, and freezing
    # the whole model would leave the new head untrainable too.
    base_model.trainable=False
    # Binary classification head: flatten -> 512 -> 128 -> sigmoid.
    x=Flatten()(base_model.output)
    x=Dense(512,activation='relu')(x)
    x=Dense(128,activation="relu")(x)
    output=Dense(1, activation='sigmoid')(x)
    convxnet_model=models.Model(inputs=base_model.input,outputs=output)
    convxnet_model.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])
    convxnet_model.summary()


In [None]:
# Write the model to disk only when the callback's monitored metric improves.
model_checkpoint = ModelCheckpoint("convxnet_model.h5", save_best_only=True)
# The training pipeline repeats forever, so the epoch length is set explicitly.
STEPS_PER_EPOCH = len(train_labels) // BATCH_SIZE
convxnet_model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=5,
    steps_per_epoch=STEPS_PER_EPOCH,
    callbacks=[model_checkpoint],
)

# MobileNet

In [None]:
with strategy.scope():
    # ImageNet-pretrained MobileNetV2 backbone without its classification head.
    # NOTE(review): decode_image() converts images to grayscale (1 channel) while this
    # input expects 3 channels — confirm which pipeline feeds this model.
    base_model=keras.applications.MobileNetV2(input_shape=(256,256,3),weights='imagenet',include_top=False)
    # Freeze only the backbone, and do it before compile(): a `trainable` change made
    # after compile() is not picked up until the model is compiled again, and freezing
    # the whole model would leave the new head untrainable too.
    base_model.trainable=False
    # Binary classification head: flatten -> 512 -> 128 -> sigmoid.
    x=Flatten()(base_model.output)
    x=Dense(512,activation='relu')(x)
    x=Dense(128,activation="relu")(x)
    output=Dense(1, activation='sigmoid')(x)
    mobilenet_model=models.Model(inputs=base_model.input,outputs=output)
    mobilenet_model.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])
    mobilenet_model.summary()
# Write the model to disk only when the callback's monitored metric improves.
model_checkpoint=ModelCheckpoint("mobilenet_model.h5",save_best_only=True)
# The training pipeline repeats forever, so the epoch length is set explicitly.
STEPS_PER_EPOCH=len(train_labels)// BATCH_SIZE
# Callbacks are passed as a list, like the other fit() calls in this notebook.
mobilenet_model.fit(train_dataset,validation_data=valid_dataset,epochs=7,steps_per_epoch=STEPS_PER_EPOCH,callbacks=[model_checkpoint])

# InceptionResNetV2

In [None]:
# Score the evaluation pipeline and compare with the ground truth.
mobilenet_prediction=mobilenet_model.predict(test)
# accuracy_score takes (y_true, y_pred); scores are rounded to 0/1 and flattened to 1-D.
accuracy_score(test_label, np.round(mobilenet_prediction).ravel())

In [None]:
with strategy.scope():
    # ImageNet-pretrained InceptionResNetV2 backbone without its classification head.
    # NOTE(review): decode_image() converts images to grayscale (1 channel) while this
    # input expects 3 channels — confirm which pipeline feeds this model.
    base_model=keras.applications.InceptionResNetV2(input_shape=(256,256,3),weights='imagenet',include_top=False)
    # Freeze only the backbone, and do it before compile(): a `trainable` change made
    # after compile() is not picked up until the model is compiled again, and freezing
    # the whole model would leave the new head untrainable too.
    base_model.trainable=False
    # Binary classification head: flatten -> 512 -> 128 -> sigmoid.
    x=Flatten()(base_model.output)
    x=Dense(512,activation='relu')(x)
    x=Dense(128,activation="relu")(x)
    output=Dense(1, activation='sigmoid')(x)
    InceptionResNetV2=models.Model(inputs=base_model.input,outputs=output)
    InceptionResNetV2.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])
    InceptionResNetV2.summary()
# Write the model to disk only when the callback's monitored metric improves.
model_checkpoint=ModelCheckpoint("InceptionResNetV2.h5",save_best_only=True)
# The training pipeline repeats forever, so the epoch length is set explicitly.
STEPS_PER_EPOCH=len(train_labels) // BATCH_SIZE
InceptionResNetV2.fit(train_dataset,validation_data=valid_dataset,epochs=5,steps_per_epoch=STEPS_PER_EPOCH,callbacks=[model_checkpoint])

In [None]:
# Score the evaluation pipeline and compare with the ground truth.
InceptionResNetV2_prediction=InceptionResNetV2.predict(test)
# accuracy_score takes (y_true, y_pred); scores are rounded to 0/1 and flattened to 1-D.
accuracy_score(test_label, np.round(InceptionResNetV2_prediction).ravel())

# SRNET

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.keras.models import Model

def conv_layer(input_tensor, num_filters, kernel_size, strides, padding='same'):
    """Apply one biased Conv2D with the settings shared by all SRNet blocks.

    Kernels use He-normal initialisation and L2 regularisation (2e-4);
    biases start at 0.2. No normalisation or activation is applied here.

    Returns:
        The output tensor of the convolution.
    """
    convolution = layers.Conv2D(
        num_filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        use_bias=True,
        kernel_initializer=tf.keras.initializers.HeNormal(),
        bias_initializer=tf.keras.initializers.Constant(value=0.2),
        kernel_regularizer=tf.keras.regularizers.L2(l2=2e-4),
    )
    return convolution(input_tensor)


def layer_T1(input_tensor, num_filters):
    """Type-1 block: 3x3 stride-1 convolution -> batch normalisation -> ReLU."""
    features = conv_layer(input_tensor,
                          num_filters=num_filters,
                          kernel_size=(3, 3),
                          strides=1)
    normalised = layers.BatchNormalization(momentum=0.9)(features)
    return layers.ReLU()(normalised)


def layer_T2(input_tensor, num_filters):
    """Type-2 block: a type-1 block, a second conv + batch norm, plus an identity shortcut.

    The block input is added to the branch output unchanged, so the input
    presumably has to carry `num_filters` channels already.
    """
    branch = layer_T1(input_tensor, num_filters)
    branch = conv_layer(branch,
                        num_filters=num_filters,
                        kernel_size=(3, 3),
                        strides=1)
    branch = layers.BatchNormalization(momentum=0.9)(branch)

    # Residual connection around the whole branch.
    return layers.add([input_tensor, branch])


def layer_T3(input_tensor, num_filters):
    """Type-3 block: down-sampling residual block with a projected shortcut.

    Main branch: type-1 block -> 3x3 conv -> batch norm -> 3x3 average pooling
    with stride 2. Shortcut: 1x1 conv with stride 2 -> batch norm. Both
    branches are summed.
    """
    # Main branch (built first, as in the original layer order).
    main = layer_T1(input_tensor, num_filters)
    main = conv_layer(main,
                      num_filters=num_filters,
                      kernel_size=(3, 3),
                      strides=1)
    main = layers.BatchNormalization(momentum=0.9)(main)
    main = layers.AveragePooling2D(pool_size=(3, 3),
                                   strides=2,
                                   padding='same')(main)

    # Shortcut branch: strided 1x1 convolution followed by batch normalisation.
    shortcut = conv_layer(input_tensor,
                          num_filters=num_filters,
                          kernel_size=(1, 1),
                          strides=2)
    shortcut = layers.BatchNormalization(momentum=0.9)(shortcut)

    return layers.add([main, shortcut])


def layer_T4(input_tensor, num_filters):
    """Type-4 block: type-1 block -> 3x3 conv -> batch norm -> global average pooling."""
    features = layer_T1(input_tensor, num_filters)
    features = conv_layer(features,
                          num_filters=num_filters,
                          kernel_size=(3, 3),
                          strides=1)
    features = layers.BatchNormalization(momentum=0.9)(features)

    # Collapse the spatial dimensions into one value per channel.
    return layers.GlobalAveragePooling2D()(features)


def fully_connected(input_tensor):
    """Classification head: flatten -> linear Dense(512) without bias -> Dense(1, sigmoid).

    The 512-unit layer draws its kernel from N(0, 0.01). A zero bias
    initialiser is passed too, although the layer is created with
    `use_bias=False`.
    """
    flat = layers.Flatten()(input_tensor)
    hidden = layers.Dense(
        512,
        activation=None,
        use_bias=False,
        kernel_initializer=tf.random_normal_initializer(0, 0.01),
        bias_initializer=tf.constant_initializer(0.),
    )(flat)

    return layers.Dense(1, activation='sigmoid')(hidden)


def create_SRNet(input_image_size):
    """Assemble the SRNet model for inputs of shape `input_image_size`.

    Layout: two type-1 blocks (64 and 16 filters), five type-2 blocks
    (16 filters), four type-3 blocks (16, 64, 128, 256 filters), one type-4
    block (512 filters) and the fully connected head.

    Returns:
        An uncompiled Keras `Model` named "SRNet".
    """
    input_layer = layers.Input(shape=input_image_size)

    net = input_layer
    for width in (64, 16):
        net = layer_T1(net, width)

    for _ in range(5):
        net = layer_T2(net, 16)

    for width in (16, 64, 128, 256):
        net = layer_T3(net, width)

    net = layer_T4(net, 512)

    return Model(inputs=input_layer, outputs=fully_connected(net), name="SRNet")

In [None]:
from tensorflow.keras import callbacks
with strategy.scope():
    # Single-channel 256x256 input.
    INPUT_IMAGE_SIZE = (256, 256, 1)
    srnet_model = create_SRNet(INPUT_IMAGE_SIZE)

    # The head ends in a sigmoid, so the loss works on probabilities (from_logits=False).
    srnet_model.compile(
        optimizer=optimizers.Adam(learning_rate=0.01),
        loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
        metrics=['accuracy'],
    )
    # Callback to stop the algorithm when it doesn't improve.
# Stop when val_accuracy has not improved for 3 epochs and restore the best weights.
early_stopping = callbacks.EarlyStopping(monitor='val_accuracy',
                                         min_delta=0,
                                         patience=3,
                                         verbose=0,
                                         mode='max',
                                         baseline=None,
                                         restore_best_weights=True)

# Callback to save the best model: checked after every epoch, written only when
# val_accuracy reaches a new maximum. With save_best_only=False the file would be
# overwritten every epoch and `monitor`/`mode` would have no effect.
model_checkpoint = callbacks.ModelCheckpoint("srnet_model.h5",
                                             monitor='val_accuracy',
                                             verbose=0,
                                             save_best_only=True,
                                             save_weights_only=False,
                                             mode='max',
                                             save_freq='epoch')

# Callback to lower the learning rate from 0.01 to 0.001 once the epoch index exceeds 2
def lr_schedule(epoch):
    """Return the learning rate for `epoch`: 0.01 up to and including epoch index 2, 0.001 afterwards."""
    return 0.01 if epoch <= 2 else 0.001

NUM_EPOCHS = 5
# The training pipeline repeats forever, so the epoch length is set explicitly.
STEPS_PER_EPOCH = len(train_labels) // BATCH_SIZE
learning_rate_scheduler = callbacks.LearningRateScheduler(lr_schedule, verbose=0)

# Execute the training with all the callbacks.
trainHistory = srnet_model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=NUM_EPOCHS,
    steps_per_epoch=STEPS_PER_EPOCH,
    callbacks=[early_stopping, model_checkpoint, learning_rate_scheduler],
)

In [None]:
# Score the evaluation pipeline and compare with the ground truth.
srnet_model_prediction=srnet_model.predict(test)
# accuracy_score takes (y_true, y_pred); scores are rounded to 0/1 and flattened to 1-D.
accuracy_score(test_label, np.round(srnet_model_prediction).ravel())

In [None]:
# Cut the learning rate by 10x when val_loss has not improved by at least 1e-4 for 5 epochs.
# `min_delta` is the supported name for the improvement threshold; `epsilon` is its
# deprecated alias.
reduce_lr_loss = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1, min_delta=1e-4, mode='min')
with strategy.scope():
    # ImageNet-pretrained Xception backbone without its classification head.
    # NOTE(review): decode_image() converts images to grayscale (1 channel) while this
    # input expects 3 channels — confirm which pipeline feeds this model.
    base_model=keras.applications.xception.Xception(input_shape=(256,256,3),weights='imagenet',include_top=False)
    # Freeze only the backbone, and do it before compile(): a `trainable` change made
    # after compile() is not picked up until the model is compiled again, and freezing
    # the whole model would leave the new head untrainable too.
    base_model.trainable=False
    # Binary classification head: flatten -> 512 -> 128 -> sigmoid.
    x=Flatten()(base_model.output)
    x=Dense(512,activation='relu')(x)
    x=Dense(128,activation="relu")(x)
    output=Dense(1, activation='sigmoid')(x)
    xception=models.Model(inputs=base_model.input,outputs=output)
    xception.compile(optimizer='adam',loss='binary_crossentropy', metrics=['accuracy'])
    xception.summary()
# Write the model to disk only when the callback's monitored metric improves.
model_checkpoint=ModelCheckpoint("xception.h5",save_best_only=True)
# The training pipeline repeats forever, so the epoch length is set explicitly.
STEPS_PER_EPOCH=len(train_labels) // BATCH_SIZE
xception.fit(train_dataset,validation_data=valid_dataset,epochs=5,steps_per_epoch=STEPS_PER_EPOCH,callbacks=[model_checkpoint,reduce_lr_loss])

# Evaluation of Efficientnet

In [None]:
def model_input_create(paths):
    """Decode every path with `decode_image` and stack the results into one NumPy array.

    All images are held in memory at once.
    """
    return np.array([decode_image(path) for path in paths])


def label_creation(predictions):
    """Threshold scores into class labels: 1 when a score is strictly above 0.5, else 0.

    Returns:
        A list with one integer label per entry of `predictions`.
    """
    return [1 if predictions[idx] > 0.5 else 0 for idx in range(len(predictions))]

In [None]:
#load models
# Load the saved EfficientNet model inside the strategy scope.
with strategy.scope():
    ef_model=keras.models.load_model("/kaggle/input/models/efficient_model.h5")


In [None]:
# Rebuild the name list and reshuffle with the same seed as the first sampling
# cell, then pick different slices than that cell did.
train_filenames = np.asarray(os.listdir("/kaggle/input/alaska2-image-steganalysis/Cover"))
np.random.seed(0)
positives, negatives = train_filenames.copy(), train_filenames.copy()
np.random.shuffle(positives)
np.random.shuffle(negatives)

# 5000 stego images per embedding algorithm, taken from consecutive slices of `positives`.
jmipod, juniward, uerd = (
    append_path(folder)(positives[begin:begin + 5000])
    for folder, begin in (('JMiPOD', 5000), ('JUNIWARD', 10000), ('UERD', 15000))
)
# 15000 cover images from negatives[15000:30000].
neg_path = append_path('Cover')(negatives[15000:30000])

pos_path = np.concatenate((jmipod, juniward, uerd))
np.random.shuffle(pos_path)
train_paths = np.concatenate((neg_path, pos_path))
np.random.shuffle(train_paths)

# Label 0 when the path contains "Cover", 1 for everything else.
train_labels = [0 if "Cover" in path else 1 for path in train_paths]

In [None]:
# Decode every sampled image into one in-memory array, then score it with the loaded model.
train_images=model_input_create(train_paths)
train_pred=ef_model.predict(train_images)

In [None]:
# Collect the paths the model got wrong at a 0.5 threshold: scored above 0.5
# although the label is not 1, or below 0.5 although the label is not 0.
# A score of exactly 0.5 is never collected.
mis_classified = [
    path
    for path, pred, true in zip(train_paths, train_pred, train_labels)
    if ((pred > 0.5) & (true != 1)) or ((pred < 0.5) & (true != 0))
]

# Add 2000 stego images per embedding algorithm to the misclassified set.
mis_classified = np.concatenate([
    mis_classified,
    append_path('JMiPOD')(positives[10000:12000]),
    append_path('JUNIWARD')(positives[12000:14000]),
    append_path('UERD')(positives[14000:16000]),
])

# Label 0 when the path contains "Cover", 1 for everything else.
mis_classified_labels = [0 if "Cover" in path else 1 for path in mis_classified]

In [None]:
# Number of cover (label 0) samples in the misclassified set.
count = mis_classified_labels.count(0)

In [None]:
# Number of samples with a non-zero label in the misclassified set.
len(mis_classified_labels)-count

In [None]:
# NOTE: this rebinds x_train / x_validation / y_train / y_validation, replacing
# the split created for the first training run.
x_train, x_validation, y_train, y_validation = train_test_split(mis_classified, mis_classified_labels, test_size=0.15, random_state=1000)
# Training pipeline for the misclassified set. cache() stores the decoded images
# after the first pass. decode_image() has already applied its random flips by
# then, so data_augment is mapped again after the cache; every epoch then sees
# freshly drawn flips instead of the cached ones.
misclassified_train_dataset = (tf.data.Dataset
                 .from_tensor_slices((x_train,y_train))
                 .map(decode_image, num_parallel_calls=AUTOTUNE)
                 .cache()
                 .shuffle(1024)
                 .map(data_augment, num_parallel_calls=AUTOTUNE)
                 .batch(BATCH_SIZE)
                 .prefetch(AUTOTUNE)
                )
# Validation pipeline: a single ordered pass.
# NOTE(review): decode_image() applies random flips here as well.
misclassified_valid_dataset= (tf.data.Dataset
                .from_tensor_slices((x_validation,y_validation))
                .map(decode_image, num_parallel_calls=AUTOTUNE)
                .batch(BATCH_SIZE)
                .prefetch(AUTOTUNE)
               )
# Stop when val_accuracy has not improved for 5 epochs and restore the best weights.
early_stopping = EarlyStopping(monitor='val_accuracy',
                               min_delta=0,
                               patience=5,
                               verbose=0,
                               mode='max',
                               baseline=None,
                               restore_best_weights=True)

In [None]:
# Cut the learning rate by 10x when val_loss has not improved by at least 1e-4 for 5 epochs.
# `min_delta` is the supported name for the improvement threshold; `epsilon` is its
# deprecated alias.
reduce_lr_loss = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1, min_delta=1e-4, mode='min')
# Fine-tune the loaded model on the misclassified set; this pipeline does not repeat,
# so no steps_per_epoch is needed.
ef_model.fit(misclassified_train_dataset,validation_data=misclassified_valid_dataset,epochs=10,callbacks=[early_stopping,reduce_lr_loss])

In [None]:
# Score the whole misclassified set (training and validation parts) with the fine-tuned model.
pred=ef_model.predict(model_input_create(mis_classified))

In [None]:
from sklearn.metrics import classification_report

In [None]:

# Per-class precision/recall report, with scores rounded to 0/1.
print(classification_report(mis_classified_labels,np.round(pred)))

In [None]:
import pandas as pd

# Build the submission: one score per Id listed in the sample submission file.
sub = pd.read_csv('/kaggle/input/alaska2-image-steganalysis/sample_submission.csv')
test_paths = append_path('Test')(sub.Id.values)
# The pipeline is built from `test_paths`; the previous name `test_files` was never defined.
# No shuffling, so predictions stay aligned with the rows of `sub`.
test_dataset = (
    tf.data.Dataset
    .from_tensor_slices(test_paths)
    .map(decode_image, num_parallel_calls=AUTOTUNE)
    .batch(BATCH_SIZE)
)

# Flatten the prediction array so a 1-D column is assigned.
sub["Label"] = ef_model.predict(test_dataset).ravel()
sub.to_csv('submission.csv', index=False)
sub.head()

In [None]:
# Save the fine-tuned model to the working directory.
keras.models.save_model(ef_model,"efficientnet_model.h5")

In [None]:
# Load the saved SRNet model inside the strategy scope.
with strategy.scope():
    srnet=keras.models.load_model("/kaggle/input/models/srnet_model.h5")

In [None]:
# Stop when val_accuracy has not improved for 5 epochs and restore the best weights.
early_stopping = EarlyStopping(monitor='val_accuracy',
                               min_delta=0,
                               patience=5,
                               verbose=0,
                               mode='max',
                               baseline=None,
                               restore_best_weights=True)
# Cut the learning rate by 10x when val_loss has not improved by at least 1e-4 for 2 epochs.
# `min_delta` is the supported name for the improvement threshold; `epsilon` is its
# deprecated alias.
reduce_lr_loss = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=2, verbose=1, min_delta=1e-4, mode='min')
# train_dataset is batched with BATCH_SIZE and repeats forever, so the epoch length
# uses the same formula as the other fit() calls on it. The earlier expression
# len(x_train)//batch_size mixed the misclassified split (x_train was rebound) with
# the unrelated lower-case batch_size.
STEPS_PER_EPOCH = len(train_labels) // BATCH_SIZE
srnet.fit(train_dataset,validation_data=valid_dataset,epochs=10,callbacks=[early_stopping,reduce_lr_loss],steps_per_epoch=STEPS_PER_EPOCH)