# SwellSight Wave Analysis Model - Model Evaluation and Validation

This notebook provides comprehensive evaluation and validation of the trained SwellSight wave analysis model.

## Overview
- Load trained model and evaluation data
- Perform comprehensive model validation
- Generate performance metrics and visualizations
- Analyze model predictions on test data
- Create deployment readiness assessment
- Generate final pipeline summary

## Prerequisites
- Complete execution of all previous notebooks (01-07)
- Trained model available from notebook 06
- Test dataset prepared and validated

---

## 1. Setup and Configuration

In [1]:
import sys
from pathlib import Path

# Check if running in Google Colab
if 'google.colab' in sys.modules:
    from google.colab import drive
    print("Mounting Google Drive...")

    try:
        # Attempt 1: Standard mount
        drive.mount('/content/drive')
        print("‚úì Google Drive mounted successfully")
    except Exception as e:
        print(f"Standard mount failed: {e}")

        # Attempt 2: Force remount with extended timeout (robust fallback)
        print("Trying force remount with extended timeout...")
        try:
            drive.mount('/content/drive', force_remount=True, timeout_ms=300000)
            print("‚úì Force remount successful")
        except Exception as e2:
            print(f"‚ùå Critical failure mounting drive: {e2}")
            raise

    # Verify the specific project path exists
    # Adjust this path if your folder structure changes
    PROJECT_PATH = Path('/content/drive/MyDrive/SwellSight')
    if PROJECT_PATH.exists():
        print(f"‚úì Project directory found: {PROJECT_PATH}")
        # Add project path to sys.path for module imports
        if str(PROJECT_PATH) not in sys.path:
            sys.path.append(str(PROJECT_PATH))
            print(f"‚úì Added {PROJECT_PATH} to sys.path")
    else:
        print(f"‚ö†Ô∏è Project directory not found at: {PROJECT_PATH}")
else:
    print("Not running in Google Colab. Skipping Drive mount.")

Mounting Google Drive...
Mounted at /content/drive
‚úì Google Drive mounted successfully
‚úì Project directory found: /content/drive/MyDrive/SwellSight
‚úì Added /content/drive/MyDrive/SwellSight to sys.path


Let's inspect the `swellsight` directory to understand its structure and check for necessary `__init__.py` files, which are crucial for Python to recognize it as a package. This will help us diagnose the `ModuleNotFoundError`.

In [2]:
import os

project_path = '/content/drive/MyDrive/SwellSight'
swellsight_path = os.path.join(project_path, 'final_model.pth')

print(f"Contents of {swellsight_path}:")
if os.path.exists(swellsight_path):
    for root, dirs, files in os.walk(swellsight_path):
        level = root.replace(swellsight_path, '').count(os.sep)
        indent = ' ' * 4 * (level)
        print(f'{indent}{os.path.basename(root)}/')
        subindent = ' ' * 4 * (level + 1)
        for f in files:
            print(f'{subindent}{f}')
else:
    print(f"Directory not found: {swellsight_path}")

Contents of /content/drive/MyDrive/SwellSight/final_model.pth:
Directory not found: /content/drive/MyDrive/SwellSight/final_model.pth


In [3]:
import sys
import os
import json
import logging
from pathlib import Path
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from tqdm.auto import tqdm
import time

plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
IN_COLAB = 'google.colab' in sys.modules

print("‚úì Libraries imported successfully")

‚úì Libraries imported successfully


