In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import *
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import *
from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

import warnings
warnings.filterwarnings('ignore')

In [None]:
def create_df(path, save_as="dataset.csv"):
    """Index every clean/stego image under ``path`` into a DataFrame.

    The expected layout is ``<path>/<name>/<name>/clean`` and
    ``<path>/<name>/<name>/stego`` for every sub-directory ``<name>`` of
    ``path``. Non-directory entries at the top level (for example
    ``dataset_information.csv``) are skipped.

    Args:
        path: Root directory of the dataset.
        save_as: Destination CSV file; the index is also written with
            ``DataFrame.to_csv``.

    Returns:
        A DataFrame with the columns ``path`` (file path), ``class``
        (``'clean'`` or ``'stego'``) and ``class_label`` (0 for clean,
        1 for stego).

    Raises:
        OSError: If ``path`` or one of the expected ``clean``/``stego``
            directories cannot be listed.
    """
    class_labels = (('clean', 0), ('stego', 1))
    records = []

    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore any seeded split made from it) reproducible.
    for main_dir in sorted(os.listdir(path)):
        if main_dir == "dataset_information.csv":
            continue
        if not os.path.isdir(os.path.join(path, main_dir)):
            # Stray files next to the data folders would otherwise crash the
            # directory listing below.
            continue

        for class_name, class_label in class_labels:
            class_dir = os.path.join(path, main_dir, main_dir, class_name)
            for image in sorted(os.listdir(class_dir)):
                records.append({
                    'path': os.path.join(class_dir, image),
                    'class': class_name,
                    'class_label': class_label
                })

    # Build the frame once instead of concatenating row by row, which copies
    # the whole frame on every iteration.
    df = pd.DataFrame(records, columns=['path', 'class', 'class_label'])
    df.to_csv(save_as)
    return df

In [None]:
# Index the stego image dataset; create_df also writes the index to a CSV file.
dataset_root = '/kaggle/input/stegoimagesdataset'
df = create_df(path=dataset_root)

In [None]:
from sklearn.model_selection import train_test_split

# Two-stage hold-out: 10% of all rows go to validation, then 10% of the
# remainder goes to test; whatever is left is the training set.
temp_df, val_df = train_test_split(
    df,
    test_size=0.1,
    random_state=42,
)
train_df, test_df = train_test_split(
    temp_df,
    test_size=0.1,
    random_state=42,
)

# Give every split a fresh 0..n-1 index.
train_df, test_df, val_df = (
    split.reset_index(drop=True) for split in (train_df, test_df, val_df)
)

In [None]:
def load_and_preprocess_from_path_label(path, class_label):
    """Read one image file and resize it to 224x224 with three channels.

    Args:
        path: Path of the image file to read.
        class_label: Label that belongs to the image; passed through as is.

    Returns:
        A ``(image, class_label)`` tuple where ``image`` is the decoded and
        resized image tensor.
    """
    raw_bytes = tf.io.read_file(path)
    # channels=3 forces a three-channel output regardless of the file.
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (224, 224))
    return resized, class_label

def augmentation(ds):
    """Run every image of ``ds`` through the preprocessing model.

    The preprocessing model is currently an empty ``Sequential`` (a
    ``Rescaling(1. / 255)`` layer was sketched but left disabled), so no
    transformation layers are applied.

    Args:
        ds: Dataset yielding ``(image, class_label)`` elements.

    Returns:
        The mapped dataset, still yielding ``(image, class_label)``.
    """
    image_pipeline = tf.keras.Sequential([])

    def _apply(image, class_label):
        # Labels are forwarded untouched; only the image goes through the model.
        return image_pipeline(image), class_label

    return ds.map(_apply)

