In [None]:
"""
Aircraft Rudder Thermal Imaging Water Ingression Detection - Visual-only CNN
==========================================================================
Automated water ingression detection in aircraft rudder hoisting points using computer vision only.
Target: 84.0% CV accuracy and 93.9% validation test water detection with ResNet50 thermal adaptation.
Dataset: 82 thermal images from 15 inspections over 2+ years (60% water vs 40% NWI)
Method: ResNet50 with thermal-specific augmentation pipeline (no metadata integration)
Key Features: Custom thermal blur/noise/reflection augmentation, palette standardization (89% quality)
Performance: Robust blind test results proving visual features sufficient for thermal signatures
"""

In [None]:
## Libraries and Setup
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import pointbiserialr, chi2_contingency
import warnings
warnings.filterwarnings('ignore')
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms, models
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
import os
from PIL import Image, ImageFilter, ImageEnhance
import random
from tqdm import tqdm

# Random seeds for reproducibility
def set_random_seeds(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_random_seeds(42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
## Configuration
CSV_PATH = r"[METADATA CSV HERE]" # Folder path here
WATER_PATH = r"[WATER INGRESSION FOLDER PATH HERE]" # Folder path here
NWI_PATH = r"[NILL WATER INGRESSION FOLDER PATH HERE]" # Folder path here

# Read CSV file (for reference only)
df = pd.read_csv(CSV_PATH)
print(f"\nData Shape: {df.shape}")
print(f"Water detection rate: {df['Has Water'].mean():.1%}")

In [None]:
## Image Augmentation Classes
class ThermalBlur:
    def __init__(self, blur_limit=(1, 3), p=0.5):
        self.blur_limit = blur_limit
        self.p = p
    
    def __call__(self, image):
        if random.random() < self.p:
            blur_radius = random.uniform(*self.blur_limit)
            return image.filter(ImageFilter.GaussianBlur(radius=blur_radius))
        return image

class ThermalNoise:
    def __init__(self, noise_factor=0.1, p=0.3):
        self.noise_factor = noise_factor
        self.p = p
    
    def __call__(self, image):
        if random.random() < self.p:
            img_array = np.array(image, dtype=np.float32)
            noise = np.random.normal(0, self.noise_factor * 255, img_array.shape)
            noisy_img = np.clip(img_array + noise, 0, 255).astype(np.uint8)
            return Image.fromarray(noisy_img)
        return image

class LightReflection:
    def __init__(self, intensity_range=(0.8, 1.3), p=0.3):
        self.intensity_range = intensity_range
        self.p = p
    
    def __call__(self, image):
        if random.random() < self.p:
            enhancer = ImageEnhance.Brightness(image)
            factor = random.uniform(*self.intensity_range)
            return enhancer.enhance(factor)
        return image

# Transform Strategy (Enhanced for thermal images)
def get_thermal_transforms(mode='train'):
    if mode == 'train':
        return transforms.Compose([
            transforms.Resize((224, 224)),
            ThermalBlur(blur_limit=(0.5, 2.0), p=0.4),
            LightReflection(intensity_range=(0.8, 1.2), p=0.3),
            ThermalNoise(noise_factor=0.05, p=0.2),
            transforms.RandomRotation(15),
            transforms.RandomHorizontalFlip(0.5),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5])
        ])
    else:
        return transforms.Compose([
            transforms.Resize((224, 224)), 
            transforms.ToTensor(), 
            transforms.Normalize(mean=[0.5], std=[0.5])
        ])

