In [7]:
from preprocessing.patch_generator import smash_n_reconstruct
import preprocessing.filters as f
import tensorflow as tf
from keras import layers,Model
from keras.callbacks import ModelCheckpoint, EarlyStopping
import os

In [60]:
from keras import layers, saving

#@tf.function
def hard_tanh(x):
    return tf.maximum(tf.minimum(x, 1), -1)

@saving.register_keras_serializable(package="Custom")
class featureExtractionLayer(layers.Layer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conv = layers.Conv2D(filters=32, kernel_size=(3,3), activation='relu')
        self.bn = layers.BatchNormalization()
        self.activation = layers.Lambda(hard_tanh)
        
    def call(self, input):
        x = self.conv(input)
        x = self.bn(x)
        x = self.activation(x)
        return x
        

In [61]:
input1 = layers.Input(shape=(256,256,1),name="rich_texture")
input2 = layers.Input(shape=(256,256,1),name="poor_texture")

l1 = featureExtractionLayer(name="feature_extraction_layer_rich_texture")(input1)
l2 = featureExtractionLayer(name="feature_extraction_layer_poor_texture")(input2)

contrast = layers.subtract((l1,l2))

x = layers.Conv2D(filters=32,kernel_size=(3,3),activation='relu')(contrast)
x = layers.BatchNormalization()(x)
for i in range(3):
    x = layers.Conv2D(filters=32,kernel_size=(3,3),activation='relu')(x)
    x = layers.BatchNormalization()(x)
x = layers.BatchNormalization()(x)

for i in range(4):
    x = layers.Conv2D(filters=32,kernel_size=(3,3),activation='relu')(x)
    x = layers.BatchNormalization()(x)
x = layers.AveragePooling2D(2,2)(x) #added 2,2

for i in range(2):
    x = layers.Conv2D(filters=32,kernel_size=(3,3),activation='relu')(x)
    x = layers.BatchNormalization()(x)
x = layers.AveragePooling2D(2,2)(x) #added 2,2

for i in range(2):
    x = layers.Conv2D(filters=32,kernel_size=(3,3),activation='relu')(x)
    x = layers.BatchNormalization()(x)
x = layers.GlobalAveragePooling2D()(x)

x = layers.Flatten()(x)
x = layers.Dense(1,activation='sigmoid')(x)

model = Model(inputs=(input1,input2), outputs=x, name="rich_texture_poor_texture_contrast")
model.compile(
                optimizer='adam',
                loss='BinaryCrossentropy',
                metrics=['binary_accuracy']
            )
model.summary()

In [62]:
path_ai = 'C:/Users/esull/OneDrive/Documents/ds340_final_attempt/Detection-of-AI-generated-images/data/ai_images/'
ai_imgs = [os.path.join(path_ai,img) for img in os.listdir(path_ai)[:5]]
ai_label = [1 for i in range(len(ai_imgs))]
path_real = 'C:/Users/esull/OneDrive/Documents/ds340_final_attempt/Detection-of-AI-generated-images/data/real_images/'
real_imgs = [os.path.join(path_real,img) for img in os.listdir(path_real)[:5]]
real_label = [0 for i in range(len(real_imgs))]
print(len(real_imgs),len(ai_imgs))
X_train = ai_imgs + real_imgs#ai_imgs[:-21] + real_imgs[:-21]
y_train = ai_label + real_label#ai_label[:-21] + real_label[:-21]
X_validate = ai_imgs + real_imgs#ai_imgs[-21:] + real_imgs[-21:]
y_validate = ai_label + real_label#ai_label[-21:] + real_label[-21:]
len(X_train),len(y_train),len(X_validate),len(y_validate)

5 5


(10, 10, 10, 10)

In [63]:
def preprocess(path,label:int):
    rt,pt = smash_n_reconstruct(path.numpy().decode('utf-8'))
    frt = tf.cast(tf.expand_dims(f.apply_all_filters(rt),axis=-1),dtype=tf.float32)
    fpt = tf.cast(tf.expand_dims(f.apply_all_filters(pt), axis=-1),dtype=tf.float32)

    return frt,fpt,label

In [64]:
def dict_map(X1,X2,y):
    return {
        'rich_texture':X1,
        'poor_texture':X2
    },y

## Making data pipeline

In [65]:
'''
batch_size = 32

dataset = (tf.data.Dataset.from_tensor_slices((X_train,y_train))
           .shuffle(len(X_train))
           .map(
                lambda filepath,label: 
                tf.py_function(preprocess, [filepath, label],[tf.float64, tf.float64, tf.int32])
            ).map(dict_map)
            .batch(batch_size)
            .prefetch(tf.data.AUTOTUNE)
        )

validation_set = (tf.data.Dataset.from_tensor_slices((X_validate,y_validate))
           .map(
                lambda filepath,label: 
                tf.py_function(preprocess, [filepath, label],[tf.float64, tf.float64, tf.int32])
            ).map(dict_map)
            .batch(10)
            .prefetch(tf.data.AUTOTUNE)
        )
'''
batch_size = 32

def set_shapes(frt, fpt, label):
    frt.set_shape([256, 256, 1])  # Expected shape after preprocessing
    fpt.set_shape([256, 256, 1])  # Expected shape after preprocessing
    label.set_shape([])  # Scalar shape for the label
    return {'rich_texture': frt, 'poor_texture': fpt}, label

dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .shuffle(len(X_train))
    .map(
        lambda filepath, label: tf.py_function(preprocess, [filepath, label], [tf.float32, tf.float32, tf.int32])
    )
    .map(set_shapes)  # Set shapes explicitly
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)

validation_set = (
    tf.data.Dataset.from_tensor_slices((X_validate, y_validate))
    .map(
        lambda filepath, label: tf.py_function(preprocess, [filepath, label], [tf.float32, tf.float32, tf.int32])
    )
    .map(set_shapes)  # Set shapes explicitly
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)


In [66]:
checkpoint_path = "./checkpoints/model_checkpoint3.keras"
checkpoint_callback = ModelCheckpoint(filepath=checkpoint_path, 
                                      monitor='val_loss', 
                                      save_best_only=True,
                                      verbose=1)

early_stopping_callback = EarlyStopping(monitor='val_loss', 
                                        patience=5,
                                        verbose=1, 
                                        restore_best_weights=True)


## Training the model

In [67]:
model.fit(dataset, epochs=5, batch_size=32, validation_data=validation_set,callbacks=[checkpoint_callback, early_stopping_callback])

Epoch 1/5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13s/step - binary_accuracy: 0.9000 - loss: 0.5995
Epoch 1: val_loss improved from inf to 0.69137, saving model to ./checkpoints/model_checkpoint3.keras
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 17s/step - binary_accuracy: 0.9000 - loss: 0.5995 - val_binary_accuracy: 0.5000 - val_loss: 0.6914
Epoch 2/5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7s/step - binary_accuracy: 0.9000 - loss: 0.3510
Epoch 2: val_loss improved from 0.69137 to 0.69039, saving model to ./checkpoints/model_checkpoint3.keras
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 10s/step - binary_accuracy: 0.9000 - loss: 0.3510 - val_binary_accuracy: 0.5000 - val_loss: 0.6904
Epoch 3/5
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7s/step - binary_accuracy: 0.9000 - loss: 0.1996
Epoch 3: val_loss did not improve from 0.69039
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10

<keras.src.callbacks.history.History at 0x20b1f07e030>

In [68]:
model.save('./classifier.keras')#used to be .h5