In [4]:
# Load configuration
try:
    # Check if SWELLSIGHT_CONFIG is already defined (e.g., from a previous notebook)
    CONFIG = globals().get('SWELLSIGHT_CONFIG')

    if CONFIG is None:
        # User specified a specific config file
        user_config_file = Path('/content/drive/MyDrive/SwellSight/data/metadata/pipeline_config.json')

        if user_config_file.exists():
            with open(user_config_file, 'r') as f:
                CONFIG = json.load(f)
            print(f"‚úì Configuration loaded from user-specified file: {user_config_file}")
        else:
            # Fallback to session_config.json or default if user_config_file not found
            session_config_file = Path('/content/drive/MyDrive/SwellSight/session_config.json') if IN_COLAB else Path('SwellSight/session_config.json')
            if session_config_file.exists():
                with open(session_config_file, 'r') as f:
                    CONFIG = json.load(f)
                print(f"‚úì Configuration loaded from session_config.json: {session_config_file}")
            else:
                # Default configuration if no config file is found
                CONFIG = {
                    'paths': {
                        'checkpoints_path': '/content/drive/MyDrive/SwellSight/checkpoints',
                        'metadata_path': '/content/drive/MyDrive/SwellSight/data/metadata',
                        'real_data_path': 'data/real',
                        'test_output_path': 'test_output'
                    }
                }
                print("‚úì Default configuration applied.")

    # Ensure CONFIG['paths'] exists and update default paths if not explicitly set in config file
    if 'paths' not in CONFIG:
        CONFIG['paths'] = {}
    # Ensure user-provided checkpoints and metadata paths are set if not present
    if 'checkpoints_path' not in CONFIG['paths']:
        CONFIG['paths']['checkpoints_path'] = '/content/drive/MyDrive/SwellSight/checkpoints'
    if 'metadata_path' not in CONFIG['paths']:
        CONFIG['paths']['metadata_path'] = '/content/drive/MyDrive/SwellSight/data/metadata'
    if 'real_data_path' not in CONFIG['paths']:
        CONFIG['paths']['real_data_path'] = 'data/real'
    if 'test_output_path' not in CONFIG['paths']:
        CONFIG['paths']['test_output_path'] = 'test_output'


    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('SwellSight_Evaluation')

    print("‚úì Configuration processing complete.")
    print(f"  Final checkpoints_path: {CONFIG['paths']['checkpoints_path']}")
    print(f"  Final metadata_path: {CONFIG['paths']['metadata_path']}")

except Exception as e:
    print(f"‚ö†Ô∏è  Error loading configuration: {e}")
    # Fallback to a minimal CONFIG if there's an error
    CONFIG = {
        'paths': {
            'checkpoints_path': '/content/drive/MyDrive/SwellSight/checkpoints',
            'metadata_path': '/content/drive/MyDrive/SwellSight/data/metadata',
            'real_data_path': 'data/real',
            'test_output_path': 'test_output'
        }
    }
    print("‚ö†Ô∏è  Fallback minimal configuration applied due to error.")

‚úì Configuration loaded from user-specified file: /content/drive/MyDrive/SwellSight/data/metadata/pipeline_config.json
‚úì Configuration processing complete.
  Final checkpoints_path: /content/drive/MyDrive/SwellSight/checkpoints
  Final metadata_path: /content/drive/MyDrive/SwellSight/data/metadata


## 2. Load Trained Model

In [6]:
from swellsight.models.wave_analysis_model import WaveAnalysisModel
from swellsight.config.model_config import ModelConfig

def load_model_from_checkpoint(checkpoint_path: Path):
    """Load model from checkpoint file."""
    ckpt = torch.load(checkpoint_path, map_location='cpu')

    if isinstance(ckpt, dict) and 'model_state_dict' in ckpt:
        metadata = ckpt.get('metadata', {})
        cfg_dict = metadata.get('model_config', {})
        config = ModelConfig.from_dict(cfg_dict) if cfg_dict else ModelConfig()
        model = WaveAnalysisModel(config)
        model.load_state_dict(ckpt['model_state_dict'], strict=False)
        return model, metadata, ckpt.get('training_history', {})

    if isinstance(ckpt, nn.Module):
        return ckpt, {}, {}

    config = ModelConfig()
    model = WaveAnalysisModel(config)
    try:
        model.load_state_dict(ckpt)
        return model, {}, {}
    except:
        return None, {}, {}

# Find and load specific model
trained_model = None
model_metadata = {}
training_history = {}

checkpoints_dir = Path(CONFIG['paths'].get('checkpoints_path', 'checkpoints'))
model_file = checkpoints_dir / 'final_model.pth' # Explicitly use final_model.pth

if model_file.exists():
    print(f"Loading specific model: {model_file}")
    trained_model, model_metadata, training_history = load_model_from_checkpoint(model_file)
    if trained_model:
        trained_model.eval()
        print("‚úì Model loaded and set to evaluation mode")
    else:
        print("‚ùå Failed to load model from " + str(model_file))
else:
    print(f"‚ùå Specified model file not found: {model_file}")
    print(f"  Looked in directory: {checkpoints_dir}")
    print(f"  Available files: {list(checkpoints_dir.glob('*')) if checkpoints_dir.exists() else 'Directory does not exist.'}")

ModuleNotFoundError: No module named 'swellsight'

## 3. Prepare Test Dataset