In [None]:
## Dataset and Model Classes
class ThermalVisualDataset(Dataset):
    def __init__(self, water_path, nwi_path, transform=None):
        self.transform = transform
        self.samples = []
        
        # Load water images
        if os.path.exists(water_path):
            for filename in os.listdir(water_path):
                if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    self.samples.append({'path': os.path.join(water_path, filename), 'label': 1, 'file_id': os.path.splitext(filename)[0]})
        
        # Load NWI images
        if os.path.exists(nwi_path):
            for filename in os.listdir(nwi_path):
                if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    self.samples.append({'path': os.path.join(nwi_path, filename), 'label': 0, 'file_id': os.path.splitext(filename)[0]})
        
        labels = [s['label'] for s in self.samples]
        unique, counts = np.unique(labels, return_counts=True)
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        image = Image.open(sample['path']).convert('L')
        
        if self.transform:
            image = self.transform(image)
        
        return image, sample['label']

# CNN Model
class ThermalVisualCNN(nn.Module):
    def __init__(self, num_classes=2, backbone='resnet50'):
        super(ThermalVisualCNN, self).__init__()
        
        # Image branch only
        if backbone == 'resnet50':
            self.backbone = models.resnet50(weights='IMAGENET1K_V2')
            self.backbone.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
            
            # Replace final classification layer
            num_features = self.backbone.fc.in_features
            self.backbone.fc = nn.Sequential(
                nn.Dropout(0.5),
                nn.Linear(num_features, 512),
                nn.ReLU(),
                nn.Dropout(0.3),
                nn.Linear(512, 128),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(128, num_classes)
            )
        
        self._initialize_weights()
    
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
    
    def forward(self, image):
        output = self.backbone(image)
        return output

In [None]:
## Evaluation Functions
def evaluate_model_comprehensive(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    all_predictions = []
    all_labels = []
    all_probs = []
    
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            
            probs = F.softmax(outputs, dim=1)
            _, predicted = torch.max(outputs, 1)
            
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs[:, 1].cpu().numpy())
    
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, zero_division=0)
    recall = recall_score(all_labels, all_predictions, zero_division=0)
    f1 = f1_score(all_labels, all_predictions, zero_division=0)
    auc = roc_auc_score(all_labels, all_probs) if len(set(all_labels)) > 1 else 0.0
    
    return {
        'loss': total_loss / len(dataloader),
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'auc': auc,
        'predictions': all_predictions,
        'labels': all_labels,
        'probabilities': all_probs
    }

