<a href="https://colab.research.google.com/github/AlessandroPata/CNN_Watermark_Detection/blob/main/WD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Dataset Upload and pre-processing
#Consider running this code on a TPU v2 runtime (or better) with at least 300 GB of RAM.
#The whole project can be consulted on Github. https://github.com/AlessandroPata/CNN_Watermark_Detection
!pip install gdown
import gdown
from google.colab import drive
#drive.mount('/content/drive')
!pip install tensorflow
!pip install keras-tuner
import sklearn
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report,accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from kerastuner import HyperModel, RandomSearch
!pip install kaggle
from tensorflow.keras.regularizers import l2
import gc
!kaggle datasets download -d alessandropata/watermark-det
!unzip watermark-det.zip
# Seed shared by every shuffle that defines a split. Together with
# reshuffle_each_iteration=False it makes those shuffles produce the same
# order on every pass, so take()/skip() carve out disjoint subsets.
SPLIT_SEED = 42

# Each image is loaded as its own batch of one so that examples can be
# filtered by label. The loader's built-in shuffle is disabled because it
# reorders the stream on every iteration; the explicit shuffle right below
# replaces it with a single, repeatable permutation.
dataset = tf.keras.preprocessing.image_dataset_from_directory(
    "/content/Watermark_Detection_ver_2.0",
    image_size=(224, 224),
    batch_size=1,
    label_mode='int',
    shuffle=False
)
dataset = dataset.shuffle(
    buffer_size=70000,
    seed=SPLIT_SEED,
    reshuffle_each_iteration=False
)


def filter_and_shuffle(ds, label, shuffle_buffer_size=70000,
                       seed=SPLIT_SEED, reshuffle_each_iteration=False):
    """Keep only the examples of one class and shuffle them.

    Args:
        ds: Dataset of (image, label) pairs batched with batch size 1; the
            label of each element is read as ``y[0]``.
        label: Integer class id to keep.
        shuffle_buffer_size: Buffer size passed to ``Dataset.shuffle``.
        seed: Random seed passed to ``Dataset.shuffle``.
        reshuffle_each_iteration: Passed to ``Dataset.shuffle``. It defaults
            to False so that separate ``take()`` / ``skip()`` branches built
            on the result all see the same order and therefore do not
            overlap.

    Returns:
        The filtered and shuffled dataset.
    """
    only_label = ds.filter(lambda x, y: tf.math.equal(y[0], label))
    return only_label.shuffle(
        buffer_size=shuffle_buffer_size,
        seed=seed,
        reshuffle_each_iteration=reshuffle_each_iteration
    )


# Label 0 is treated as "no watermark" and label 1 as "watermark"
# (NOTE(review): this relies on the class sub-directory order - confirm).
nw_dataset = filter_and_shuffle(dataset, 0)
w_dataset = filter_and_shuffle(dataset, 1)

# Balanced training set: the first 24184 examples of each class.
train_nw = nw_dataset.take(24184)
train_w = w_dataset.take(24184)

# The examples that follow the training portion feed validation and test.
valtestnw = nw_dataset.skip(24184).take(12739)
valtestw = w_dataset.skip(24184).take(7992)

train_dataset = train_nw.concatenate(train_w).shuffle(buffer_size=70000)

# This shuffle is split again with take()/skip(), so it must also return
# the same order on every pass to keep validation and test disjoint.
valtest = valtestnw.concatenate(valtestw).shuffle(
    buffer_size=70000,
    seed=SPLIT_SEED,
    reshuffle_each_iteration=False
)
val_dataset = valtest.take(10365)
test_dataset = valtest.skip(10365).take(10365)

# Re-batch from the per-image batches of one into training-sized batches.
batch_size = 32
train_dataset = train_dataset.unbatch().batch(batch_size)
val_dataset = val_dataset.unbatch().batch(batch_size)
test_dataset = test_dataset.unbatch().batch(batch_size)

# Cache the batches after the first pass and overlap loading with compute.
train = train_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
val = val_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
test = test_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
!rm -r ./sample_data
!rm -r ./watermark-det.zip
test_labels = []
for _, label_batch in test:
    test_labels.extend(label_batch.numpy())
gc.collect()

