In [None]:
# Railway Track Damage Classification Based on Image Analysis
#
# Author: Bruno Soares dos Santos
# CNN-Based Classification of Railway Track Defects
#
# This script implements a binary classification model using a CNN with a ResNet50 backbone
# for detecting four types of surface defects in railway tracks: Squats, Flakings, Spallings, and Shellings.
# Each class is trained using a one-vs-rest approach. The pipeline includes:
# - Image preprocessing (resizing, median filter, Canny edge detection)
# - Model training with ResNet50
# - 5-fold cross-validation
# - Evaluation through ROC curves and confusion matrices
#
# Dependencies: tensorflow, opencv-python, numpy, scikit-learn

import os
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import accuracy_score, recall_score, confusion_matrix, roc_curve, auc

# -----------------------------------
# Configuration
# -----------------------------------
BASE_DIR = '/content/drive/Othercomputers/Laptop I7/doutorado/Dataset-ferrovia/DATASET_ARAIN_4'
SAVE_PATH = '/content/drive/Othercomputers/Laptop I7/doutorado/Dados/CNN'
CLASSES = ['Squats', 'Flakings', 'Spallings', 'Shellings']
IMG_SIZE = (150, 150)

# Ensure save directory exists
os.makedirs(SAVE_PATH, exist_ok=True)

# -----------------------------------
# Data Loading and Preprocessing
# -----------------------------------
def load_images_from_folder(folder, label):
    # Load and preprocess images from a specified folder
    # Args:
    #   folder (str): Path to the image folder
    #   label (int): Class label for the images
    # Returns:
    #   tuple: Lists of preprocessed images and corresponding labels
    images, labels = [], []
    for filename in os.listdir(folder):
        img_path = os.path.join(folder, filename)
        img = cv2.imread(img_path)
        if img is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert to RGB
            img = cv2.resize(img, IMG_SIZE)  # Resize to 150x150
            img = cv2.medianBlur(img, 5)  # Apply median filter
            img = cv2.Canny(img, 100, 200)  # Apply Canny edge detection
            images.append(img)
            labels.append(label)
    return images, labels

def create_binary_dataset(target_class, images, labels):
    # Create a balanced binary dataset for a specific class
    # Args:
    #   target_class (int): Target class index
    #   images (np.array): Array of images
    #   labels (np.array): Array of labels
    # Returns:
    #   tuple: Binary dataset images and labels
    target_indices = np.where(labels == target_class)[0]
    non_target_indices = np.where(labels != target_class)[0]
    non_target_sample = np.random.choice(non_target_indices, size=len(target_indices), replace=False)
    binary_indices = np.concatenate([target_indices, non_target_sample])
    binary_labels = np.concatenate([np.ones(len(target_indices)), np.zeros(len(target_indices))])
    return images[binary_indices], binary_labels

