<a href="https://www.kaggle.com/code/neemakinyamuroki/classification-model-1?scriptVersionId=227722349" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
# Step 1: Import Packages
import os
import warnings
import numpy as np
import tensorflow as tf
import multiprocessing
import matplotlib.pyplot as plt
import shutil
import gc


from sklearn.utils import shuffle
from PIL import Image
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.layers import GlobalAveragePooling2D
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from PIL import ImageFile
from tensorflow.keras.mixed_precision import set_global_policy
from tensorflow.keras.layers import Input

In [2]:
# Step 2: Increase PIL's image size limit and suppress warnings
Image.MAX_IMAGE_PIXELS = None  # Remove the limit
ImageFile.LOAD_TRUNCATED_IMAGES = True  # Allow loading of truncated images
warnings.filterwarnings("ignore", category=Image.DecompressionBombWarning)  # Suppress the warning

In [3]:
# Step 3: Enable Mixed Precision Training
set_global_policy('mixed_float16')  # Enable mixed precision

In [4]:
# ✅ Define dataset paths
dataset_path = "/kaggle/input/ai-generated-images-vs-real-images"
train_dir = os.path.join(dataset_path, "train")
test_dir = os.path.join(dataset_path, "test")

In [5]:
# Detect TPU
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print(f"Running on TPU: {tpu.master()}")
except ValueError:
    tpu = None

# Set strategy
if tpu:
    tf.config.experimental_connect_to_cluster(tpu)
    tf.tpu.experimental.initialize_tpu_system(tpu)
    strategy = tf.distribute.TPUStrategy(tpu)
else:
    strategy = tf.distribute.get_strategy()

print(f"REPLICAS: {strategy.num_replicas_in_sync}")

REPLICAS: 1


In [6]:
# Step 5: Verify Dataset Existence
if not os.path.exists(train_dir) or not os.path.exists(test_dir):
    raise FileNotFoundError("Dataset directories not found. Please check dataset path.")

In [7]:
# ✅ Function to check & convert images
def check_image(file_path):
    """Checks if an image is corrupt and converts P-mode images."""
    try:
        img = Image.open(file_path)
        
        # Convert "P" mode (palette) to "RGBA" or "RGB"
        if img.mode == "P":
            img = img.convert("RGBA")

        img.verify()  # Verify integrity
        return None  
    except (IOError, SyntaxError, OSError):
        return file_path  # Return corrupt image path

In [8]:
# ✅ Function to move corrupt images
def copy_corrupt_images(file_path, corrupt_dir="/kaggle/working/corrupt_images"):
    """Copy corrupt images to another directory since deletion isn't allowed."""
    os.makedirs(corrupt_dir, exist_ok=True)  # Create folder if it doesn't exist
    try:
        shutil.copy(file_path, os.path.join(corrupt_dir, os.path.basename(file_path)))
        print(f"Copied corrupt image: {file_path}")
    except Exception as e:
        print(f"Failed to copy {file_path}: {e}")

In [9]:
# ✅ Function to remove corrupt images (runs in parallel for speed)
def remove_corrupt_images_parallel(directory):
    """Finds and logs corrupt images using multiprocessing for efficiency."""
    if not os.path.exists(directory):
        return

    all_images = []
    for folder in os.listdir(directory):
        folder_path = os.path.join(directory, folder)
        if os.path.isdir(folder_path):
            all_images.extend([os.path.join(folder_path, f) for f in os.listdir(folder_path)])

    # Use multiple CPU cores for parallel processing
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        corrupt_files = pool.map(check_image, all_images)

    # Copy corrupt images instead of deleting/moving
    for file in filter(None, corrupt_files):  # Remove None values
        copy_corrupt_images(file)

# ✅ Run the image cleaning function
remove_corrupt_images_parallel(train_dir)
remove_corrupt_images_parallel(test_dir)

Copied corrupt image: /kaggle/input/ai-generated-images-vs-real-images/test/real/5879.jpg


In [10]:
# Step 6: Define Image Dimensions and Batch Size
img_width, img_height = 100, 100  # Reduced image size
batch_size = 8  # Reduced batch size

In [11]:
# Step 7: Data Augmentation and Normalization
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=20,
    horizontal_flip=True,
    fill_mode='nearest'
)

test_datagen = ImageDataGenerator(rescale=1./255)


In [12]:

# Step 8: Load Training and Validation Data
train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='binary',
    subset='training',  # Use only a subset of the data
    seed=42  # Ensure reproducibility
)

test_generator = test_datagen.flow_from_directory(
    test_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='binary'
)

Found 48000 images belonging to 2 classes.
Found 12000 images belonging to 2 classes.


In [13]:
# Step 9: Define CNN Model (Simplified)
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(img_width, img_height, 3)),
    MaxPooling2D(pool_size=(2, 2)),
    
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    
    Flatten(),
    Dense(18432, activation='relu'),  # Match the output of Flatten
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [14]:
# Step 10: Compile the Model
model.compile(optimizer=Adam(learning_rate=0.001),
              loss='binary_crossentropy',
              metrics=['accuracy'])

# Step 11: Print Model Summary
model.summary()

In [15]:
# Step 12: Train the Model
steps_per_epoch = train_generator.samples // batch_size  # Use all training data
validation_steps = test_generator.samples // batch_size  # Use all test data

steps_per_epoch = 40  # Adjust based on dataset size
history = model.fit(
    train_generator,
    epochs=10,
    validation_data=test_generator,
    steps_per_epoch=steps_per_epoch,
    callbacks=[EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)]
)

Epoch 1/10


  self._warn_if_super_not_called()


[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9s/step - accuracy: 0.5478 - loss: 5.1730



[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2716s[0m 69s/step - accuracy: 0.5470 - loss: 5.2179 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 2/10
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2775s[0m 71s/step - accuracy: 0.5058 - loss: 7.8788 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 3/10
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2776s[0m 71s/step - accuracy: 0.4739 - loss: 8.3876 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 4/10
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2761s[0m 71s/step - accuracy: 0.5263 - loss: 7.5525 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 5/10
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2739s[0m 70s/step - accuracy: 0.5301 - loss: 7.4920 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 6/10
[1m40/40[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2781s[0m 71s/step - accuracy: 0.5707 - loss: 6.8441 - val_accuracy: 0.5000 - val_loss: 7.9713
Epoch 7/10
[1m40/40[0m [32m━━━