In [15]:
# Cell 1: Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
from torchvision import models
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
import copy
from collections import deque
import time

In [16]:
# Cell 2: Device setup and transforms
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Values shared by both pipelines: the spatial size images are resized to and
# the per-channel normalisation statistics (the usual ImageNet mean/std).
_INPUT_SIZE = (224, 224)
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, light geometric/photometric augmentation, then
# conversion to a normalised tensor.
transform = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(p=0.1),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomAffine(degrees=5, translate=(0.05, 0.05), scale=(0.95, 1.05)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Validation/test/inference pipeline: the same resize and normalisation with
# no random augmentation.
val_transform = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

Using device: cuda


In [1]:
# Mount Google Drive at /content/drive; the dataset and video paths used by
# later cells live under /content/drive/MyDrive.
# NOTE(review): relies on the google.colab package, and the execution count
# (In [1]) shows this cell was run out of order relative to the cells around
# it — confirm the notebook still works with Restart & Run All.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [17]:
# Cell 3: Dataset class
class RunwayImageDataset(Dataset):
    """Map-style dataset that loads images from disk and pairs them with labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Optional sequence of labels aligned with ``image_paths``. May
            be a list, tuple, NumPy array or tensor. When it is ``None`` or
            empty, every item is returned with the placeholder label ``-1``.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, image_paths, labels=None, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.image_paths)

    def _has_labels(self):
        """Return True when a non-empty label sequence was supplied."""
        # Use an explicit None/len check rather than truthiness: evaluating a
        # multi-element NumPy array or tensor in a boolean context raises.
        return self.labels is not None and len(self.labels) > 0

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is opened, converted to RGB and passed through
        ``self.transform`` when one was given.
        """
        # The context manager closes the underlying file as soon as the pixel
        # data has been copied by convert(), instead of waiting for garbage
        # collection (matters with many files and DataLoader workers).
        with Image.open(self.image_paths[idx]) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform is not None:
            image = self.transform(image)
        label = self.labels[idx] if self._has_labels() else -1
        return image, label

In [18]:
# Cell 4: Class definitions and mappings
# Surface-condition name -> integer class code used as the training label.
GRF_CLASSES = dict([
    ("Dry", 6),
    ("Damp", 5),
    ("Compacted Snow", 4),
    ("Wet", 3),
    ("Standing Water", 2),
    ("Ice", 1),
    ("Wet Ice", 0),
])

# Reverse lookup: integer class code -> surface-condition name.
GRF_CLASS_NAMES = dict((code, name) for name, code in GRF_CLASSES.items())

# Dataset folder name -> integer class code. Folders not listed here are
# ignored when the datasets are scanned.
# NOTE(review): GRF_CLASSES names code 1 "Ice" and code 0 "Wet Ice", while the
# folders mapped to those codes ("fresh_snow", "fresh_fallen",
# "partially_covered") have snow-related names — confirm this is intended.
folder_to_grf = {
    folder: code
    for code, folders in (
        (6, ("asphalt_dry", "concrete_dry", "cobble_dry")),
        (5, ("asphalt_damp", "concrete_damp", "cobble_damp")),
        (4, ("fully_packed",)),
        (3, ("asphalt_wet", "concrete_wet", "cobble_wet")),
        (2, ("asphalt_verywet", "concrete_verywet", "cobble_verywet")),
        (1, ("fresh_snow", "fresh_fallen")),
        (0, ("partially_covered",)),
    )
    for folder in folders
}

In [19]:
# Cell 5: Dataset preparation function
def prepare_datasets(base_dirs, transform_fn):
    """Scan dataset directories and build a ``RunwayImageDataset``.

    Every immediate sub-folder of each base directory whose name is a key of
    ``folder_to_grf`` contributes its ``.jpg``/``.jpeg``/``.png`` files
    (case-insensitive), labelled with the mapped class code. Other folders are
    ignored. A per-class summary is printed.

    Args:
        base_dirs: Iterable of directory paths to scan. Paths that are not
            existing directories are skipped with a warning.
        transform_fn: Transform passed through to ``RunwayImageDataset``.

    Returns:
        RunwayImageDataset over all collected image paths and labels.

    Raises:
        ValueError: If no image was found in any of the directories.
    """
    image_paths, labels = [], []

    # One counter per known class code, in ascending code order.
    class_counts = {label: 0 for label in sorted(GRF_CLASS_NAMES)}

    for base_dir in base_dirs:
        # isdir (not exists) so that a plain file is skipped here rather than
        # failing inside os.listdir.
        if not os.path.isdir(base_dir):
            print(f"Warning: Directory {base_dir} does not exist. Skipping.")
            continue

        # os.listdir returns entries in arbitrary order; sorting makes the
        # sample order reproducible across runs and machines.
        for folder in sorted(os.listdir(base_dir)):
            class_dir = os.path.join(base_dir, folder)
            if not (os.path.isdir(class_dir) and folder in folder_to_grf):
                continue

            grf_label = folder_to_grf[folder]
            files = sorted(
                f for f in os.listdir(class_dir)
                if f.lower().endswith((".jpg", ".jpeg", ".png"))
            )

            image_paths.extend(os.path.join(class_dir, file) for file in files)
            labels.extend([grf_label] * len(files))
            class_counts[grf_label] += len(files)

    if not image_paths:
        raise ValueError(f"No images found in the specified directories: {base_dirs}")

    print("Class distribution:")
    for label, count in class_counts.items():
        if count > 0:
            print(f"  {GRF_CLASS_NAMES[label]} (RWYCC {label}): {count} images")

    total = sum(class_counts.values())
    print(f"Total: {total} images")

    return RunwayImageDataset(image_paths, labels, transform_fn)

In [20]:
# Cell 6: Model definition
class GRFClassifier(nn.Module):
    """Pretrained torchvision backbone with a new linear classification head.

    Args:
        backbone: ``'mobilenetv2'`` selects MobileNetV2; any other value
            selects EfficientNet-B3.
        num_classes: Number of output logits of the replacement head.
    """

    def __init__(self, backbone='efficientnet_b3', num_classes=7):
        super().__init__()

        # Load the chosen backbone with its default pretrained weights.
        if backbone == 'mobilenetv2':
            network = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.DEFAULT)
        else:
            network = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)

        # For both backbones the final Linear layer is classifier[1]; replace
        # it with a head sized for num_classes and Xavier-initialise its
        # weight matrix (the bias keeps nn.Linear's default initialisation).
        head = nn.Linear(network.classifier[1].in_features, num_classes)
        network.classifier[1] = head
        nn.init.xavier_uniform_(head.weight)

        self.model = network

    def forward(self, x):
        """Return the backbone's output for input batch ``x``."""
        return self.model(x)