# -----------------------------------
# Model Creation
# -----------------------------------
def create_model():
    # Create a CNN model using ResNet50 backbone
    # Returns:
    #   tf.keras.Model: Compiled model for binary classification
    base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=(150, 150, 3))
    base_model.trainable = False  # Freeze ResNet50 weights
    model = tf.keras.models.Sequential([
        base_model,
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(512, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')  # Binary classification
    ])
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy', tf.keras.metrics.Recall(name='recall')])
    return model

# -----------------------------------
# Evaluation Metrics
# -----------------------------------
def calculate_specificity(y_true, y_pred):
    # Calculate specificity from true and predicted labels
    # Args:
    #   y_true (np.array): True labels
    #   y_pred (np.array): Predicted labels
    # Returns:
    #   float: Specificity score
    tn, fp, _, _ = confusion_matrix(y_true, y_pred).ravel()
    return tn / (tn + fp) if (tn + fp) > 0 else 0.0

def save_coords_to_txt(coords, filename):
    # Save coordinates to a text file
    # Args:
    #   coords (list): List of (x, y) coordinates
    #   filename (str): Output file path
    with open(filename, 'w') as f:
        for x, y in coords:
            f.write(f"({x:.4f},{y:.4f})\n")

# -----------------------------------
# Model Training and Evaluation
# -----------------------------------
def train_and_evaluate_model(data, class_name):
    # Train and evaluate the model for a specific class
    # Args:
    #   data (dict): Dictionary containing train, validation, and test datasets
    #   class_name (str): Name of the class being evaluated
    # Returns:
    #   dict: Training history
    X_train, y_train = data['X_train'], data['y_train']
    X_val, y_val = data['X_val'], data['y_val']

    # Convert grayscale images to RGB and normalize
    X_train_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in X_train]) / 255.0
    X_val_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in X_val]) / 255.0

    model = create_model()
    history = model.fit(X_train_rgb, y_train, validation_data=(X_val_rgb, y_val),
                        epochs=20, verbose=1)

    # Calculate specificity for each epoch
    specificity_train, specificity_val = [], []
    for epoch in range(20):
        y_train_pred = (model.predict(X_train_rgb).flatten() >= 0.5).astype(int)
        y_val_pred = (model.predict(X_val_rgb).flatten() >= 0.5).astype(int)
        tn_train = np.sum((y_train == 0) & (y_train_pred == 0))
        fp_train = np.sum((y_train == 0) & (y_train_pred == 1))
        tn_val = np.sum((y_val == 0) & (y_val_pred == 0))
        fp_val = np.sum((y_val == 0) & (y_val_pred == 1))
        specificity_train.append(tn_train / (tn_train + fp_train) if (tn_train + fp_train) > 0 else 0.0)
        specificity_val.append(tn_val / (tn_val + fp_val) if (tn_val + fp_val) > 0 else 0.0)

    # Save specificity curves
    epochs = range(1, 21)
    save_coords_to_txt(list(zip(epochs, specificity_train)),
                       os.path.join(SAVE_PATH, f'learning_specificity_{class_name}.txt'))
    save_coords_to_txt(list(zip(epochs, specificity_val)),
                       os.path.join(SAVE_PATH, f'learning_val_specificity_{class_name}.txt'))

    # Print learning curves
    print(f"\nLearning Curves for {class_name}:")
    print(f"Epochs: {list(epochs)}")
    print(f"Training Accuracy: {history.history['accuracy']}")
    print(f"Validation Accuracy: {history.history['val_accuracy']}")
    print(f"Training Loss: {history.history['loss']}")
    print(f"Validation Loss: {history.history['val_loss']}")
    print(f"Training Sensitivity: {history.history['recall']}")
    print(f"Validation Sensitivity: {history.history['val_recall']}")
    print(f"Training Specificity: {specificity_train}")
    print(f"Validation Specificity: {specificity_val}")

    return history

# -----------------------------------
# Cross-Validation
# -----------------------------------
def cross_validation_metrics(X, y, n_splits=5):
    # Perform k-fold cross-validation and compute metrics
    # Args:
    #   X (np.array): Input images
    #   y (np.array): Labels
    #   n_splits (int): Number of folds for cross-validation
    # Returns:
    #   dict: Mean and standard deviation of accuracy, sensitivity, and specificity
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    accuracies, sensitivities, specificities = [], [], []

    for train_index, val_index in kf.split(X):
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]

        # Convert grayscale images to RGB and normalize
        X_train_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in X_train]) / 255.0
        X_val_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in X_val]) / 255.0

        model = create_model()
        model.fit(X_train_rgb, y_train, epochs=20, verbose=0)

        y_pred = (model.predict(X_val_rgb).flatten() >= 0.5).astype(int)
        accuracies.append(accuracy_score(y_val, y_pred))
        sensitivities.append(recall_score(y_val, y_pred))
        specificities.append(calculate_specificity(y_val, y_pred))

    return {
        'Accuracy': (np.mean(accuracies), np.std(accuracies)),
        'Sensitivity': (np.mean(sensitivities), np.std(sensitivities)),
        'Specificity': (np.mean(specificities), np.std(specificities))
    }

# -----------------------------------
# ROC Curve and Confusion Matrix
# -----------------------------------
def generate_and_save_roc_curve(X_data_rgb, y_data, model, class_name, dataset_name):
    # Generate and save ROC curve data
    # Args:
    #   X_data_rgb (np.array): Preprocessed RGB images
    #   y_data (np.array): True labels
    #   model (tf.keras.Model): Trained model
    #   class_name (str): Name of the class
    #   dataset_name (str): Dataset type (Test/Validation)
    # Returns:
    #   float: AUC score
    y_pred_prob = model.predict(X_data_rgb).flatten()
    fpr, tpr, _ = roc_curve(y_data, y_pred_prob)
    auc_score = auc(fpr, tpr)

    with open(os.path.join(SAVE_PATH, f'roc_curve_{class_name}_{dataset_name}.txt'), 'w') as f:
        for x, y in zip(fpr, tpr):
            f.write(f"({x:.4f},{y:.4f})\n")

    print(f"{class_name} ROC AUC for {dataset_name} = {auc_score:.4f}")
    return auc_score