def plot_training_history(history, fold_num):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    epochs = range(1, len(history['train_losses']) + 1)
    
    # Model Loss
    ax1.plot(epochs, history['train_losses'], 'b-', label='Training Loss', linewidth=2)
    ax1.plot(epochs, history['val_losses'], 'r-', label='Validation Loss', linewidth=2)
    ax1.set_title('Model Loss - Visual Only', fontsize=14, fontweight='bold')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Model Accuracy
    ax2.plot(epochs, history['train_accuracies'], 'b-', label='Training Accuracy', linewidth=2)
    ax2.plot(epochs, history['val_accuracies'], 'r-', label='Validation Accuracy', linewidth=2)
    ax2.set_title('Model Accuracy - Visual Only', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

def plot_confusion_matrix_detailed(y_true, y_pred, title="Confusion Matrix - Visual Only"):
    cm = confusion_matrix(y_true, y_pred)
    cm_percent = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100
    
    plt.figure(figsize=(8, 6))
    
    annotations = []
    for i in range(cm.shape[0]):
        row = []
        for j in range(cm.shape[1]):
            row.append(f'{cm[i,j]}\n({cm_percent[i,j]:.1f}%)')
        annotations.append(row)
    
    sns.heatmap(cm, annot=annotations, fmt='', cmap='Blues', xticklabels=['NWI', 'Water'], yticklabels=['NWI', 'Water'], cbar_kws={'label': 'Count'})
    
    plt.title(title, fontsize=14, fontweight='bold')
    plt.xlabel('Predicted', fontsize=12, fontweight='bold')
    plt.ylabel('Actual', fontsize=12, fontweight='bold')
    plt.tight_layout()
    plt.show()

def analyze_confidence_levels(probabilities, labels, predictions):
    probabilities = np.array(probabilities)
    labels = np.array(labels)
    predictions = np.array(predictions)
    
    high_conf_mask = probabilities >= 0.8
    medium_conf_mask = (probabilities >= 0.6) & (probabilities < 0.8)
    low_conf_mask = probabilities < 0.6
    
    results = {}
    for mask, level in [(high_conf_mask, 'High (≥0.8)'), (medium_conf_mask, 'Medium (0.6-0.8)'), (low_conf_mask, 'Low (<0.6)')]:
        if np.sum(mask) > 0:
            conf_labels = labels[mask]
            conf_preds = predictions[mask]
            accuracy = accuracy_score(conf_labels, conf_preds) if len(conf_labels) > 0 else 0
            
            results[level] = {'count': np.sum(mask), 'accuracy': accuracy, 'percentage': np.sum(mask) / len(probabilities) * 100}
        else:
            results[level] = {'count': 0, 'accuracy': 0, 'percentage': 0}
    
    return results

def plot_confidence_analysis(confidence_results):
    levels = list(confidence_results.keys())
    counts = [confidence_results[level]['count'] for level in levels]
    accuracies = [confidence_results[level]['accuracy'] for level in levels]
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    colors = ['darkred', 'orange', 'lightcoral']
    ax1.bar(levels, counts, color=colors, alpha=0.8)
    ax1.set_title('Prediction Count by Confidence Level', fontsize=14, fontweight='bold')
    ax1.set_ylabel('Count')
    ax1.grid(True, alpha=0.3)
    
    ax2.bar(levels, accuracies, color=colors, alpha=0.8)
    ax2.set_title('Accuracy by Confidence Level', fontsize=14, fontweight='bold')
    ax2.set_ylabel('Accuracy')
    ax2.set_ylim(0, 1)
    ax2.grid(True, alpha=0.3)
    
    for i, (count, acc) in enumerate(zip(counts, accuracies)):
        ax1.text(i, count + 0.1, str(count), ha='center', fontweight='bold')
        ax2.text(i, acc + 0.02, f'{acc:.3f}', ha='center', fontweight='bold')
    
    plt.tight_layout()
    plt.show()

In [None]:
## Training Function
def train_model_with_history(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device):
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    best_val_acc = 0.0
    best_model_state = None
    patience_counter = 0
    
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        train_predictions = []
        train_labels = []
        
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            total_train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            train_predictions.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())
        
        train_acc = accuracy_score(train_labels, train_predictions)
        avg_train_loss = total_train_loss / len(train_loader)
        
        val_metrics = evaluate_model_comprehensive(model, val_loader, criterion, device)
        scheduler.step(val_metrics['loss'])
        
        train_losses.append(avg_train_loss)
        val_losses.append(val_metrics['loss'])
        train_accuracies.append(train_acc)
        val_accuracies.append(val_metrics['accuracy'])
        
        if val_metrics['accuracy'] > best_val_acc:
            best_val_acc = val_metrics['accuracy']
            best_model_state = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1
        
        if patience_counter >= 8:
            break
    
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return {
        'train_losses': train_losses, 
        'val_losses': val_losses, 
        'train_accuracies': train_accuracies, 
        'val_accuracies': val_accuracies, 
        'best_val_accuracy': best_val_acc
    }