In [12]:
# Cell 7: Evaluation function
def evaluate_model(model, dataloader, device, class_names=None):
    """Evaluate a classifier on a dataloader and report metrics and plots.

    Runs the model in eval mode without gradients, times the forward passes,
    prints a classification report, and shows a confusion-matrix heatmap and a
    histogram of prediction confidences split by correctness.

    Args:
        model: Module mapping an input batch to per-class logits.
        dataloader: Iterable yielding ``(inputs, labels)`` tensor batches.
        device: Device the batches are moved to before the forward pass.
        class_names: Optional list of display names. When given, entry ``i``
            is used as the name of class index ``i`` and the report/matrix
            always cover indices ``0 .. len(class_names) - 1``, even if some
            class does not occur in the data.

    Returns:
        dict with keys ``predictions``, ``labels``, ``confidences`` (lists),
        ``inference_time`` (seconds per image), ``fps``, ``report`` (the
        ``output_dict`` form of the classification report, always containing
        an ``'accuracy'`` entry) and ``confusion_matrix``.
    """
    model.eval()
    all_preds, all_labels, all_confidences = [], [], []
    total_forward_time = 0.0
    total_images = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # CUDA work is queued asynchronously, so without synchronising
            # before and after the forward pass the timer would mostly measure
            # kernel launch overhead rather than the computation itself.
            if inputs.is_cuda:
                torch.cuda.synchronize(inputs.device)
            start_time = time.perf_counter()
            outputs = model(inputs)
            if inputs.is_cuda:
                torch.cuda.synchronize(inputs.device)
            total_forward_time += time.perf_counter() - start_time
            total_images += inputs.size(0)

            probs = torch.nn.functional.softmax(outputs, dim=1)
            confidences, preds = torch.max(probs, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_confidences.extend(confidences.cpu().numpy())

    # Total time over total images, so a smaller final batch is not
    # over-weighted the way a mean of per-batch averages would be.
    avg_inference_time = total_forward_time / total_images if total_images else 0.0
    fps = 1.0 / avg_inference_time if avg_inference_time > 0 else 0

    print(f"Average inference time: {avg_inference_time*1000:.2f} ms per image")
    print(f"Estimated throughput: {fps:.2f} FPS")

    correct = np.array(all_preds) == np.array(all_labels)
    accuracy = float(correct.mean()) if correct.size else 0.0

    # Pin the label set when names are given: otherwise a class missing from
    # both labels and predictions makes target_names / tick labels disagree
    # with the classes found in the data.
    label_ids = list(range(len(class_names))) if class_names else None
    report_kwargs = {}
    if class_names:
        report_kwargs = {
            "labels": label_ids,
            "target_names": class_names,
            "zero_division": 0,
        }

    print("\nClassification Report:")
    report = classification_report(all_labels, all_preds, output_dict=True, **report_kwargs)
    print(classification_report(all_labels, all_preds, **report_kwargs))
    # With an explicit label set the report may carry a micro average in
    # place of 'accuracy'; callers index report['accuracy'], so guarantee it.
    report.setdefault('accuracy', accuracy)

    if class_names:
        cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
    else:
        cm = confusion_matrix(all_labels, all_preds)

    plt.figure(figsize=(10, 8))
    if class_names:
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    else:
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(10, 6))
    # Use named hue levels with an explicit colour mapping and let seaborn
    # build the legend. Overriding the legend with positional labels does not
    # guarantee that the text matches the plotted groups, and a two-colour
    # list palette is wrong when only one outcome is present.
    outcome = np.where(correct, "Correct", "Incorrect")
    sns.histplot(
        x=all_confidences,
        hue=outcome,
        hue_order=["Incorrect", "Correct"],
        element="step",
        stat="density",
        common_norm=False,
        bins=30,
        palette={"Incorrect": "red", "Correct": "green"}
    )
    plt.title("Confidence Distribution (Correct vs. Incorrect Predictions)")
    plt.xlabel("Confidence")
    plt.ylabel("Density")
    plt.tight_layout()
    plt.show()

    eval_results = {
        'predictions': all_preds,
        'labels': all_labels,
        'confidences': all_confidences,
        'inference_time': avg_inference_time,
        'fps': fps,
        'report': report,
        'confusion_matrix': cm
    }

    return eval_results