def build_dataset_from_df(df, batch_size, shuffle=True, repeat=True):
    """Build a batched ``tf.data`` input pipeline from an image index frame.

    Args:
        df: DataFrame with a ``path`` column (image file paths) and a
            ``class_label`` column (integer labels).
        batch_size: Number of images per batch.
        shuffle: Shuffle the samples (reshuffled on every pass). Defaults to
            True, which matches the previous behaviour.
        repeat: Repeat the dataset indefinitely, so consumers must pass an
            explicit number of steps. Defaults to True, which matches the
            previous behaviour.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    # Explicit dtypes keep the pipeline valid for an empty frame, where
    # TensorFlow would otherwise infer float32 for the path list.
    paths = tf.constant(df["path"].astype(str).tolist(), dtype=tf.string)
    labels = tf.constant(df["class_label"].astype("int32").to_numpy())
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))

    if shuffle:
        # Shuffle the cheap (path, label) pairs rather than decoded images:
        # the buffer can then span the whole split without holding hundreds
        # of megabytes of pixels in memory.
        ds = ds.shuffle(buffer_size=max(len(df), 1))

    ds = ds.map(
        load_and_preprocess_from_path_label,
        num_parallel_calls=tf.data.AUTOTUNE
    )
    if repeat:
        ds = ds.repeat()
    ds = ds.batch(batch_size)
    ds = augmentation(ds)
    # Prefetch last so that every preceding stage overlaps with training.
    ds = ds.prefetch(buffer_size=tf.data.AUTOTUNE)

    return ds

In [None]:
# All three pipelines share the same batch size.
batch_size = 10
train_ds, valid_ds, test_ds = (
    build_dataset_from_df(split_df, batch_size)
    for split_df in (train_df, val_df, test_df)
)

In [None]:
# Sanity check: inspect the first few validation batches.
for image, class_label in valid_ds.take(10):
    print("Image shape: ", image.shape)
    # The original printed the label tensor under a "shape" caption; report
    # the shape and the values separately.
    print("class_label shape: ", class_label.shape)
    print("class_label values: ", class_label.numpy())

In [None]:
# NOTE(review): this optimizer instance does not appear to be passed to
# compile() in the visible cells (the string 'adam' is used there) - confirm
# which learning rate is intended.
optimizer_adam = Adam(learning_rate=0.01)

# Stop once val_loss has not improved for 10 epochs and roll back to the best
# weights seen so far.
EarlyStop = EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
)

# Halve the learning rate when val_accuracy plateaus, never going below 1e-5.
Reduce_LR = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    min_lr=0.00001,
    verbose=2,
)

# Keep only the checkpoint with the lowest val_loss.
model_check = ModelCheckpoint(
    filepath='model.h5',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

callback = [EarlyStop, Reduce_LR, model_check]

In [None]:
def make_mobilenet_model(image_size, num_classes=1):
    """Build a transfer-learning classifier on a frozen MobileNet backbone.

    The model takes raw RGB pixels in the 0-255 range. Scaling to the
    [-1, 1] range that the ImageNet-pretrained MobileNet expects happens
    inside the model, so training and inference share the same preprocessing.

    Args:
        image_size: ``(height, width)`` of the input images (tuple or list).
        num_classes: Number of sigmoid output units. The default of 1 gives a
            single probability for binary classification.

    Returns:
        An uncompiled ``Model`` mapping ``(height, width, 3)`` images to
        ``num_classes`` sigmoid outputs.
    """
    input_shape = tuple(image_size) + (3,)

    base_model = tf.keras.applications.MobileNet(
        input_shape=input_shape,
        include_top=False,
        weights="imagenet",
        pooling="avg"
    )

    # Freeze the backbone; only the dense head below is trained.
    base_model.trainable = False

    inputs = tf.keras.Input(shape=input_shape)
    # Same transformation as mobilenet.preprocess_input: [0, 255] -> [-1, 1].
    # Without it the pretrained filters see inputs far outside the range they
    # were trained on.
    x = tf.keras.layers.Rescaling(scale=1.0 / 127.5, offset=-1.0)(inputs)
    # training=False keeps the BatchNorm layers of the backbone in inference mode.
    x = base_model(x, training=False)
    x = Dense(128, activation='relu')(x)
    outputs = Dense(num_classes, activation='sigmoid')(x)

    model = Model(inputs, outputs)

    return model

In [None]:
# Build the classifier with make_mobilenet_model for 224x224 inputs.
image_size = (224, 224)
mobilenet_model = make_mobilenet_model(image_size=image_size)

In [None]:
# Print the Keras summary of the assembled model to review its layers before training.
mobilenet_model.summary()

In [None]:
# Binary classification: a single sigmoid output trained with binary cross-entropy.
mobilenet_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

epochs = 16

# The datasets repeat indefinitely, so Keras needs explicit step counts. They
# must be derived from the batch size the datasets were built with (10);
# dividing by 16 made every epoch cover only part of the data.
batch_size = 10
# max(1, ...) guards against a zero step count on very small splits.
steps_per_epoch = max(1, len(train_df) // batch_size)
validation_steps = max(1, len(val_df) // batch_size)

# Keep only the per-epoch metric history (a dict of lists) for plotting.
hist = mobilenet_model.fit(
    train_ds,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_data=valid_ds,
    validation_steps=validation_steps,
    callbacks=callback
).history


In [None]:
# The test dataset repeats indefinitely, so evaluation needs an explicit step
# count. It is derived from the batch size the dataset was built with (10);
# dividing by 16 evaluated only part of the test split.
batch_size = 10
# A separate name keeps the training steps_per_epoch intact if the training
# cell is re-run; max(1, ...) guards against a zero step count.
test_steps = max(1, len(test_df) // batch_size)
mobilenet_model.evaluate(test_ds, steps=test_steps)

In [None]:
# Plot training vs. validation accuracy and loss over the epochs.
# The second curve of each figure comes from the validation split
# (the 'val_*' history keys), so it is labelled "Validation", not "Test".
for metric, title, ylabel in (
    ('accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'Model Loss', 'Loss'),
):
    fig, ax = plt.subplots()
    ax.plot(hist[metric])
    ax.plot(hist['val_' + metric])
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('Epoch')
    ax.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

In [None]:
classes = ["Clean", "Stego"]
def predict(image):
    """Classify one image file with ``mobilenet_model`` and print the result.

    The image is read with OpenCV, converted from BGR to RGB, resized to
    224x224 and passed to the model as a batch of one. Scores up to and
    including 0.5 are reported as ``Clean``, anything above as ``Stego``.

    Args:
        image: Path of the image file to classify.

    Raises:
        FileNotFoundError: If ``image`` does not point to an existing file.
        ValueError: If the file exists but OpenCV cannot decode it.
    """
    pixels = cv2.imread(image)
    if pixels is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with a clear message rather than inside cvtColor.
        if not os.path.isfile(image):
            raise FileNotFoundError(f"Image file not found: {image}")
        raise ValueError(f"Could not decode image file: {image}")

    pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)
    pixels = cv2.resize(pixels, (224, 224))

    result = mobilenet_model.predict(np.expand_dims(pixels, axis=0))

    # result has one row per image and a single sigmoid score per row.
    print(classes[0] if result[0][0] <= 0.5 else classes[1])

predict('/kaggle/input/madusha-stego/train/train/clean/100.png')
predict('/kaggle/input/madusha-stego/test/test/stego/1020.png')

In [None]:
# Persist the model in two parts: the architecture as JSON and the weights as HDF5.
model_json = mobilenet_model.to_json()
with open("steg_model.json", "w") as architecture_file:
    architecture_file.write(model_json)

mobilenet_model.save_weights('steg_model.h5')

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import *
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras.callbacks import *
from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

import warnings
warnings.filterwarnings('ignore')

# Rebuild the architecture from its JSON description. The context manager
# closes the file even if reading fails.
with open('/kaggle/input/sample-data/steg_model.json', 'r') as json_file:
    loaded_model_json = json_file.read()

steg_model = model_from_json(loaded_model_json)
# NOTE(review): the weights come from 'model.h5', not 'steg_model.h5' -
# presumably the best-validation checkpoint; confirm it matches the JSON
# architecture.
steg_model.load_weights('/kaggle/input/sample-data/model.h5')
print("Loaded model from disk")

classes = ["Clean", "Stego"]
def predict_inference(image):
    """Classify one image file with the reloaded ``steg_model`` and print the result.

    The image is read with OpenCV, converted from BGR to RGB, resized to
    224x224 and passed to the model as a batch of one. Scores up to and
    including 0.5 are reported as ``Clean``, anything above as ``Stego``.

    Args:
        image: Path of the image file to classify.

    Raises:
        FileNotFoundError: If ``image`` does not point to an existing file.
        ValueError: If the file exists but OpenCV cannot decode it.
    """
    pixels = cv2.imread(image)
    if pixels is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with a clear message rather than inside cvtColor.
        if not os.path.isfile(image):
            raise FileNotFoundError(f"Image file not found: {image}")
        raise ValueError(f"Could not decode image file: {image}")

    pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)
    pixels = cv2.resize(pixels, (224, 224))

    result = steg_model.predict(np.expand_dims(pixels, axis=0))

    # result has one row per image and a single sigmoid score per row.
    print(classes[0] if result[0][0] <= 0.5 else classes[1])

predict_inference('/kaggle/input/sample-data/Lena Test/12/clean/43.png')
predict_inference('/kaggle/input/sample-data/Lena Test/12/stego/Lenna_(test_image) (1).png')