In [None]:
# The triple-quoted string below holds disabled code kept for reference: it
# is evaluated as a plain string expression, so none of it runs. It contains
# the Keras Tuner hypermodel and search, followed by the final model
# definition and its compile call.
# NOTE(review): if this code is re-enabled, `tuner.search(...)` uses
# `callbacks`, which is only defined after this string.
"""
#Model Configuration and Testing
#The lines of code included in the comment were useful for finding the most performant model via Keras Tuner and for training the hyperparameters.
#There is no need to execute them, as the trained model can be downloaded from Google Drive.
class BinaryClassifierHyperModel(HyperModel):
    def __init__(self, input_shape):
        self.input_shape = input_shape

    def build(self, hp):
        model = Sequential([
            Conv2D(
                filters = hp.Choice('conv_1_filter', values=[64]),
                kernel_size=hp.Choice('conv_1_kernel', values=[5]),
                activation='relu',
                input_shape=self.input_shape,
                kernel_regularizer=l2(hp.Float('conv_1_l2', min_value=6e-5, max_value=10e-5, sampling='log'))
            ),
            BatchNormalization(),
            MaxPooling2D(pool_size=(2, 2)),
            Conv2D(
                filters=hp.Choice('conv_2_filter', values=[64]),
                kernel_size=hp.Choice('conv_2_kernel', values=[3]),
                activation='relu',
                kernel_regularizer=l2(hp.Float('conv_2_l2', min_value=2e-5, max_value=8e-5, sampling='log'))
            ),
            BatchNormalization(),
            MaxPooling2D(pool_size=(2, 2)),
            Flatten(),
            Dense(
                units=hp.Choice('dense_units', values=[512]),
                activation='relu',
                kernel_regularizer=l2(hp.Float('dense_l2', min_value=4e-5, max_value=8e-4, sampling='log'))
            ),
            Dropout(0.5),
            Dense(1, activation='sigmoid')
        ])

        model.compile(
            optimizer=Adam(hp.Choice('learning_rate', values=[1e-4])),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        return model
hypermodel = BinaryClassifierHyperModel(input_shape=(224, 224, 3))
tuner = RandomSearch(
    hypermodel,
    objective='val_accuracy',
    max_trials=64,
    executions_per_trial=1,
    directory='/content/drive/MyDrive/Watermark/tuner/',
    project_name='hparam_tuning'
)
tuner.search(train, epochs=9, validation_data=val, callbacks=callbacks)
best_model = tuner.get_best_models(num_models=1)[0]

model = Sequential([
    Conv2D(
        filters=64,
        kernel_size=5,
        activation='relu',
        input_shape=(224, 224, 3),
        kernel_regularizer=l2(8.916867698179726e-05)
    ),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(
        filters=64,
        kernel_size=3,
        activation='relu',
        kernel_regularizer=l2(2.785257633027672e-05)
    ),
    BatchNormalization(),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(
        units=512,
        activation='relu',
        kernel_regularizer=l2(0.0006143210398780726)
    ),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])
model.compile(
    optimizer=Adam(learning_rate=1e-4),
    loss='binary_crossentropy',
    metrics=['accuracy']
)
"""
# Callbacks for the training run, which is currently disabled (see the
# string further down): keep the checkpoint with the lowest validation loss,
# scale the learning rate by 0.3 when validation loss stops improving, and
# stop early after 2 epochs without improvement.
callbacks = [
    ModelCheckpoint(
        '/content/Watermark/model.keras',
        save_best_only=True,
        save_weights_only=False,
        monitor='val_loss',
        mode='min'
    ),
    ReduceLROnPlateau(monitor='val_loss', factor=0.3, patience=0),
    EarlyStopping(monitor='val_loss', patience=2, verbose=1)
]
# Download the model file from Google Drive
url = 'https://drive.google.com/uc?export=download&id=1-6eUOTLs4X_6-nMUyj-Kc5gFWi64V2yz'
output_file = 'model.keras'
gdown.download(url, output_file, quiet=False)
# Load the model from the downloaded file
model = load_model(output_file)
"""
training = model.fit(
    train,
    epochs=10000,
    validation_data=val,
    callbacks=callbacks
)
"""
# Loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(test)
#pd.DataFrame(training.history)[["accuracy", "val_accuracy"]].plot()
# Round the sigmoid outputs to hard 0/1 predictions and flatten them to 1-D
# so they line up with `test_labels`.
test_predictions = model.predict(test).round().astype(int)
test_predictions = test_predictions.flatten()
cm = confusion_matrix(test_labels, test_predictions)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()
print(classification_report(test_labels, test_predictions, labels=[0,1],target_names=["no_watermark","watermark"], digits=4))
# summary() prints the table itself and returns None, so wrapping it in
# print() would add a stray "None" line.
model.summary()

In [None]:
#Printing Misclassified Images
# Walks the test split in the same batch order that produced
# `test_predictions`, using a running index to pair every image with its
# prediction, and collects the examples whose prediction differs from the
# true label.
misclassified_images = []
misclassified_labels = []
misclassified_predictions = []
index = 0
for images, labels in test:
    # Convert each batch to NumPy once instead of once per element.
    batch_images = images.numpy()
    batch_labels = labels.numpy()
    for j in range(batch_images.shape[0]):
        # The comparison must be `!=`; the previous `=!` was a syntax error.
        if test_predictions[index] != batch_labels[j]:
            misclassified_images.append(batch_images[j].astype("uint8"))
            misclassified_labels.append(batch_labels[j])
            misclassified_predictions.append(test_predictions[index])
        index += 1

# Iterate through misclassified images and display them
for image, true_label, predicted in zip(
    misclassified_images, misclassified_labels, misclassified_predictions
):
    plt.figure()
    plt.imshow(image)
    plt.title(f"True label: {true_label}, Predicted: {predicted}")
    plt.show()