In [None]:
## K-Fold Cross Validation
def run_kfold_validation_visual_only(water_path, nwi_path, k_folds=5):
    all_samples = []
    
    for filename in os.listdir(water_path):
        if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            all_samples.append({'path': os.path.join(water_path, filename), 'label': 1, 'file_id': os.path.splitext(filename)[0]})
    
    for filename in os.listdir(nwi_path):
        if filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            all_samples.append({'path': os.path.join(nwi_path, filename), 'label': 0, 'file_id': os.path.splitext(filename)[0]})
    
    labels = [sample['label'] for sample in all_samples]
    skf = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)
    fold_results = []
    best_model = None
    best_accuracy = 0
    
    for fold, (train_idx, val_idx) in enumerate(skf.split(all_samples, labels)):
        print(f"\nFOLD {fold+1}/{k_folds}")
        
        train_samples = [all_samples[i] for i in train_idx]
        val_samples = [all_samples[i] for i in val_idx]
        
        train_dataset = ThermalVisualDataset(water_path, nwi_path, transform=get_thermal_transforms('train'))
        train_dataset.samples = train_samples
        
        val_dataset = ThermalVisualDataset(water_path, nwi_path, transform=get_thermal_transforms('val'))
        val_dataset.samples = val_samples
        
        train_labels_fold = [sample['label'] for sample in train_samples]
        class_counts = np.bincount(train_labels_fold)
        class_weights = 1.0 / class_counts
        sample_weights = [class_weights[label] for label in train_labels_fold]
        
        sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)
        
        train_loader = DataLoader(train_dataset, batch_size=8, sampler=sampler, drop_last=True)
        val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
        
        model = ThermalVisualCNN(num_classes=2, backbone='resnet50').to(device)
        
        class_weights_tensor = torch.FloatTensor([1.0, class_counts[0]/class_counts[1]]).to(device)
        criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
        optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0.01)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
        
        history = train_model_with_history(model, train_loader, val_loader, criterion, optimizer, scheduler, 30, device)
        final_metrics = evaluate_model_comprehensive(model, val_loader, criterion, device)
        
        if fold == 0:
            plot_training_history(history, fold+1)
            plot_confusion_matrix_detailed(final_metrics['labels'], final_metrics['predictions'], f"Confusion Matrix - Visual Only Fold {fold+1}")
            
            confidence_results = analyze_confidence_levels(final_metrics['probabilities'], final_metrics['labels'], final_metrics['predictions'])
            plot_confidence_analysis(confidence_results)
            
            print(f"\nFold 1 Confidence Analysis:")
            for level, results in confidence_results.items():
                print(f"  {level}: {results['count']} predictions ({results['percentage']:.1f}%) - {results['accuracy']:.1%} accuracy")
        
        if final_metrics['accuracy'] > best_accuracy:
            best_accuracy = final_metrics['accuracy']
            best_model = model
        
        fold_results.append({
            'fold': fold + 1,
            'train_samples': len(train_samples),
            'val_samples': len(val_samples),
            'best_val_accuracy': history['best_val_accuracy'],
            'final_accuracy': final_metrics['accuracy'],
            'final_precision': final_metrics['precision'],
            'final_recall': final_metrics['recall'],
            'final_f1': final_metrics['f1'],
            'final_auc': final_metrics['auc']
        })
        
        print(f"\nFold {fold+1} Results:")
        print(f"  Best Val Accuracy: {history['best_val_accuracy']:.4f}")
        print(f"  Final Accuracy: {final_metrics['accuracy']:.4f}")
        print(f"  Final F1: {final_metrics['f1']:.4f}")
        print(f"  Final AUC: {final_metrics['auc']:.4f}")
    
    return fold_results, best_model

In [None]:
## Run K-Fold Validation
fold_results, best_model = run_kfold_validation_visual_only(WATER_PATH, NWI_PATH, k_folds=5)

In [None]:
## Results Analysis and Visualization
print("K-FOLD SUMMARY")

results_df = pd.DataFrame(fold_results)
print(results_df.to_string(index=False))

print(f"\nCross-Validation Summary:")
print(f"  Mean Accuracy: {results_df['final_accuracy'].mean():.4f} ± {results_df['final_accuracy'].std():.4f}")
print(f"  Mean F1 Score: {results_df['final_f1'].mean():.4f} ± {results_df['final_f1'].std():.4f}")
print(f"  Mean AUC: {results_df['final_auc'].mean():.4f} ± {results_df['final_auc'].std():.4f}")

fig, axes = plt.subplots(2, 3, figsize=(18, 12))
metrics = ['final_accuracy', 'final_precision', 'final_recall', 'final_f1', 'final_auc', 'best_val_accuracy']
titles = ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC', 'Best Val Accuracy']