In [21]:
# Cell 8: Training function
def _plot_training_history(history):
    """Show side-by-side loss and accuracy curves for train and validation."""
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.title('Loss vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.title('Accuracy vs. Epochs')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.show()


def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20, patience=5,
                checkpoint_path="best_model_checkpoint.pth"):
    """Train with per-epoch validation, checkpointing and early stopping.

    Each epoch runs one pass over ``train_loader`` and one over
    ``val_loader``. Whenever the validation loss improves, the weights are
    remembered and a checkpoint is written. Training stops early after
    ``patience`` consecutive epochs without improvement. The best weights are
    loaded back into ``model`` before the learning curves are plotted.

    Args:
        model: Module to train; batches are moved to the device of its
            parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        criterion: Loss callable ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        scheduler: Learning-rate scheduler stepped once per epoch, or
            ``None``. ``ReduceLROnPlateau`` is stepped with the validation
            loss; any other scheduler is stepped without arguments.
        num_epochs: Maximum number of epochs.
        patience: Epochs without validation-loss improvement before stopping.
        checkpoint_path: Where the best checkpoint is written; ``None``
            disables writing it.

    Returns:
        ``(model, history)`` where ``history`` holds the per-epoch lists
        ``train_loss``, ``train_acc``, ``val_loss`` and ``val_acc``.
    """
    # Take the device from the model itself instead of a notebook-level
    # global, so the function does not depend on hidden state.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_val_loss = float('inf')
    best_model_wts = copy.deepcopy(model.state_dict())
    trigger_times = 0

    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    for epoch in range(1, num_epochs + 1):
        # ---- training pass ----
        model.train()
        running_loss, running_correct, total = 0.0, 0, 0
        epoch_start_time = time.time()

        print(f"Epoch {epoch}/{num_epochs} - Training Phase")
        for batch_idx, (images, labels) in enumerate(train_loader):
            images, labels = images.to(run_device), labels.to(run_device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight the batch loss by batch size so the epoch figure is a
            # per-sample mean even when the last batch is smaller.
            running_loss += loss.item() * images.size(0)
            _, preds = outputs.max(1)
            running_correct += (preds == labels).sum().item()
            total += labels.size(0)

            if batch_idx % 10 == 0:
                print(f"  Batch {batch_idx}/{len(train_loader)} - Loss: {loss.item():.4f}")

        epoch_time = time.time() - epoch_start_time
        train_loss = running_loss / total
        train_acc = running_correct / total
        print(f"[Train] Epoch {epoch} Summary: Loss {train_loss:.4f}, Accuracy {train_acc:.4f}, Time {epoch_time:.2f}s")

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        # ---- validation pass ----
        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(run_device), labels.to(run_device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * images.size(0)
                _, preds = outputs.max(1)
                val_correct += (preds == labels).sum().item()
                val_total += labels.size(0)

        val_loss = val_loss / val_total
        val_acc = val_correct / val_total
        print(f"[Validation] Epoch {epoch} Summary: Loss {val_loss:.4f}, Accuracy {val_acc:.4f}")

        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # ReduceLROnPlateau.step() requires the monitored metric; calling it
        # with no argument raises TypeError. Other schedulers take none.
        if scheduler is not None:
            if isinstance(scheduler, optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(val_loss)
            else:
                scheduler.step()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            trigger_times = 0
            if checkpoint_path is not None:
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'val_loss': val_loss,
                    'val_acc': val_acc,
                    'train_loss': train_loss,
                    'train_acc': train_acc,
                }, checkpoint_path)
                print("  ✅ Checkpoint saved (best model so far)")
        else:
            trigger_times += 1
            print(f"  ⚠️ No improvement for {trigger_times} epoch(s)")
            if trigger_times >= patience:
                print("⛔ Early stopping triggered")
                break

    # Restore the weights from the epoch with the lowest validation loss.
    model.load_state_dict(best_model_wts)

    _plot_training_history(history)

    return model, history

In [None]:
# Cell 9: Runway condition monitor
class RunwayConditionMonitor:
    """Classify frames one at a time and track recent class probabilities.

    Keeps a sliding window of the per-class probability vectors of the most
    recent frames, derives a trend label from it, and can summarise the latest
    classification as a report dict.

    Args:
        model: Trained classifier; it is switched to eval mode.
        window_size: Number of most recent frames kept in the history. A
            trend is attached to frame results once the window is full.
        threshold: Stored on the instance; not read by any method here.
    """

    # Length of the moving-average kernel applied before the slope fit.
    SMOOTHING_WINDOW = 5
    # Per-frame slope magnitude at or below which the trend is "stable".
    TREND_SLOPE_THRESHOLD = 0.01

    def __init__(self, model, window_size=30, threshold=0.6):
        self.model = model
        self.model.eval()
        self.window_size = window_size
        self.threshold = threshold
        self.scores_history = deque(maxlen=window_size)
        self.transform = val_transform
        # Result of the most recent process_frame() call; generate_grf_report
        # reads it instead of re-running inference.
        self.last_result = None

    def _model_device(self):
        """Return the device of the model's parameters (CPU if it has none)."""
        first_param = next(self.model.parameters(), None)
        return first_param.device if first_param is not None else torch.device("cpu")

    def process_frame(self, frame):
        """Classify one frame and record its probabilities.

        Args:
            frame: PIL image, or NumPy array (converted with
                ``Image.fromarray``).

        Returns:
            dict with ``rwycc`` (predicted class index), ``condition`` (its
            name), ``confidence``, ``probabilities`` (name -> probability)
            and, once the history window is full, ``trend``.

        Raises:
            ValueError: If ``frame`` is ``None``.
        """
        if frame is None:
            raise ValueError("frame must be a PIL image or NumPy array, got None")
        if isinstance(frame, np.ndarray):
            frame = Image.fromarray(frame)

        img_tensor = self.transform(frame).unsqueeze(0).to(self._model_device())

        with torch.no_grad():
            output = self.model(img_tensor)
            probs = torch.nn.functional.softmax(output, dim=1)[0]
            confidence, pred_class = torch.max(probs, dim=0)

        probs_np = probs.cpu().numpy()
        class_idx = int(pred_class.item())
        self.scores_history.append(probs_np)

        result = {
            'rwycc': class_idx,
            'condition': GRF_CLASS_NAMES[class_idx],
            'confidence': float(confidence.item()),
            'probabilities': {GRF_CLASS_NAMES[i]: float(prob) for i, prob in enumerate(probs_np)}
        }

        if len(self.scores_history) >= self.window_size:
            result['trend'] = self.analyze_trend()

        self.last_result = result
        return result

    def analyze_trend(self):
        """Label the recent trend of the currently predicted class.

        The probability series of the class that is the argmax of the latest
        frame is smoothed with a moving average, and a least-squares line is
        fitted to the most recent part of it.

        Returns:
            ``"improving"`` if the slope exceeds ``TREND_SLOPE_THRESHOLD``,
            ``"worsening"`` if it is below the negative threshold, otherwise
            ``"stable"`` (also returned with fewer than two frames).
        """
        score_history = np.asarray(self.scores_history, dtype=float)
        if score_history.ndim != 2 or len(score_history) < 2:
            return "stable"

        current_class = int(np.argmax(score_history[-1]))
        series = score_history[:, current_class]

        # mode='valid' keeps only fully-overlapping averages. mode='same'
        # zero-pads the ends, which drags the last samples toward zero and
        # fakes a downward slope. The kernel is capped at half the history so
        # at least two smoothed points remain.
        kernel_size = max(1, min(self.SMOOTHING_WINDOW, len(series) // 2))
        smoothed = np.convolve(series, np.ones(kernel_size) / kernel_size, mode='valid')

        # Fit over roughly the newer half of the history (at least 5 points),
        # never more points than are available.
        recent_window = min(len(smoothed), max(5, len(series) // 2))
        recent = smoothed[-recent_window:]

        # Ordinary least-squares slope. x must be 1-D like y: a column-vector
        # x broadcasts against y into an outer product whose sum is ~0, which
        # would make every trend "stable".
        x_centered = np.arange(recent_window, dtype=float)
        x_centered -= x_centered.mean()
        current_slope = float(np.dot(x_centered, recent - recent.mean()) / np.dot(x_centered, x_centered))

        if current_slope > self.TREND_SLOPE_THRESHOLD:
            return "improving"
        elif current_slope < -self.TREND_SLOPE_THRESHOLD:
            return "worsening"
        else:
            return "stable"

    def generate_grf_report(self, location, runway_id):
        """Summarise the most recent frame result as a report dict.

        Args:
            location: Value stored under ``"location"``.
            runway_id: Value stored under ``"runway"``.

        Returns:
            ``{"error": ...}`` when no frame has been processed yet, otherwise
            a dict with ``timestamp`` (UTC), ``location``, ``runway``,
            ``rwycc``, ``surface_condition``, ``confidence`` and ``trend``
            (``"unknown"`` if the latest result carries no trend).
        """
        if not self.scores_history or self.last_result is None:
            return {"error": "No frames processed yet"}

        # Reuse the last classification; there is no frame to run inference
        # on at report time.
        latest = self.last_result
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

        grf_report = {
            "timestamp": timestamp,
            "location": location,
            "runway": runway_id,
            "rwycc": latest['rwycc'],
            "surface_condition": latest['condition'],
            "confidence": latest['confidence'],
            "trend": latest.get('trend', "unknown")
        }

        return grf_report

In [None]:
# Define your dataset paths here (one list of root folders per split)
train_dirs = [
    "/content/drive/MyDrive/datasets/RoadSaW-075_s/train",
    "/content/drive/MyDrive/datasets/RoadSC/train",
]
val_dirs = [
    "/content/drive/MyDrive/datasets/RoadSaW-075_s/validation",
    "/content/drive/MyDrive/datasets/RoadSC/validation",
]
test_dirs = [
    "/content/drive/MyDrive/datasets/RoadSaW-075_s/test",
    "/content/drive/MyDrive/datasets/RoadSC/test",
]

# Build one dataset per split; only the training split gets the augmenting
# transform.
train_dataset = prepare_datasets(base_dirs=train_dirs, transform_fn=transform)
val_dataset = prepare_datasets(base_dirs=val_dirs, transform_fn=val_transform)
test_dataset = prepare_datasets(base_dirs=test_dirs, transform_fn=val_transform)

# Wrap the datasets in loaders; only the training loader shuffles.
loader_options = dict(batch_size=32, num_workers=4)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

# Model, placed on the selected device
model = GRFClassifier(backbone='efficientnet_b3', num_classes=7)
model = model.to(device)

# Loss, optimizer and learning-rate schedule
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

# Train; the returned model carries the best validation-loss weights
model, history = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=30,
    patience=5,
)

# Display names ordered by class index 0..6
class_names = [GRF_CLASS_NAMES[code] for code in range(7)]

# Evaluate on the held-out test split
test_results = evaluate_model(model, test_loader, device, class_names)

# Persist the final weights together with the label mappings and the
# headline test metrics
test_report = test_results['report']
performance_summary = {
    'test_accuracy': test_report['accuracy'],
    'f1_macro': test_report['macro avg']['f1-score'],
    'inference_time_ms': test_results['inference_time'] * 1000,
    'fps': test_results['fps'],
}
torch.save(
    {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'class_mapping': folder_to_grf,
        'grf_class_names': GRF_CLASS_NAMES,
        'performance': performance_summary,
    },
    'runway_condition_classifier_final.pth',
)

In [None]:
# Initialize monitor with trained model
runway_monitor = RunwayConditionMonitor(model, window_size=30)

# Simulating processing of video frames
import cv2

# cv2.imshow is disabled inside Google Colab; Colab ships cv2_imshow as the
# inline replacement. Fall back to a regular OpenCV window elsewhere.
try:
    from google.colab.patches import cv2_imshow
except ImportError:
    cv2_imshow = None

# In Colab every displayed frame becomes an inline image, so only preview
# every N-th frame there to keep the cell output manageable.
PREVIEW_EVERY_N_FRAMES = 30

# NOTE(review): there is no path separator between "datasets" and
# "runway_video.mp4", unlike the dataset paths used for training — confirm
# the file really lives at this location.
video_path = '/content/drive/MyDrive/datasetsrunway_video.mp4'
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Without this the loop below is silently skipped.
    print(f"Warning: could not open video: {video_path}")

results = []
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV decodes frames as BGR; the model pipeline expects RGB.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        result = runway_monitor.process_frame(frame_rgb)
        results.append(result)

        # Overlay the classification (and trend, once available) on the frame.
        condition_text = f"{result['condition']} (RWYCC: {result['rwycc']}, Conf: {result['confidence']:.2f})"
        cv2.putText(frame, condition_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        if 'trend' in result:
            trend_text = f"Trend: {result['trend']}"
            cv2.putText(frame, trend_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        if cv2_imshow is not None:
            if (len(results) - 1) % PREVIEW_EVERY_N_FRAMES == 0:
                cv2_imshow(frame)
        else:
            cv2.imshow('Runway Condition Monitor', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Release the capture (and any window) even if processing a frame fails.
    cap.release()
    if cv2_imshow is None:
        cv2.destroyAllWindows()

# Generate GRF report
grf_report = runway_monitor.generate_grf_report("EGLL", "27L")
print(grf_report)