In [None]:
class WaveTestDataset(Dataset):
    def __init__(self, images_dir: Path, annotations: dict = None, input_size=(768, 768)):
        self.images_dir = Path(images_dir)
        self.files = sorted([p for p in self.images_dir.glob('**/*')
                           if p.suffix.lower() in ('.jpg', '.jpeg', '.png')])
        self.annotations = annotations or {}
        self.transform = T.Compose([
            T.Resize(input_size),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        path = self.files[idx]
        image = Image.open(path).convert('RGB')
        image_tensor = self.transform(image)
        label = self.annotations.get(path.name)
        return {
            'image': image_tensor,
            'path': str(path),
            'label': label
        }

# Load annotations
annotations = {}
annotation_files = [
    Path(CONFIG['paths'].get('metadata_path', 'data/metadata')) / 'test_annotations.json',
    Path('test_output') / 'pipeline_results_20251231_230407.json'
]

for ann_file in annotation_files:
    if ann_file.exists():
        try:
            with open(ann_file, 'r') as f:
                data = json.load(f)
                if isinstance(data, list):
                    for item in data:
                        if 'file_name' in item:
                            annotations[item['file_name']] = item
                elif isinstance(data, dict):
                    annotations.update(data)
            print(f"‚úì Loaded annotations from {ann_file}")
            break
        except:
            continue

# Create test dataset
test_dataset = None
test_images_path = Path(CONFIG['paths'].get('test_output_path', 'data/real'))
if test_images_path.exists():
    test_dataset = WaveTestDataset(test_images_path, annotations)
    print(f"‚úì Test dataset created with {len(test_dataset)} images")
else:
    print(f"‚ùå Test images directory not found: {test_images_path}")

## 4. Model Evaluation

In [None]:
def evaluate_model(model, dataset, batch_size=8, device='cpu'):
    """Evaluate model on test dataset."""
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    predictions = []
    labels = []
    paths = []

    model.to(device)
    start_time = time.time()

    with torch.no_grad():
        for batch in tqdm(loader, desc='Evaluating'):
            images = batch['image'].to(device)
            outputs = model(images)

            # Extract predictions
            heights = outputs['height'].squeeze(-1).cpu().numpy()
            wave_types = outputs['wave_type'].argmax(dim=1).cpu().numpy()
            directions = outputs['direction'].argmax(dim=1).cpu().numpy()

            for i in range(len(batch['path'])):
                pred = {
                    'height': float(heights[i]),
                    'wave_type': int(wave_types[i]),
                    'direction': int(directions[i])
                }
                predictions.append(pred)
                paths.append(batch['path'][i])
                labels.append(batch['label'][i])

    inference_time = time.time() - start_time
    return paths, predictions, labels, {'inference_time': inference_time, 'num_samples': len(dataset)}

# Run evaluation if model and dataset are available
evaluation_results = {}
if trained_model and test_dataset and len(test_dataset) > 0:
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Running evaluation on {device}...")

    paths, predictions, labels, metrics = evaluate_model(trained_model, test_dataset, device=device)

    evaluation_results = {
        'paths': paths,
        'predictions': predictions,
        'labels': labels,
        'metrics': metrics
    }

    print(f"‚úì Evaluation completed in {metrics['inference_time']:.2f}s")
    print(f"  Average inference time: {metrics['inference_time']/metrics['num_samples']:.3f}s per image")
else:
    print("‚ùå Cannot run evaluation - model or dataset not available")

## 5. Performance Metrics

In [None]:
def compute_metrics(predictions, labels):
    """Compute performance metrics when ground truth labels are available."""
    metrics = {}

    # Separate predictions and labels by task
    heights_true, heights_pred = [], []
    wave_types_true, wave_types_pred = [], []
    directions_true, directions_pred = [], []

    for pred, label in zip(predictions, labels):
        if label is None:
            continue

        if isinstance(label, dict):
            if 'height' in label and label['height'] is not None:
                heights_true.append(float(label['height']))
                heights_pred.append(float(pred['height']))

            if 'wave_type' in label and label['wave_type'] is not None:
                wave_types_true.append(int(label['wave_type']))
                wave_types_pred.append(int(pred['wave_type']))

            if 'direction' in label and label['direction'] is not None:
                directions_true.append(int(label['direction']))
                directions_pred.append(int(pred['direction']))

    # Height regression metrics
    if heights_true:
        metrics['height'] = {
            'mse': float(mean_squared_error(heights_true, heights_pred)),
            'mae': float(mean_absolute_error(heights_true, heights_pred)),
            'r2': float(r2_score(heights_true, heights_pred)),
            'samples': len(heights_true)
        }

    # Wave type classification metrics
    if wave_types_true:
        metrics['wave_type'] = {
            'accuracy': float(accuracy_score(wave_types_true, wave_types_pred)),
            'f1_macro': float(f1_score(wave_types_true, wave_types_pred, average='macro')),
            'samples': len(wave_types_true)
        }

    # Direction classification metrics
    if directions_true:
        metrics['direction'] = {
            'accuracy': float(accuracy_score(directions_true, directions_pred)),
            'f1_macro': float(f1_score(directions_true, directions_pred, average='macro')),
            'samples': len(directions_true)
        }

    return metrics, (heights_true, heights_pred), (wave_types_true, wave_types_pred), (directions_true, directions_pred)

# Compute metrics if evaluation was successful
performance_metrics = {}
if evaluation_results:
    metrics, height_data, wave_data, dir_data = compute_metrics(
        evaluation_results['predictions'],
        evaluation_results['labels']
    )
    performance_metrics = metrics

    print("üìä Performance Metrics:")
    for task, task_metrics in metrics.items():
        print(f"\n{task.upper()}:")
        for metric, value in task_metrics.items():
            if metric != 'samples':
                print(f"  {metric}: {value:.4f}")
            else:
                print(f"  {metric}: {value}")
else:
    print("‚ùå No evaluation results available for metrics computation")

## 6. Visualizations

In [None]:
# Create visualizations if we have evaluation results
if evaluation_results and performance_metrics:
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # Height scatter plot
    if 'height' in performance_metrics:
        heights_true, heights_pred = height_data
        axes[0].scatter(heights_true, heights_pred, alpha=0.6)
        min_h, max_h = min(heights_true), max(heights_true)
        axes[0].plot([min_h, max_h], [min_h, max_h], 'r--', alpha=0.8)
        axes[0].set_xlabel('True Height (m)')
        axes[0].set_ylabel('Predicted Height (m)')
        axes[0].set_title(f'Height Prediction\nR¬≤ = {performance_metrics["height"]["r2"]:.3f}')
        axes[0].grid(True, alpha=0.3)
    else:
        axes[0].text(0.5, 0.5, 'No height labels\navailable', ha='center', va='center', transform=axes[0].transAxes)
        axes[0].set_title('Height Prediction')

    # Wave type confusion matrix
    if 'wave_type' in performance_metrics:
        wave_true, wave_pred = wave_data
        cm = confusion_matrix(wave_true, wave_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1])
        axes[1].set_xlabel('Predicted')
        axes[1].set_ylabel('True')
        axes[1].set_title(f'Wave Type\nAccuracy = {performance_metrics["wave_type"]["accuracy"]:.3f}')
    else:
        axes[1].text(0.5, 0.5, 'No wave type labels\navailable', ha='center', va='center', transform=axes[1].transAxes)
        axes[1].set_title('Wave Type Classification')

    # Direction confusion matrix
    if 'direction' in performance_metrics:
        dir_true, dir_pred = dir_data
        cm = confusion_matrix(dir_true, dir_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Greens', ax=axes[2])
        axes[2].set_xlabel('Predicted')
        axes[2].set_ylabel('True')
        axes[2].set_title(f'Direction\nAccuracy = {performance_metrics["direction"]["accuracy"]:.3f}')
    else:
        axes[2].text(0.5, 0.5, 'No direction labels\navailable', ha='center', va='center', transform=axes[2].transAxes)
        axes[2].set_title('Direction Classification')

    plt.tight_layout()
    plt.show()
else:
    print("‚ùå No evaluation results available for visualization")

## 7. Deployment Readiness Assessment

In [None]:
def assess_deployment_readiness(model, model_metadata, performance_metrics):
    """Assess if model is ready for deployment."""
    checks = {}

    if model is not None:
        # Model size check
        param_count = sum(p.numel() for p in model.parameters())
        checks['model_parameters'] = param_count
        checks['model_size_mb'] = param_count * 4 / (1024 * 1024)  # Assuming float32

        # JIT compilation test
        try:
            model.eval()
            sample_input = torch.randn(1, 3, 768, 768)
            traced_model = torch.jit.trace(model, sample_input)
            checks['jit_compatible'] = True
        except Exception as e:
            checks['jit_compatible'] = False
            checks['jit_error'] = str(e)

        # Inference speed test
        try:
            model.eval()
            sample_input = torch.randn(1, 3, 768, 768)

            # Warmup
            for _ in range(3):
                _ = model(sample_input)

            # Timing
            start_time = time.time()
            for _ in range(10):
                _ = model(sample_input)
            avg_inference_time = (time.time() - start_time) / 10

            checks['avg_inference_time_s'] = avg_inference_time
            checks['inference_fps'] = 1.0 / avg_inference_time
        except Exception as e:
            checks['inference_speed_error'] = str(e)

    # Performance thresholds (adjust based on requirements)
    deployment_ready = True
    issues = []

    if performance_metrics:
        if 'height' in performance_metrics:
            if performance_metrics['height']['r2'] < 0.7:
                issues.append(f"Height R¬≤ too low: {performance_metrics['height']['r2']:.3f} < 0.7")
                deployment_ready = False

        if 'wave_type' in performance_metrics:
            if performance_metrics['wave_type']['accuracy'] < 0.8:
                issues.append(f"Wave type accuracy too low: {performance_metrics['wave_type']['accuracy']:.3f} < 0.8")
                deployment_ready = False

        if 'direction' in performance_metrics:
            if performance_metrics['direction']['accuracy'] < 0.8:
                issues.append(f"Direction accuracy too low: {performance_metrics['direction']['accuracy']:.3f} < 0.8")
                deployment_ready = False

    if checks.get('avg_inference_time_s', 0) > 1.0:
        issues.append(f"Inference too slow: {checks['avg_inference_time_s']:.3f}s > 1.0s")
        deployment_ready = False

    checks['deployment_ready'] = deployment_ready
    checks['issues'] = issues

    return checks

# Run deployment assessment
deployment_assessment = assess_deployment_readiness(trained_model, model_metadata, performance_metrics)

print("üöÄ Deployment Readiness Assessment:")
print(f"\nModel Statistics:")
if 'model_parameters' in deployment_assessment:
    print(f"  Parameters: {deployment_assessment['model_parameters']:,}")
    print(f"  Model size: {deployment_assessment['model_size_mb']:.1f} MB")

print(f"\nPerformance:")
if 'avg_inference_time_s' in deployment_assessment:
    print(f"  Inference time: {deployment_assessment['avg_inference_time_s']:.3f}s")
    print(f"  Throughput: {deployment_assessment['inference_fps']:.1f} FPS")

print(f"\nCompatibility:")
print(f"  JIT compatible: {deployment_assessment.get('jit_compatible', 'Unknown')}")

print(f"\nDeployment Status: {'‚úÖ READY' if deployment_assessment['deployment_ready'] else '‚ùå NOT READY'}")
if deployment_assessment['issues']:
    print("\nIssues to address:")
    for issue in deployment_assessment['issues']:
        print(f"  - {issue}")

## 8. Save Results and Generate Summary

In [None]:
# Compile final results
final_results = {
    'timestamp': datetime.now().isoformat(),
    'model_metadata': model_metadata,
    'training_history': training_history,
    'evaluation_metrics': evaluation_results.get('metrics', {}),
    'performance_metrics': performance_metrics,
    'deployment_assessment': deployment_assessment,
    'pipeline_summary': {
        'goals': [
            'Load trained model and evaluation data',
            'Perform comprehensive model validation',
            'Generate performance metrics and visualizations',
            'Analyze model predictions on test data',
            'Create deployment readiness assessment',
            'Generate final pipeline summary'
        ],
        'completed': []
    }
}

# Track completed goals
if trained_model is not None:
    final_results['pipeline_summary']['completed'].append('Load trained model and evaluation data')

if performance_metrics:
    final_results['pipeline_summary']['completed'].extend([
        'Perform comprehensive model validation',
        'Generate performance metrics and visualizations',
        'Analyze model predictions on test data'
    ])

if deployment_assessment:
    final_results['pipeline_summary']['completed'].append('Create deployment readiness assessment')

final_results['pipeline_summary']['completed'].append('Generate final pipeline summary')

# Save results
output_dir = Path(CONFIG['paths'].get('test_output_path', 'test_output'))
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / f'model_evaluation_summary_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'

with open(output_file, 'w') as f:
    json.dump(final_results, f, indent=2)

print(f"‚úÖ Evaluation complete! Results saved to: {output_file}")
print(f"\nüìã Pipeline Summary:")
print(f"  Goals completed: {len(final_results['pipeline_summary']['completed'])}/{len(final_results['pipeline_summary']['goals'])}")
for goal in final_results['pipeline_summary']['completed']:
    print(f"  ‚úì {goal}")

# Store results in global variable for next notebooks
EVALUATION_RESULTS = final_results
print("\n‚úì Results stored in EVALUATION_RESULTS variable for future use")