for i, (metric, title) in enumerate(zip(metrics, titles)):
    ax = axes[i//3, i%3]
    
    bp = ax.boxplot(results_df[metric], labels=[title], patch_artist=True)
    bp['boxes'][0].set_facecolor('lightgreen')
    bp['boxes'][0].set_alpha(0.7)
    
    ax.scatter([1]*len(results_df), results_df[metric], alpha=0.8, s=60, color='darkgreen')
    
    mean_val = results_df[metric].mean()
    ax.axhline(y=mean_val, color='red', linestyle='--', alpha=0.7, linewidth=1.5)
    
    ax.set_title(f'{title}\n(mean={mean_val:.3f}, std={results_df[metric].std():.3f})', fontsize=11, fontweight='bold', pad=15)
    ax.grid(True, alpha=0.3)
    ax.set_ylim(0, 1.05)

plt.suptitle('Visual-Only K-Fold Cross Validation Results', fontsize=14, fontweight='bold', y=0.98)
plt.tight_layout()
plt.show()

In [None]:
## Live Image Testing Function
def interpret_confidence(confidence_score):
    if confidence_score >= 0.8:
        return f"HIGH CONFIDENCE: {confidence_score:.1%} - Automated decision"
    elif confidence_score >= 0.6:
        return f"MEDIUM CONFIDENCE: {confidence_score:.1%} - Confirmation Needed"
    else:
        return f"LOW CONFIDENCE: {confidence_score:.1%} - Further Review Needed"

def test_single_thermal_image_visual_only(image_path, model):
    try:
        image = Image.open(image_path).convert('L')
        transform = get_thermal_transforms('val')
        img_tensor = transform(image).unsqueeze(0).to(device)
        
        model.eval()
        with torch.no_grad():
            outputs = model(img_tensor)
            probs = F.softmax(outputs, dim=1)
            confidence = probs[0, 1].item()
            predicted_class = 'Water Detected' if confidence > 0.5 else 'No Water (NWI)'
            confidence_interp = interpret_confidence(confidence)
        
        plt.figure(figsize=(12, 5))
        
        plt.subplot(1, 2, 1)
        plt.imshow(image, cmap='gray')
        plt.title(f'Thermal Image \n{os.path.basename(image_path)}', fontsize=12, fontweight='bold')
        plt.axis('off')
        
        plt.subplot(1, 2, 2)
        if confidence >= 0.8:
            bg_color = 'lightgreen' if predicted_class == 'No Water (NWI)' else 'lightcoral'
        elif confidence >= 0.6:
            bg_color = 'lightyellow'
        else:
            bg_color = 'lightgray'
        
        plt.gca().set_facecolor(bg_color)
        
        y_pos = 0.9
        plt.text(0.1, y_pos, f"Prediction: {predicted_class}", fontsize=14, weight='bold')
        y_pos -= 0.2
        plt.text(0.1, y_pos, f"Confidence: {confidence:.3f}", fontsize=12)
        y_pos -= 0.15
        plt.text(0.1, y_pos, confidence_interp, fontsize=11, style='italic')
        
        plt.xlim(0, 1)
        plt.ylim(0, 1)
        plt.axis('off')
        plt.title('Visual-Only CNN Results', fontsize=12, fontweight='bold')
        
        plt.tight_layout()
        plt.show()
        
        print(f"File: {os.path.basename(image_path)}")
        print(f"Prediction: {predicted_class}")
        print(f"Confidence: {confidence:.6f}")
        print(f"Interpretation: {confidence_interp}")
        
        return confidence, predicted_class
        
    except Exception as e:
        print(f"Testing failed: {str(e)}")
        return None, None

In [None]:
## Live Testing Example (Remove comment > #)

# test_single_thermal_image_visual_only(r"IMAGE PATH HERE", best_model)