def save_confusion_matrix_and_percentages(y_true, y_pred, class_name, dataset_name):
    # Save confusion matrix and its percentages
    # Args:
    #   y_true (np.array): True labels
    #   y_pred (np.array): Predicted labels
    #   class_name (str): Name of the class
    #   dataset_name (str): Dataset type (Test/Validation)
    conf_matrix = confusion_matrix(y_true, y_pred)
    TN, FP, FN, TP = conf_matrix.ravel()
    total = TN + FP + FN + TP
    conf_matrix_percentage = conf_matrix / total * 100 if total > 0 else conf_matrix

    with open(os.path.join(SAVE_PATH, f'confusion_matrix_{class_name}_{dataset_name}.txt'), 'w') as f:
        f.write(f"Confusion Matrix (Numerical) for {dataset_name}:\n")
        f.write(f"TN: {TN}, FP: {FP}, FN: {FN}, TP: {TP}\n\n")
        f.write(f"Confusion Matrix (Percentage) for {dataset_name}:\n")
        f.write(f"TN: {conf_matrix_percentage[0,0]:.2f}%, FP: {conf_matrix_percentage[0,1]:.2f}%\n")
        f.write(f"FN: {conf_matrix_percentage[1,0]:.2f}%, TP: {conf_matrix_percentage[1,1]:.2f}%\n")

    print(f"{class_name} Confusion Matrix for {dataset_name} saved.")

# -----------------------------------
# Main Execution
# -----------------------------------
def main():
    # Main function to execute the pipeline: data loading, model training, cross-validation, and evaluation

    # Load and preprocess images
    all_images, all_labels = [], []
    paths = {cls: os.path.join(BASE_DIR, cls) for cls in CLASSES}
    for label, path in enumerate(paths.values()):
        images, labels = load_images_from_folder(path, label)
        all_images.extend(images)
        all_labels.extend(labels)

    images = np.array(all_images)
    labels = np.array(all_labels)

    # Create binary datasets
    binary_datasets = {}
    for class_idx, class_name in enumerate(CLASSES):
        X_bin, y_bin = create_binary_dataset(class_idx, images, labels)
        X_train, X_temp, y_train, y_temp = train_test_split(X_bin, y_bin, test_size=0.3,
                                                             random_state=42, stratify=y_bin)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.33,
                                                        random_state=42, stratify=y_temp)
        binary_datasets[class_name] = {
            'X_train': X_train, 'y_train': y_train,
            'X_val': X_val, 'y_val': y_val,
            'X_test': X_test, 'y_test': y_test
        }

    # Train and evaluate models
    for class_name, data in binary_datasets.items():
        print(f"\nTraining model for class {class_name}...")
        train_and_evaluate_model(data, class_name)

        print(f"\nCross-validation for class {class_name}...")
        metrics = cross_validation_metrics(data['X_train'], data['y_train'])
        print(f"Accuracy: Mean = {metrics['Accuracy'][0]:.4f}, Std = {metrics['Accuracy'][1]:.4f}")
        print(f"Sensitivity: Mean = {metrics['Sensitivity'][0]:.4f}, Std = {metrics['Sensitivity'][1]:.4f}")
        print(f"Specificity: Mean = {metrics['Specificity'][0]:.4f}, Std = {metrics['Specificity'][1]:.4f}")

        print(f"\nProcessing ROC and Confusion Matrix for class {class_name}...")
        X_train_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in data['X_train']]) / 255.0
        X_test_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in data['X_test']]) / 255.0
        X_val_rgb = np.stack([cv2.cvtColor(img, cv2.COLOR_GRAY2RGB) for img in data['X_val']]) / 255.0

        model = create_model()
        model.fit(X_train_rgb, data['y_train'], epochs=20, verbose=0)

        generate_and_save_roc_curve(X_test_rgb, data['y_test'], model, class_name, "Test")
        generate_and_save_roc_curve(X_val_rgb, data['y_val'], model, class_name, "Validation")

        y_pred_test = (model.predict(X_test_rgb).flatten() >= 0.5).astype(int)
        y_pred_val = (model.predict(X_val_rgb).flatten() >= 0.5).astype(int)

        save_confusion_matrix_and_percentages(data['y_test'], y_pred_test, class_name, "Test")
        save_confusion_matrix_and_percentages(data['y_val'], y_pred_val, class_name, "Validation")

if __name__ == "__main__":
    main()