In [None]:
"""
Aircraft X-Ray CNN Production Deployment
=======================================
Production web deployment of ResNet50 CNN for automated aircraft X-ray water ingression detection.
Target: 92.9% accuracy achieved with PyTorch + FastAPI for live web application deployment.
Dataset: 138 X-ray images with mobile-responsive interface and safety-critical confidence scoring
Method: TensorFlow→PyTorch migration with FastAPI backend and JavaScript frontend
Key Features: Three-tier confidence framework, mobile camera integration, Hugging Face Spaces deployment
Technical Achievement: Complete ML lifecycle from research model to accessible production application
Deployment: Live web app with drag-and-drop ROI selection and real-time prediction capabilities
"""

In [None]:
## Libraries and Setup
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix, classification_report
import os
from PIL import Image
import random
from tqdm import tqdm
import shutil
import glob

In [None]:
## Reproducibility and Device configuration
# Random seeds for reproducibility
def set_random_seeds(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU generator, the current CUDA device and all CUDA devices), and puts
    cuDNN into deterministic mode with benchmarking turned off.

    Args:
        seed: Integer seed handed to every generator. Defaults to 42.
    """
    # The cuDNN flags do not depend on the seeding calls, so set them first.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# Seed all RNGs once, before any dataset, model or loader is created.
set_random_seeds(42)

# Device configuration
# `device` is a notebook-level global: later cells move the model and every
# batch onto it, so this cell must run before them.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if torch.cuda.is_available():
    # Report the name of CUDA device 0 only.
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
## Data Configuration
# Root folder of the dataset. The placeholder must be replaced with a real
# path; the dataset class below expects one sub-folder per class inside it.
DATA_DIR = r"[FILE PATH HERE]"  # Folder path here
# Number of images per batch for both the training and validation loaders.
BATCH_SIZE = 8
# Every image is resized to (IMG_HEIGHT, IMG_WIDTH) before entering the network.
IMG_HEIGHT = 350
IMG_WIDTH = 512

# Custom Dataset Class
class WaterDetectionDataset(Dataset):
    """Image-folder dataset with a seeded train/validation split.

    Expects ``data_dir/<class_name>/<image file>``. Class folders are sorted
    alphabetically and mapped to integer labels in that order. Every instance
    lists the whole folder and keeps either the first ``1 - val_split``
    fraction (``split='train'``) or the remainder (any other ``split`` value)
    of a seeded permutation, so a train and a validation instance built with
    the same ``data_dir``, ``val_split`` and ``seed`` are disjoint.

    Args:
        data_dir: Root folder containing one sub-folder per class.
        transform: Optional callable applied to the grayscale PIL image.
        split: ``'train'`` for the training part; anything else selects the
            validation part.
        val_split: Fraction of the images reserved for validation.
        seed: Seed of the permutation that defines the split.

    Notes:
        * Files are sorted before splitting, because ``glob`` returns them in
          an OS-dependent order and the split would otherwise differ between
          machines.
        * The permutation comes from a private ``RandomState(seed)``, so
          building a dataset no longer reseeds NumPy's global generator. For
          an identical file order it yields the same permutation as
          ``np.random.seed(seed); np.random.permutation(n)``.
        * The split is random, not stratified by class.
    """

    def __init__(self, data_dir, transform=None, split='train', val_split=0.2, seed=42):
        self.data_dir = data_dir
        self.transform = transform
        self.seed = seed

        # Parallel lists: image_paths[i] has label labels[i].
        self.image_paths = []
        self.labels = []

        # Folder structure: data_dir/class1/, data_dir/class2/, ...
        class_folders = sorted(
            f for f in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, f))
        )

        self.class_to_idx = {cls: idx for idx, cls in enumerate(class_folders)}
        print(f"Class indices: {self.class_to_idx}")

        image_extensions = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')
        for class_name in class_folders:
            class_dir = os.path.join(data_dir, class_name)
            # Escape the directory so characters such as '[' or ']' in the
            # path are not interpreted as glob wildcards.
            pattern = os.path.join(glob.escape(class_dir), "*")
            image_files = sorted(
                f for f in glob.glob(pattern) if f.lower().endswith(image_extensions)
            )

            self.image_paths.extend(image_files)
            self.labels.extend([self.class_to_idx[class_name]] * len(image_files))

        # Split data with a local generator (no global RNG side effect).
        rng = np.random.RandomState(seed)
        indices = rng.permutation(len(self.image_paths))
        split_idx = int(len(indices) * (1 - val_split))

        if split == 'train':
            self.indices = indices[:split_idx]
        else:
            self.indices = indices[split_idx:]

        print(f"{split.capitalize()} samples: {len(self.indices)}")

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx`` within this split.

        The image is loaded as single-channel grayscale and passed through
        ``transform`` when one was given; the label is a float32 scalar tensor.
        """
        actual_idx = self.indices[idx]
        image_path = self.image_paths[actual_idx]
        label = self.labels[actual_idx]

        # Load image as grayscale; the context manager closes the file handle
        # once the pixel data has been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('L')

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.float32)

