In [3]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
import cv2
from PIL import Image
import warnings
warnings.filterwarnings('ignore')

# Deep Learning imports
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Conv2D, MaxPooling2D, Flatten, Dropout, BatchNormalization, GlobalAveragePooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import VGG16, ResNet50, EfficientNetB0
from tensorflow.keras.utils import to_categorical

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

class FakeRealImageDetector:
    def __init__(self, dataset_path, test_path, train_csv_path, test_csv_path):
        self.dataset_path = dataset_path
        self.test_path = test_path
        self.train_csv_path = train_csv_path
        self.test_csv_path = test_csv_path
        self.img_size = (224, 224)
        self.batch_size = 32
        self.epochs = 50
        
        # Load CSV files
        self.train_df = pd.read_csv(train_csv_path)
        self.test_df = pd.read_csv(test_csv_path)
        
        print(f"Training samples: {len(self.train_df)}")
        print(f"Test samples: {len(self.test_df)}")
        print(f"Class distribution in training data:")
        print(self.train_df['label'].value_counts())
    
    def load_and_preprocess_data(self):
        """Load and preprocess training and test data"""
        print("Loading and preprocessing data...")
        
        # Load training data
        X_train = []
        y_train = []
        
        for idx, row in self.train_df.iterrows():
            filename = row['filename']
            label = row['label']
            
            # Determine the correct folder based on label
            if label == 'real':
                img_path = os.path.join(self.dataset_path, 'training_real', filename)
            else:
                img_path = os.path.join(self.dataset_path, 'training_fake', filename)
            
            if os.path.exists(img_path):
                # Load and preprocess image
                img = cv2.imread(img_path)
                if img is not None:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, self.img_size)
                    img = img.astype('float32') / 255.0
                    
                    X_train.append(img)
                    y_train.append(1 if label == 'real' else 0)
        
        # Load test data
        X_test = []
        test_filenames = []
        
        for idx, row in self.test_df.iterrows():
            filename = row['filename']
            img_path = os.path.join(self.test_path, filename)
            
            if os.path.exists(img_path):
                # Load and preprocess image
                img = cv2.imread(img_path)
                if img is not None:
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    img = cv2.resize(img, self.img_size)
                    img = img.astype('float32') / 255.0
                    
                    X_test.append(img)
                    test_filenames.append(filename)
        
        # Convert to numpy arrays
        X_train = np.array(X_train)
        y_train = np.array(y_train)
        X_test = np.array(X_test)
        
        # Split training data into train and validation
        X_train, X_val, y_train, y_val = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )
        
        print(f"Training set shape: {X_train.shape}")
        print(f"Validation set shape: {X_val.shape}")
        print(f"Test set shape: {X_test.shape}")
        
        return X_train, X_val, X_test, y_train, y_val, test_filenames
    
    def create_custom_cnn_model(self):
        """Create a custom CNN model from scratch"""
        model = Sequential([
            # First Convolutional Block
            Conv2D(32, (3, 3), activation='relu', input_shape=(224, 224, 3)),
            BatchNormalization(),
            MaxPooling2D((2, 2)),
            
            # Second Convolutional Block
            Conv2D(64, (3, 3), activation='relu'),
            BatchNormalization(),
            MaxPooling2D((2, 2)),
            
            # Third Convolutional Block
            Conv2D(128, (3, 3), activation='relu'),
            BatchNormalization(),
            MaxPooling2D((2, 2)),
            
            # Fourth Convolutional Block
            Conv2D(256, (3, 3), activation='relu'),
            BatchNormalization(),
            MaxPooling2D((2, 2)),
            
            # Fifth Convolutional Block
            Conv2D(512, (3, 3), activation='relu'),
            BatchNormalization(),
            MaxPooling2D((2, 2)),
            
            # Flatten and Dense layers
            Flatten(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(256, activation='relu'),
            Dropout(0.5),
            Dense(128, activation='relu'),
            Dropout(0.3),
            Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def create_vgg16_model(self):
        """Create a model using pre-trained VGG16"""
        base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
        
        # Freeze base model layers
        base_model.trainable = False
        
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(256, activation='relu'),
            Dropout(0.3),
            Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def create_resnet50_model(self):
        """Create a model using pre-trained ResNet50"""
        base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
        
        # Freeze base model layers
        base_model.trainable = False
        
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(256, activation='relu'),
            Dropout(0.3),
            Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def create_efficientnet_model(self):
        """Create a model using pre-trained EfficientNetB0"""
        base_model = EfficientNetB0(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
        
        # Freeze base model layers
        base_model.trainable = False
        
        model = Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(512, activation='relu'),
            Dropout(0.5),
            Dense(256, activation='relu'),
            Dropout(0.3),
            Dense(1, activation='sigmoid')
        ])
        
        model.compile(
            optimizer=Adam(learning_rate=0.001),
            loss='binary_crossentropy',
            metrics=['accuracy']
        )
        
        return model
    
    def create_data_generators(self, X_train, y_train):
        """Create data generators for data augmentation"""
        datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.2,
            height_shift_range=0.2,
            horizontal_flip=True,
            zoom_range=0.2,
            shear_range=0.2,
            fill_mode='nearest'
        )
        
        return datagen
    
    def train_model(self, model, X_train, X_val, y_train, y_val, model_name):
        """Train a model with callbacks"""
        print(f"\nTraining {model_name} model...")
        
        # Callbacks
        early_stopping = EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )
        
        reduce_lr = ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.2,
            patience=5,
            min_lr=0.0001
        )
        
        model_checkpoint = ModelCheckpoint(
            f'{model_name}_best_model.h5',
            monitor='val_accuracy',
            save_best_only=True,
            save_weights_only=False
        )
        
        # Data augmentation
        datagen = self.create_data_generators(X_train, y_train)
        
        # Train the model
        history = model.fit(
            datagen.flow(X_train, y_train, batch_size=self.batch_size),
            steps_per_epoch=len(X_train) // self.batch_size,
            epochs=self.epochs,
            validation_data=(X_val, y_val),
            callbacks=[early_stopping, reduce_lr, model_checkpoint],
            verbose=1
        )
        
        return history
    
    def plot_training_history(self, history, model_name):
        """Plot training history"""
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.plot(history.history['accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} - Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        
        plt.subplot(1, 2, 2)
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} - Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        
        plt.tight_layout()
        plt.show()
    
    def evaluate_model(self, model, X_val, y_val, model_name):
        """Evaluate model performance"""
        print(f"\nEvaluating {model_name} model...")
        
        # Predictions
        y_pred_proba = model.predict(X_val)
        y_pred = (y_pred_proba > 0.5).astype(int).flatten()
        
        # Calculate accuracy
        accuracy = accuracy_score(y_val, y_pred)
        print(f"{model_name} Validation Accuracy: {accuracy:.4f}")
        
        # Classification report
        print(f"\n{model_name} Classification Report:")
        print(classification_report(y_val, y_pred, target_names=['Fake', 'Real']))
        
        # Confusion matrix
        cm = confusion_matrix(y_val, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                   xticklabels=['Fake', 'Real'], yticklabels=['Fake', 'Real'])
        plt.title(f'{model_name} - Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.show()
        
        return accuracy
    
    def predict_test_data(self, model, X_test, test_filenames, model_name):
        """Make predictions on test data"""
        print(f"\nMaking predictions with {model_name} model...")
        
        # Predictions
        y_pred_proba = model.predict(X_test)
        y_pred = (y_pred_proba > 0.5).astype(int).flatten()
        
        # Create submission dataframe
        submission_df = pd.DataFrame({
            'filename': test_filenames,
            'prediction': ['real' if pred == 1 else 'fake' for pred in y_pred],
            'confidence': y_pred_proba.flatten()
        })
        
        # Save predictions
        submission_df.to_csv(f'{model_name}_predictions.csv', index=False)
        print(f"Predictions saved to {model_name}_predictions.csv")
        
        return submission_df
    
    def run_complete_pipeline(self):
        """Run the complete pipeline"""
        print("Starting Fake vs Real Image Detection Pipeline...")
        
        # Load and preprocess data
        X_train, X_val, X_test, y_train, y_val, test_filenames = self.load_and_preprocess_data()
        
        # Dictionary to store model performances
        model_performances = {}
        
        # 1. Custom CNN Model
        print("\n" + "="*50)
        print("TRAINING CUSTOM CNN MODEL")
        print("="*50)
        
        custom_cnn = self.create_custom_cnn_model()
        print(custom_cnn.summary())
        
        history_cnn = self.train_model(custom_cnn, X_train, X_val, y_train, y_val, "Custom_CNN")
        self.plot_training_history(history_cnn, "Custom CNN")
        
        accuracy_cnn = self.evaluate_model(custom_cnn, X_val, y_val, "Custom CNN")
        model_performances['Custom CNN'] = accuracy_cnn
        
        predictions_cnn = self.predict_test_data(custom_cnn, X_test, test_filenames, "Custom_CNN")
        
        # 2. VGG16 Model
        print("\n" + "="*50)
        print("TRAINING VGG16 MODEL")
        print("="*50)
        
        vgg16_model = self.create_vgg16_model()
        print(vgg16_model.summary())
        
        history_vgg16 = self.train_model(vgg16_model, X_train, X_val, y_train, y_val, "VGG16")
        self.plot_training_history(history_vgg16, "VGG16")
        
        accuracy_vgg16 = self.evaluate_model(vgg16_model, X_val, y_val, "VGG16")
        model_performances['VGG16'] = accuracy_vgg16
        
        predictions_vgg16 = self.predict_test_data(vgg16_model, X_test, test_filenames, "VGG16")
        
        # 3. ResNet50 Model
        print("\n" + "="*50)
        print("TRAINING RESNET50 MODEL")
        print("="*50)
        
        resnet50_model = self.create_resnet50_model()
        print(resnet50_model.summary())
        
        history_resnet50 = self.train_model(resnet50_model, X_train, X_val, y_train, y_val, "ResNet50")
        self.plot_training_history(history_resnet50, "ResNet50")
        
        accuracy_resnet50 = self.evaluate_model(resnet50_model, X_val, y_val, "ResNet50")
        model_performances['ResNet50'] = accuracy_resnet50
        
        predictions_resnet50 = self.predict_test_data(resnet50_model, X_test, test_filenames, "ResNet50")
        
        # 4. EfficientNetB0 Model
        print("\n" + "="*50)
        print("TRAINING EFFICIENTNETB0 MODEL")
        print("="*50)
        
        efficientnet_model = self.create_efficientnet_model()
        print(efficientnet_model.summary())
        
        history_efficientnet = self.train_model(efficientnet_model, X_train, X_val, y_train, y_val, "EfficientNetB0")
        self.plot_training_history(history_efficientnet, "EfficientNetB0")
        
        accuracy_efficientnet = self.evaluate_model(efficientnet_model, X_val, y_val, "EfficientNetB0")
        model_performances['EfficientNetB0'] = accuracy_efficientnet
        
        predictions_efficientnet = self.predict_test_data(efficientnet_model, X_test, test_filenames, "EfficientNetB0")
        
        # Model Comparison
        print("\n" + "="*50)
        print("MODEL COMPARISON")
        print("="*50)
        
        # Display model performances
        performance_df = pd.DataFrame(list(model_performances.items()), 
                                    columns=['Model', 'Validation Accuracy'])
        performance_df = performance_df.sort_values('Validation Accuracy', ascending=False)
        
        print("\nModel Performance Summary:")
        print(performance_df.to_string(index=False))
        
        # Plot model comparison
        plt.figure(figsize=(10, 6))
        plt.bar(performance_df['Model'], performance_df['Validation Accuracy'])
        plt.title('Model Performance Comparison')
        plt.xlabel('Model')
        plt.ylabel('Validation Accuracy')
        plt.xticks(rotation=45)
        plt.ylim(0, 1)
        
        # Add value labels on bars
        for i, v in enumerate(performance_df['Validation Accuracy']):
            plt.text(i, v + 0.01, f'{v:.4f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.show()
        
        # Best model
        best_model_name = performance_df.iloc[0]['Model']
        best_accuracy = performance_df.iloc[0]['Validation Accuracy']
        
        print(f"\nBest performing model: {best_model_name}")
        print(f"Best validation accuracy: {best_accuracy:.4f}")
        
        return model_performances

# Usage
if __name__ == "__main__":
    # Define paths (modify these according to your folder structure)
    dataset_path = "dataset"  # Contains training_real and training_fake folders
    test_path = "test"        # Contains test images
    train_csv_path = "train.csv"
    test_csv_path = "test.csv"
    
    # Initialize the detector
    detector = FakeRealImageDetector(dataset_path, test_path, train_csv_path, test_csv_path)
    
    # Run the complete pipeline
    model_performances = detector.run_complete_pipeline()
    
    print("\nPipeline completed successfully!")
    print("Check the generated CSV files for predictions from each model.")

Training samples: 1709
Test samples: 332
Class distribution in training data:
label
1    949
0    760
Name: count, dtype: int64
Starting Fake vs Real Image Detection Pipeline...
Loading and preprocessing data...


KeyError: 'filename'

In [2]:
!pip install opencv-python tensorflow scikit-learn matplotlib seaborn pillow pandas numpy

Defaulting to user installation because normal site-packages is not writeable
Collecting opencv-python
  Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl.metadata (19 kB)
Collecting numpy
  Using cached numpy-2.2.6-cp312-cp312-win_amd64.whl.metadata (60 kB)
  Downloading numpy-2.1.3-cp312-cp312-win_amd64.whl.metadata (60 kB)
INFO: pip is looking at multiple versions of contourpy to determine which version is compatible with other requirements. This could take a while.
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.3.2-cp312-cp312-win_amd64.whl.metadata (5.5 kB)
Downloading opencv_python-4.12.0.88-cp37-abi3-win_amd64.whl (39.0 MB)
   ---------------------------------------- 0.0/39.0 MB ? eta -:--:--
   - -------------------------------------- 1.3/39.0 MB 7.4 MB/s eta 0:00:06
   --- ------------------------------------ 3.7/39.0 MB 9.1 MB/s eta 0:00:04
   ----- ---------------------------------- 5.2/39.0 MB 9.1 MB/s eta 0:00:04
   ------- --------------

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
gensim 4.3.3 requires numpy<2.0,>=1.18.5, but you have numpy 2.1.3 which is incompatible.
numba 0.60.0 requires numpy<2.1,>=1.22, but you have numpy 2.1.3 which is incompatible.
