In [4]:
import numpy as np
import os
import cv2
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import matplotlib.pyplot as plt

# Function to load images in batches from the UCSD Pedestrian dataset
def load_images_in_batches(path, label, batch_size=100):
    images = []  # To store all the loaded images
    count = 0    # Counter for batches

    folder_path = path
    print(f"Processing folder: {folder_path}")

    if os.path.isdir(folder_path):
        # Loop through the files (images)
        for dirpath, _, filenames in os.walk(folder_path):
            for file in filenames:
                if file.endswith(('.jpg', '.png', '.tif', '.bmp')):  # Ensure it's an image file
                    image_path = os.path.join(dirpath, file)

                    try:
                        # Read and process the image
                        image = cv2.imread(image_path)
                        if image is None:
                            print(f"Failed to load image {image_path}. Skipping...")
                            continue

                        # Resize the image to match the input size required by the model
                        image = cv2.resize(image, (224, 224))  # Model input size

                        # Normalize the image (Convert pixel values from 0-255 to 0-1)
                        image = image.astype('float32') / 255.0

                        # Append the processed image to the list
                        images.append(image)

                        count += 1
                        if count % batch_size == 0:  # If batch_size images are loaded, return batch
                            yield np.array(images), np.array(images)  # Use images as both input and target
                            images = []  # Reset the list for the next batch

                    except Exception as e:
                        print(f"Error processing image {image_path}: {e}")
                        continue

    # Return any remaining images in the last batch
    if len(images) > 0:
        yield np.array(images), np.array(images)  # Use images as both input and target

# Autoencoder Model
def create_autoencoder(input_shape):
    # Encoder
    input_img = Input(shape=input_shape)
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    
    # Bottleneck
    x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    
    # Decoder
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    autoencoder = Model(input_img, decoded)
    return autoencoder

# Compile the model
def compile_autoencoder(autoencoder):
    autoencoder.compile(optimizer=Adam(learning_rate=0.0001), 
                        loss='mean_squared_error',
                        metrics=['accuracy'])

# Train the autoencoder model using batch generator
def train_autoencoder(autoencoder, train_path, batch_size=100, steps_per_epoch=50, epochs=10):
    early_stopping = EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)

    # Generator for loading images in batches
    train_generator = load_images_in_batches(train_path, label=0, batch_size=batch_size)
    
    # Train using the generator
    history = autoencoder.fit(train_generator,
                              steps_per_epoch=steps_per_epoch,  # Adjust based on available data
                              epochs=epochs,
                              callbacks=[early_stopping])
    return history

# Test the model and calculate performance metrics
def test_autoencoder(autoencoder, test_path, threshold=0.02, batch_size=100):
    test_generator = load_images_in_batches(test_path, label=1, batch_size=batch_size)
    
    all_errors, all_anomalies, all_true_labels = [], [], []
    
    for test_images, _ in test_generator:
        print(f"Testing on data with shape: {test_images.shape}")
        
        reconstructions = autoencoder.predict(test_images)
        
        # Compute reconstruction error
        errors = np.mean(np.abs(reconstructions - test_images), axis=(1, 2, 3))
        anomalies = errors > threshold
        
        all_errors.extend(errors)
        all_anomalies.extend(anomalies)
    
    # For testing we assume that test images are anomalies (label=1)
    true_labels = np.ones(len(all_anomalies))

    # Calculate metrics
    accuracy = accuracy_score(true_labels, all_anomalies)
    precision = precision_score(true_labels, all_anomalies)
    recall = recall_score(true_labels, all_anomalies)
    f1 = f1_score(true_labels, all_anomalies)
    
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    
    return all_errors, all_anomalies, accuracy, precision, recall, f1

# Main function to execute the workflow for UCSD dataset
def main():
    train_path = r'C:\Users\abhishik chebrolu\Downloads\AINN pro\UCSD_Anomaly_Dataset\UCSD_Anomaly_Dataset.v1p2\UCSDped1\Train'  # Update with the actual training dataset path
    test_path = r'C:\Users\abhishik chebrolu\Downloads\AINN pro\UCSD_Anomaly_Dataset\UCSD_Anomaly_Dataset.v1p2\UCSDped1\Test'  # Update with the actual testing dataset path
    
    # Create and compile the autoencoder model
    input_shape = (224, 224, 3)
    autoencoder = create_autoencoder(input_shape)
    compile_autoencoder(autoencoder)
    
    # Train the autoencoder
    print("Training the autoencoder...")
    history = train_autoencoder(autoencoder, train_path, batch_size=100, steps_per_epoch=10, epochs=20)  # Adjust steps_per_epoch to match dataset size
    
    # Plot training history
    plt.plot(history.history['loss'], label='loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(loc='upper right')
    plt.show()
    
    # Test the autoencoder and calculate accuracy, precision, recall, F1
    print("Testing the autoencoder...")
    errors, anomalies, accuracy, precision, recall, f1 = test_autoencoder(autoencoder, test_path)
    
if __name__ == "__main__":
    main()


Training the autoencoder...
Processing folder: C:\Users\abhishik chebrolu\Downloads\AINN pro\UCSD_Anomaly_Dataset\UCSD_Anomaly_Dataset.v1p2\UCSDped1\Train
Epoch 1/20
[1m 1/10[0m [32m━━[0m[37m━━━━━━━━━━━━━━━━━━[0m [1m41s[0m 5s/step - accuracy: 0.0133 - loss: 0.0554

KeyboardInterrupt: 