In [None]:
## Data Transforms
# Training pipeline: resize, then random augmentation, then tensor conversion.
train_transforms = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2),
    transforms.RandomHorizontalFlip(),
    # NOTE(review): torchvision reads `shear` in degrees, so 0.1 is a very
    # small shear - confirm this is the intended strength.
    transforms.RandomAffine(degrees=0, shear=0.1, scale=(0.9, 1.1)),
    transforms.ToTensor(),
    # Convert grayscale to RGB for ResNet
    transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
    # Per-channel mean/std normalisation (presumably the ImageNet statistics
    # matching the pre-trained backbone - confirm).
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Validation pipeline: same resize / channel repeat / normalisation as
# training, but without any random augmentation.
val_transforms = transforms.Compose([
    transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Create datasets and loaders
# Both datasets list DATA_DIR independently and use the default split seed;
# they only form a disjoint train/val partition while those defaults match.
train_dataset = WaterDetectionDataset(DATA_DIR, transform=train_transforms, split='train')
val_dataset = WaterDetectionDataset(DATA_DIR, transform=val_transforms, split='val')

# Only the training loader shuffles; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

In [None]:
## Model Definition
class ResNetWaterDetector(nn.Module):
    """ResNet50 feature extractor with a small sigmoid classification head.

    The backbone is torchvision's ImageNet-pretrained ResNet50 without its
    final fully connected layer. Its first five child modules (in torchvision:
    conv1, bn1, relu, maxpool, layer1) are frozen; the remaining residual
    stages and the custom head stay trainable.

    Args:
        num_classes: Number of sigmoid outputs of the head. Defaults to 1
            (a single probability per image).

    Forward input:  float tensor of shape ``(batch, 3, H, W)``.
    Forward output: probabilities of shape ``(batch,)`` when
        ``num_classes == 1``, otherwise ``(batch, num_classes)``.
    """

    def __init__(self, num_classes=1):
        super(ResNetWaterDetector, self).__init__()

        # Load pre-trained ResNet50. Newer torchvision releases deprecate
        # `pretrained=True` in favour of `weights=`; IMAGENET1K_V1 is the
        # checkpoint `pretrained=True` maps to. Fall back to the legacy call
        # when the installed torchvision has no weights enum.
        try:
            resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        except AttributeError:
            resnet = models.resnet50(pretrained=True)

        children = list(resnet.children())

        # Freeze the early layers: everything except the last five children
        # (the last three residual stages, avgpool and fc).
        for layer_group in children[:-5]:
            for param in layer_group.parameters():
                param.requires_grad = False

        # Remove the final classification layer (keeps avgpool).
        self.backbone = nn.Sequential(*children[:-1])

        # Custom classifier. The leading AdaptiveAvgPool2d is a no-op on the
        # already pooled 1x1 feature map; it is kept so parameter names in
        # saved state dicts (classifier.3, classifier.6, ...) stay unchanged.
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Dropout(0.5),
            nn.Linear(2048, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, num_classes),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return sigmoid probabilities for a batch of images."""
        features = self.backbone(x)
        output = self.classifier(features)
        # Drop only the trailing class dimension. A bare `squeeze()` would
        # also remove the batch dimension for a single-image batch, yielding
        # a 0-d tensor that no longer matches the label shape.
        return output.squeeze(-1)

# Create model
model = ResNetWaterDetector().to(device)
# The model already ends in a Sigmoid, so the loss works on probabilities.
criterion = nn.BCELoss()
# All parameters are handed to Adam, including the frozen ones
# (requires_grad=False), which receive no gradient.
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Multiply the learning rate by 0.3 after 3 epochs without improvement of the
# monitored value (mode='min'), never going below 1e-7.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=3, min_lr=1e-7)

# Count trainable parameters
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Trainable parameters: {trainable_params:,}")

In [None]:
## Training Functions
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network returning one probability per sample.
        dataloader: Iterable of ``(images, labels)`` batches; must support
            ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimiser updating ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses
        and the fraction of samples whose thresholded (> 0.5) output equals
        the label.
    """
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for images, labels in tqdm(dataloader, desc="Training"):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        # A model that squeezes its output returns a 0-d tensor for a batch
        # of one sample; restore the batch dimension so the loss sees the
        # same shape as `labels` instead of raising a size mismatch.
        if outputs.dim() == 0:
            outputs = outputs.unsqueeze(0)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = (outputs > 0.5).float()
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = correct_predictions / total_samples
    return epoch_loss, epoch_acc

def validate_epoch(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` with gradients disabled.

    Args:
        model: Network returning one probability per sample.
        dataloader: Iterable of ``(images, labels)`` batches; must support
            ``len()``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy, precision, recall, labels, predictions,
        probabilities)``. ``loss`` is the mean of the per-batch losses; the
        last three items are flat lists with one entry per sample, where
        ``predictions`` are the probabilities thresholded at 0.5.
    """
    model.eval()
    running_loss = 0.0
    all_predictions = []
    all_labels = []
    all_probabilities = []

    with torch.no_grad():
        for images, labels in tqdm(dataloader, desc="Validation"):
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            # A model that squeezes its output returns a 0-d tensor for a
            # batch of one sample; restore the batch dimension so the loss
            # and the list.extend calls below keep working.
            if outputs.dim() == 0:
                outputs = outputs.unsqueeze(0)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            predicted = (outputs > 0.5).float()

            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probabilities.extend(outputs.cpu().numpy())

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = accuracy_score(all_labels, all_predictions)
    epoch_precision = precision_score(all_labels, all_predictions, zero_division=0)
    epoch_recall = recall_score(all_labels, all_predictions, zero_division=0)

    return epoch_loss, epoch_acc, epoch_precision, epoch_recall, all_labels, all_predictions, all_probabilities

In [None]:
## Training Loop
def _snapshot_state_dict(model):
    """Return a copy of ``model.state_dict()`` whose tensors are detached clones."""
    snapshot = model.state_dict()
    for name, tensor in list(snapshot.items()):
        snapshot[name] = tensor.detach().clone()
    return snapshot


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=15, patience=6, device=None):
    """Train with per-epoch validation, LR scheduling and early stopping.

    After every epoch the scheduler is stepped with the validation loss. The
    weights of the epoch with the highest validation accuracy are remembered
    and restored into ``model`` before returning.

    Args:
        model: Network to train (modified in place).
        train_loader: Loader passed to ``train_epoch``.
        val_loader: Loader passed to ``validate_epoch``.
        criterion: Loss function.
        optimizer: Optimiser for ``model``'s parameters.
        scheduler: Object with ``step(metric)``; receives the validation loss.
        num_epochs: Maximum number of epochs.
        patience: Stop after this many consecutive epochs without a new best
            validation accuracy.
        device: Device for the batches. When omitted, the device of the
            model's first parameter is used (CPU for a parameter-free model),
            so the function no longer depends on a notebook-level global.

    Returns:
        Dict of per-epoch lists with keys ``train_loss``, ``train_acc``,
        ``val_loss``, ``val_acc``, ``val_precision`` and ``val_recall``.
    """
    if device is None:
        first_param = next(iter(model.parameters()), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'val_precision': [], 'val_recall': []}

    best_val_acc = 0.0
    patience_counter = 0
    best_model_state = None

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 50)

        # Training
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)

        # Validation
        val_loss, val_acc, val_precision, val_recall, _, _, _ = validate_epoch(model, val_loader, criterion, device)

        # Update learning rate
        scheduler.step(val_loss)

        # Save metrics
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['val_precision'].append(val_precision)
        history['val_recall'].append(val_recall)

        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        print(f"Val Precision: {val_precision:.4f}, Val Recall: {val_recall:.4f}")

        # Early stopping
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            # state_dict() tensors share storage with the live parameters and
            # dict.copy() is shallow, so later optimiser steps would silently
            # overwrite the "best" weights. Clone the tensors instead.
            best_model_state = _snapshot_state_dict(model)
            print(f"New best validation accuracy: {best_val_acc:.4f}")
        else:
            patience_counter += 1
            print(f"Patience: {patience_counter}/{patience}")

            if patience_counter >= patience:
                print("Early stopping triggered")
                break

    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        print(f"Loaded best model with validation accuracy: {best_val_acc:.4f}")

    return history

# Train the model
# Runs with the default num_epochs and patience of train_model; `history`
# holds the per-epoch metrics that the plotting cell reads.
print("Starting training...")
history = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler)

In [None]:
## Model Evaluation
def evaluate_model(model, dataloader, device, criterion=None):
    """Print a metric report and a confusion matrix for ``model``.

    Args:
        model: Trained binary classifier returning one probability per sample.
        dataloader: Loader with the evaluation data.
        device: Device the batches are moved to.
        criterion: Loss passed to ``validate_epoch``. The loss value itself is
            not reported; defaults to ``nn.BCELoss()`` so the function does
            not depend on a notebook-level global.

    Returns:
        Tuple ``(accuracy, precision, recall, f1, auc)``. ``auc`` is ``nan``
        when the evaluation labels contain a single class, because ROC AUC is
        undefined in that case.
    """
    if criterion is None:
        criterion = nn.BCELoss()

    _, val_acc, val_precision, val_recall, y_true, y_pred, y_prob = validate_epoch(model, dataloader, criterion, device)

    # Labels/predictions arrive as float 0.0/1.0 scalars; use integer class
    # ids so they line up with the explicit `labels=[0, 1]` below.
    y_true = np.asarray(y_true).astype(int)
    y_pred = np.asarray(y_pred).astype(int)
    y_prob = np.asarray(y_prob, dtype=float)

    # Class 0 -> 'Nil', class 1 -> 'Water'.
    # NOTE(review): assumes the alphabetically sorted data folders map to
    # these names in this order - confirm against the dataset's class_to_idx.
    class_ids = [0, 1]
    target_names = ['Nil', 'Water']

    # Calculate additional metrics
    f1 = f1_score(y_true, y_pred, zero_division=0)
    if np.unique(y_true).size < 2:
        # A small validation split can end up with one class only;
        # roc_auc_score raises in that situation.
        auc = float('nan')
        print("ROC AUC is undefined: evaluation labels contain a single class")
    else:
        auc = roc_auc_score(y_true, y_prob)

    # Classification report (explicit labels keep it valid when a class is
    # absent from both the labels and the predictions)
    print(classification_report(y_true, y_pred, labels=class_ids, target_names=target_names, zero_division=0))

    print(f"Accuracy:  {val_acc:.3f}")
    print(f"Precision: {val_precision:.3f}")
    print(f"Recall:    {val_recall:.3f}")
    print(f"F1 Score:  {f1:.3f}")
    print(f"ROC AUC:   {auc:.3f}")

    # Confusion matrix (always 2x2 thanks to the explicit labels)
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=target_names, yticklabels=target_names)
    plt.title('Confusion Matrix')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.show()

    return val_acc, val_precision, val_recall, f1, auc

# Evaluates on the validation loader, i.e. the same split that drove early
# stopping and model selection during training.
print("Evaluating model")
accuracy, precision, recall, f1, auc = evaluate_model(model, val_loader, device)

In [None]:
## Training History Visualization
def plot_training_history(history):
    """Draw a 2x2 grid of per-epoch training curves.

    Args:
        history: Dict with the list-valued keys ``train_acc``, ``val_acc``,
            ``train_loss``, ``val_loss``, ``val_precision`` and
            ``val_recall`` (one entry per epoch).
    """
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # One entry per panel: (axis, title, y-label, [(history key, legend label)]).
    panels = [
        (axes[0, 0], 'Model Accuracy', 'Accuracy',
         [('train_acc', 'Training Accuracy'), ('val_acc', 'Validation Accuracy')]),
        (axes[0, 1], 'Model Loss', 'Loss',
         [('train_loss', 'Training Loss'), ('val_loss', 'Validation Loss')]),
        (axes[1, 0], 'Model Precision', 'Precision',
         [('val_precision', 'Validation Precision')]),
        (axes[1, 1], 'Model Recall', 'Recall',
         [('val_recall', 'Validation Recall')]),
    ]

    for ax, title, y_label, curves in panels:
        for key, legend_label in curves:
            ax.plot(history[key], label=legend_label)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()

# Uses the `history` dict returned by train_model in the training cell.
plot_training_history(history)

In [None]:
## Confidence Interpretation
def interpret_prediction(confidence_score):
    """Map a model output to one of three confidence tiers.

    ``confidence_score`` is the sigmoid output of the binary classifier, i.e.
    the probability of the positive class. The tier is based on the
    confidence in the *predicted* class, ``max(p, 1 - p)``, so a confident
    negative prediction (p close to 0) is reported as high confidence rather
    than being flagged for manual review. Scores of 0.5 and above are
    interpreted exactly as before.

    Args:
        confidence_score: Probability in ``[0, 1]``.

    Returns:
        A string starting with ``HIGH CONFIDENCE`` (> 80%), ``MEDIUM
        CONFIDENCE`` (> 60%) or ``LOW CONFIDENCE`` (otherwise, with a
        manual-review hint), followed by the predicted-class confidence as a
        percentage.
    """
    class_confidence = max(confidence_score, 1.0 - confidence_score)

    if class_confidence > 0.8:
        return f"HIGH CONFIDENCE: {class_confidence:.1%}"
    elif class_confidence > 0.6:
        return f"MEDIUM CONFIDENCE: {class_confidence:.1%}"
    else:
        return f"LOW CONFIDENCE: {class_confidence:.1%} - Manual review recommended"

def test_predictions(model, dataloader, device, num_samples=10):
    model.eval()
    results = []
    
    with torch.no_grad():
        for i, (images, labels) in enumerate(dataloader):
            if len(results) >= num_samples:
                break
                
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            
            for j in range(images.size(0)):
                if len(results) >= num_samples:
                    break
                    
                confidence = outputs[j].item()
                predicted_class = 1 if confidence > 0.5 else 0
                true_label = 'Water' if labels[j].item() == 1 else 'Nil'
                pred_label = 'Water' if predicted_class == 1 else 'Nil'
                confidence_interp = interpret_prediction(confidence)
                
                results.append({'Batch': i, 'Sample': j, 'True_Label': true_label, 'Predicted_Label': pred_label, 'Confidence_Score': confidence, 
                                'Confidence_Interpretation': confidence_interp, 'Correct_Prediction': predicted_class == labels[j].item()})
                
    return pd.DataFrame(results)

# Show the first 10 validation samples as a plain-text table.
predictions_df = test_predictions(model, val_loader, device, 10)
print(predictions_df.to_string(index=False))

In [None]:
## Single Image Prediction
def test_single_image(image_path, model, device):
    """Classify one image file, show it next to the result and print a summary.

    Args:
        image_path: Path of the image to classify.
        model: Binary classifier returning a single probability.
        device: Device the image tensor is moved to.

    Returns:
        Tuple ``(prediction, predicted_class)``: the raw model output and
        ``'Water'`` when it exceeds 0.5, otherwise ``'Nil'``.
    """
    model.eval()

    # Load as grayscale and keep a resized copy purely for display.
    grayscale = Image.open(image_path).convert('L')
    preview = grayscale.copy().resize((IMG_WIDTH, IMG_HEIGHT))

    # Same preprocessing steps as the validation pipeline (no augmentation).
    steps = [
        transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
        transforms.ToTensor(),
        transforms.Lambda(lambda x: x.repeat(3, 1, 1)),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    batch = transforms.Compose(steps)(grayscale).unsqueeze(0).to(device)

    # Forward pass without gradient tracking.
    with torch.no_grad():
        prediction = model(batch).item()

    if prediction > 0.5:
        predicted_class = 'Water'
    else:
        predicted_class = 'Nil'
    confidence_interp = interpret_prediction(prediction)
    file_name = os.path.basename(image_path)

    # Left panel: the image. Right panel: the textual result.
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(preview, cmap='gray')
    plt.title(f'Test: {file_name}')
    plt.axis('off')

    plt.subplot(1, 2, 2)
    text_rows = (
        (0.7, f"Prediction: {predicted_class}", {'fontsize': 14, 'weight': 'bold'}),
        (0.5, f"Confidence: {prediction:.3f}", {'fontsize': 12}),
        (0.3, confidence_interp, {'fontsize': 10}),
    )
    for y_pos, message, text_style in text_rows:
        plt.text(0.1, y_pos, message, **text_style)
    plt.xlim(0, 1)
    plt.ylim(0, 1)
    plt.axis('off')
    plt.title('Results')
    plt.tight_layout()
    plt.show()

    summary_lines = (
        f"File: {file_name}",
        f"Prediction: {predicted_class}",
        f"Confidence: {prediction:.6f}",
        f"Interpretation: {confidence_interp}",
    )
    for line in summary_lines:
        print(line)

    return prediction, predicted_class

In [None]:
## Test single image (uncomment below)
# test_single_image(r"[FILE PATH HERE]" , model, device)

In [None]:
## Save Model
# torch.save(model.state_dict(), 'Aircraft_Flap_Water_Detection_PyTorch.pth')