# Sentinel Model - Audio Distress Detection

Model architecture and training logic using MobileNetV2 for transfer learning on audio spectrograms.


In [1]:
# Imports
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os
import json
import sys
from pathlib import Path

# Add project root and backend to Python path for imports
project_root = Path().resolve().parent
backend_path = project_root / "backend"
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(backend_path))

# Enable eager execution (required for TensorFlow/Keras 2.x+)
tf.config.run_functions_eagerly(True)


In [2]:
# Configuration
# Determine model directory based on where we're running from
_script_dir = Path(".")
if Path(".").absolute().name == "backend":
    MODEL_DIR = Path("models")
else:
    MODEL_DIR = Path("backend/models")

MODEL_DIR.mkdir(parents=True, exist_ok=True)
MODEL_PATH = MODEL_DIR / "sentinel_model.h5"
METADATA_PATH = MODEL_DIR / "model_metadata.json"

# Image input shape (MobileNetV2 default)
INPUT_SHAPE = (224, 224, 3)


## Model Architecture

Create MobileNetV2-based model for binary audio classification.


In [3]:
def create_model(input_shape=INPUT_SHAPE, num_classes=2, weights=None):
    """
    Create MobileNetV2-based model for binary audio classification.
    
    Args:
        input_shape: Input image shape (default: (224, 224, 3))
        num_classes: Number of output classes (default: 2 for binary)
        weights: Path to pretrained weights or None for random init
    
    Returns:
        Compiled Keras model
    """
    # Base MobileNetV2 (pretrained on ImageNet, excluding top)
    base_model = MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet' if weights is None else None,
        alpha=1.0
    )
    
    # Freeze base model initially (can be unfrozen during fine-tuning)
    base_model.trainable = False
    
    # Build custom classifier head
    inputs = keras.Input(shape=input_shape)
    
    # Preprocess for MobileNetV2
    x = base_model(inputs, training=False)
    
    # Global average pooling
    x = layers.GlobalAveragePooling2D()(x)
    
    # Dense layers
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.3)(x)
    
    # Output layer (binary classification)
    if num_classes == 2:
        outputs = layers.Dense(1, activation='sigmoid', name='predictions')(x)
        loss = 'binary_crossentropy'
    else:
        outputs = layers.Dense(num_classes, activation='softmax', name='predictions')(x)
        loss = 'sparse_categorical_crossentropy'
    
    model = keras.Model(inputs, outputs, name='sentinel_mobilenet')
    
    # Compile model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss=loss,
        metrics=['accuracy']
    )
    
    return model


In [4]:
def load_model(model_path=MODEL_PATH):
    """
    Load trained model from disk.
    
    Args:
        model_path: Path to saved model file
    
    Returns:
        Loaded Keras model, or None if file doesn't exist
    """
    if os.path.exists(model_path):
        try:
            model = keras.models.load_model(model_path)
            return model
        except Exception as e:
            print(f"Error loading model: {e}")
            return None
    return None


def save_model(model, model_path=MODEL_PATH, metadata=None):
    """
    Save model and optional metadata to disk.
    
    Args:
        model: Keras model to save
        model_path: Path to save model
        metadata: Optional dictionary of metadata to save
    """
    model.save(model_path)
    
    if metadata:
        with open(METADATA_PATH, 'w') as f:
            json.dump(metadata, f, indent=2)


In [5]:
def _prepare_data_from_directories(data_dir, validation_split=0.2):
    """
    Internal function: Prepare training data from directory structure:
    data_dir/
        safe/
            audio1.wav
            audio2.wav
        danger/
            audio1.wav
            audio2.wav
    
    Args:
        data_dir: Root directory containing class subdirectories
        validation_split: Fraction of data to use for validation
    
    Returns:
        train_generator, val_generator, num_samples
    """
    from backend.preprocessing import audio_file_to_image, image_to_array
    
    data_path = Path(data_dir)
    safe_dir = data_path / "safe"
    danger_dir = data_path / "danger"
    
    # Collect all audio files
    safe_files = list(Path(safe_dir).glob("*.wav")) if safe_dir.exists() else []
    danger_files = list(Path(danger_dir).glob("*.wav")) if danger_dir.exists() else []
    
    if len(safe_files) == 0 and len(danger_files) == 0:
        raise ValueError(f"No audio files found in {data_dir}")
    
    # Process audio files to images
    X = []
    y = []
    
    for file_path in safe_files:
        try:
            img = audio_file_to_image(file_path)
            img_array = image_to_array(img)
            X.append(img_array)
            y.append(0)  # Safe class
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue
    
    for file_path in danger_files:
        try:
            img = audio_file_to_image(file_path)
            img_array = image_to_array(img)
            X.append(img_array)
            y.append(1)  # Danger class
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue
    
    if len(X) == 0:
        raise ValueError("No valid audio files could be processed")
    
    # Convert to numpy arrays (ensure they're proper numpy arrays, not tensorflow tensors)
    X = np.array(X, dtype=np.float32)
    y = np.array(y, dtype=np.float32)
    
    # Shuffle data
    indices = np.random.permutation(len(X))
    X = X[indices]
    y = y[indices]
    
    # Split into train/validation
    split_idx = int(len(X) * (1 - validation_split))
    X_train, X_val = X[:split_idx], X[split_idx:]
    y_train, y_val = y[:split_idx], y[split_idx:]
    
    # Apply data augmentation
    datagen = ImageDataGenerator(
        rotation_range=5,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=False,  # Don't flip spectrograms
        fill_mode='nearest'
    )
    
    train_generator = datagen.flow(X_train, y_train, batch_size=32, shuffle=True)
    val_generator = datagen.flow(X_val, y_val, batch_size=32, shuffle=False)
    
    num_samples = len(X)
    
    return train_generator, val_generator, num_samples


## Model Training


In [6]:
def train_model(data_dir, epochs=10, batch_size=32, validation_split=0.2, 
                initial_epoch=0, model=None):
    """
    Train the Sentinel model on audio data.
    
    Args:
        data_dir: Directory containing safe/ and danger/ subdirectories
        epochs: Number of training epochs
        batch_size: Batch size for training
        validation_split: Fraction of data for validation
        initial_epoch: Starting epoch (for resuming training)
        model: Existing model to continue training, or None to create new
    
    Returns:
        Trained model and training history
    """
    # Load or create model
    if model is None:
        model = load_model()
        if model is None:
            print("Creating new model...")
            model = create_model()
    
    # Prepare data
    print(f"Loading data from {data_dir}...")
    train_gen, val_gen, num_samples = _prepare_data_from_directories(
        data_dir, validation_split=validation_split
    )
    
    print(f"Training on {num_samples} samples...")
    
    # Callbacks
    callbacks = [
        keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-7
        )
    ]
    
    # Train model
    history = model.fit(
        train_gen,
        epochs=epochs,
        validation_data=val_gen,
        callbacks=callbacks,
        initial_epoch=initial_epoch,
        verbose=1
    )
    
    # Save model
    metadata = {
        'epochs_trained': epochs,
        'total_samples': num_samples,
        'last_accuracy': float(history.history['accuracy'][-1]),
        'last_val_accuracy': float(history.history['val_accuracy'][-1])
    }
    
    save_model(model, metadata=metadata)
    
    print("Model training completed and saved!")
    
    return model, history


In [7]:
def predict(model, audio_path):
    """
    Make prediction on a single audio file.
    
    Args:
        model: Trained Keras model
        audio_path: Path to audio file or file-like object
    
    Returns:
        dict with 'prediction' (0 or 1), 'confidence' (float), 'probability' (float)
    """
    from backend.preprocessing import audio_file_to_image, image_to_array
    
    # Process audio
    img = audio_file_to_image(audio_path)
    img_array = image_to_array(img)
    
    # Add batch dimension
    img_batch = np.expand_dims(img_array, axis=0)
    
    # Predict
    prediction = model.predict(img_batch, verbose=0)[0][0]
    
    # Binary classification: 0 = Safe, 1 = Danger
    class_idx = 1 if prediction > 0.5 else 0
    confidence = abs(prediction - 0.5) * 2  # Convert to [0, 1] confidence
    
    return {
        'prediction': int(class_idx),
        'class': 'danger' if class_idx == 1 else 'safe',
        'confidence': float(confidence),
        'probability': float(prediction)  # Raw probability (0=Safe, 1=Danger)
    }


## Interactive Training

Now let's actually train the model with your data!


In [8]:
# Check data directory structure (look in backend/data)
from pathlib import Path

# Try backend/data first, then data
data_dir = Path("../backend/data") if Path("../backend/data").exists() else Path("data")
safe_dir = data_dir / "safe"
danger_dir = data_dir / "danger"

print(f"Data directory: {data_dir.absolute()}")
print(f"Safe directory exists: {safe_dir.exists()}")
print(f"Danger directory exists: {danger_dir.exists()}")

if safe_dir.exists():
    safe_files = list(safe_dir.glob("*.wav"))
    print(f"Safe audio files: {len(safe_files)}")
    
if danger_dir.exists():
    danger_files = list(danger_dir.glob("*.wav"))
    print(f"Danger audio files: {len(danger_files)}")


Data directory: c:\Users\USER\Documents\Sentinel-End-to-End-MLOps\notebook\..\backend\data
Safe directory exists: True
Danger directory exists: True
Safe audio files: 100
Danger audio files: 100


In [9]:
# Create or load model
print("Initializing model...")
model = load_model()
if model is None:
    print("Creating new model...")
    model = create_model()
    print("New model created!")
else:
    print("Loaded existing model from disk")
    
# Display model summary
model.summary()


Initializing model...
Creating new model...
New model created!


## Train the Model

Train the model on your preprocessed data. Adjust epochs, batch_size, and validation_split as needed.


In [10]:
# Train the model
# Adjust these parameters as needed:
EPOCHS = 10
BATCH_SIZE = 32
VALIDATION_SPLIT = 0.2
# Use backend/data if it exists, otherwise try data
DATA_DIR = "../backend/data" if Path("../backend/data").exists() else "data"

print("=" * 60)
print("Starting Model Training")
print("=" * 60)
print(f"Epochs: {EPOCHS}")
print(f"Batch size: {BATCH_SIZE}")
print(f"Validation split: {VALIDATION_SPLIT}")
print(f"Data directory: {DATA_DIR}")
print("=" * 60)

# Train the model
trained_model, training_history = train_model(
    data_dir=DATA_DIR,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_split=VALIDATION_SPLIT,
    model=model  # Use the model we created/loaded above
)

print("\nTraining completed!")


Starting Model Training
Epochs: 10
Batch size: 32
Validation split: 0.2
Data directory: ../backend/data
Loading data from ../backend/data...
Training on 200 samples...
Epoch 1/10




[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 4s/step - accuracy: 0.4625 - loss: 0.9610 - val_accuracy: 0.7250 - val_loss: 0.6035 - learning_rate: 0.0010
Epoch 2/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 4s/step - accuracy: 0.6438 - loss: 0.6331 - val_accuracy: 0.7000 - val_loss: 0.6121 - learning_rate: 0.0010
Epoch 3/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 4s/step - accuracy: 0.6625 - loss: 0.6220 - val_accuracy: 0.6500 - val_loss: 0.5422 - learning_rate: 0.0010
Epoch 4/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 4s/step - accuracy: 0.6812 - loss: 0.6248 - val_accuracy: 0.8000 - val_loss: 0.5324 - learning_rate: 0.0010
Epoch 5/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 4s/step - accuracy: 0.7188 - loss: 0.5914 - val_accuracy: 0.7750 - val_loss: 0.5284 - learning_rate: 0.0010
Epoch 6/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 4s/step - accuracy: 0.7188 



Model training completed and saved!

Training completed!


## Visualize Training Results

Plot training accuracy and loss over epochs.


In [11]:
# Plot training history
import matplotlib.pyplot as plt

# Check if training_history exists (may not if training failed)
if 'training_history' in globals():
    history = training_history.history
else:
    print("⚠️ Error: training_history not found. Please run the training cell first.")
    print("   If training failed, check the error messages above.")
    raise NameError("training_history is not defined. Run the training cell first.")

# Create figure with subplots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Plot accuracy
ax1.plot(history['accuracy'], label='Training Accuracy', marker='o')
ax1.plot(history['val_accuracy'], label='Validation Accuracy', marker='s')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Accuracy')
ax1.set_title('Model Accuracy')
ax1.legend()
ax1.grid(True)

# Plot loss
ax2.plot(history['loss'], label='Training Loss', marker='o')
ax2.plot(history['val_loss'], label='Validation Loss', marker='s')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Loss')
ax2.set_title('Model Loss')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

# Print final metrics
print(f"\nFinal Training Accuracy: {history['accuracy'][-1]:.4f}")
print(f"Final Validation Accuracy: {history['val_accuracy'][-1]:.4f}")
print(f"Final Training Loss: {history['loss'][-1]:.4f}")
print(f"Final Validation Loss: {history['val_loss'][-1]:.4f}")



Final Training Accuracy: 0.7688
Final Validation Accuracy: 0.8250
Final Training Loss: 0.4649
Final Validation Loss: 0.4167


  plt.show()


## Test Predictions

Test the trained model on sample audio files.


In [12]:
# Test prediction on a sample file
from pathlib import Path
import random

# Get sample files
data_dir = Path("../backend/data") if Path("../backend/data").exists() else Path("data")
safe_files = list((data_dir / "safe").glob("*.wav")) if (data_dir / "safe").exists() else []
danger_files = list((data_dir / "danger").glob("*.wav")) if (data_dir / "danger").exists() else []

# Test on a random safe file
if safe_files:
    test_safe = random.choice(safe_files)
    print(f"Testing on SAFE file: {test_safe.name}")
    result = predict(trained_model, test_safe)
    print(f"Prediction: {result['class'].upper()}")
    print(f"Confidence: {result['confidence']:.2%}")
    print(f"Probability: {result['probability']:.4f}")
    print(f"Expected: SAFE | Got: {result['class'].upper()} | {'✓' if result['class'] == 'safe' else '✗'}")

# Test on a random danger file
if danger_files:
    test_danger = random.choice(danger_files)
    print(f"\nTesting on DANGER file: {test_danger.name}")
    result = predict(trained_model, test_danger)
    print(f"Prediction: {result['class'].upper()}")
    print(f"Confidence: {result['confidence']:.2%}")
    print(f"Probability: {result['probability']:.4f}")
    print(f"Expected: DANGER | Got: {result['class'].upper()} | {'✓' if result['class'] == 'danger' else '✗'}")


Testing on SAFE file: xlE_HYdFR-I_out.wav
Prediction: SAFE
Confidence: 48.31%
Probability: 0.2585
Expected: SAFE | Got: SAFE | ✓

Testing on DANGER file: rMAvJseEE0I_out.wav
Prediction: DANGER
Confidence: 70.72%
Probability: 0.8536
Expected: DANGER | Got: DANGER | ✓


In [13]:
## Advanced Model Evaluation Metrics (ROC and Precision-Recall Curves)

# This cell should be run AFTER the Model Evaluation cell that defines y_test and predictions
# Check if variables exist from previous cell
if 'y_test' in globals() and 'predictions' in globals():
    from sklearn.metrics import roc_curve, auc, roc_auc_score, precision_recall_curve, average_precision_score
    import matplotlib.pyplot as plt
    
    # Flatten predictions for ROC curve (predictions is shape (n, 1))
    predictions_flat = predictions.flatten() if len(predictions.shape) > 1 else predictions
    
    # Calculate ROC curve
    fpr, tpr, _ = roc_curve(y_test, predictions_flat)
    roc_auc = roc_auc_score(y_test, predictions_flat)
    
    # Plot ROC Curve
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random Classifier')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize=12)
    plt.ylabel('True Positive Rate', fontsize=12)
    plt.title('Receiver Operating Characteristic (ROC) Curve', fontsize=14, fontweight='bold')
    plt.legend(loc="lower right", fontsize=11)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    print(f"\nROC AUC Score: {roc_auc:.4f}")
    
    # Precision-Recall Curve
    precision, recall, _ = precision_recall_curve(y_test, predictions_flat)
    avg_precision = average_precision_score(y_test, predictions_flat)
    
    # Plot Precision-Recall Curve
    plt.figure(figsize=(8, 6))
    plt.plot(recall, precision, color='blue', lw=2, 
             label=f'Precision-Recall curve (AP = {avg_precision:.2f})')
    plt.xlabel('Recall', fontsize=12)
    plt.ylabel('Precision', fontsize=12)
    plt.title('Precision-Recall Curve', fontsize=14, fontweight='bold')
    plt.legend(loc="lower left", fontsize=11)
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    print(f"Average Precision Score: {avg_precision:.4f}")
else:
    print("⚠️ Error: Please run the Model Evaluation cell first to define y_test and predictions variables.")

⚠️ Error: Please run the Model Evaluation cell first to define y_test and predictions variables.


## Model Evaluation

Evaluate the model on all validation data to get comprehensive metrics.


In [14]:
# Evaluate model on test data
from backend.preprocessing import audio_file_to_image, image_to_array
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

# Prepare test data (you can adjust this to use a separate test set)
test_data_dir = Path("../backend/data") if Path("../backend/data").exists() else Path("data")
safe_test = list((test_data_dir / "safe").glob("*.wav"))[:20] if (test_data_dir / "safe").exists() else []  # Test on subset
danger_test = list((test_data_dir / "danger").glob("*.wav"))[:20] if (test_data_dir / "danger").exists() else []

X_test = []
y_test = []

print("Preparing test data...")
for file in safe_test:
    try:
        img = audio_file_to_image(file)
        img_array = image_to_array(img)
        X_test.append(img_array)
        y_test.append(0)  # Safe
    except Exception as e:
        print(f"Error processing {file}: {e}")

for file in danger_test:
    try:
        img = audio_file_to_image(file)
        img_array = image_to_array(img)
        X_test.append(img_array)
        y_test.append(1)  # Danger
    except Exception as e:
        print(f"Error processing {file}: {e}")

if X_test:
    X_test = np.array(X_test)
    y_test = np.array(y_test)
    
    # Make predictions
    predictions = trained_model.predict(X_test, verbose=0)
    y_pred = (predictions > 0.5).astype(int).flatten()
    
    # Print metrics
    print("\n" + "=" * 60)
    print("Classification Report")
    print("=" * 60)
    print(classification_report(y_test, y_pred, target_names=['Safe', 'Danger']))
    
    print("\nConfusion Matrix")
    print("=" * 60)
    cm = confusion_matrix(y_test, y_pred)
    print(cm)
    print("\nTrue Negatives (Safe→Safe):", cm[0][0])
    print("False Positives (Safe→Danger):", cm[0][1])
    print("False Negatives (Danger→Safe):", cm[1][0])
    print("True Positives (Danger→Danger):", cm[1][1])
    
    # Store predictions for ROC curve analysis (flatten for compatibility)
    predictions = predictions.flatten() if len(predictions.shape) > 1 else predictions
else:
    print("No test data available")


Preparing test data...

Classification Report
              precision    recall  f1-score   support

        Safe       0.75      0.75      0.75        20
      Danger       0.75      0.75      0.75        20

    accuracy                           0.75        40
   macro avg       0.75      0.75      0.75        40
weighted avg       0.75      0.75      0.75        40


Confusion Matrix
[[15  5]
 [ 5 15]]

True Negatives (Safe→Safe): 15
False Positives (Safe→Danger): 5
False Negatives (Danger→Safe): 5
True Positives (Danger→Danger): 15


## Save and Use Model

The model is automatically saved during training. You can also manually save it here.


In [15]:
# The model is already saved by train_model(), but you can save again with custom metadata
metadata = {
    'training_completed': True,
    'epochs': EPOCHS,
    'final_accuracy': float(training_history.history['accuracy'][-1]),
    'final_val_accuracy': float(training_history.history['val_accuracy'][-1]),
}

save_model(trained_model, metadata=metadata)
print(f"Model saved to: {MODEL_PATH}")
print(f"Metadata saved to: {METADATA_PATH}")

# Verify model can be loaded
print("\nVerifying model can be loaded...")
loaded_model = load_model()
if loaded_model:
    print("✓ Model loaded successfully!")
    print(f"Model parameters: {loaded_model.count_params():,}")
else:
    print("✗ Failed to load model")




Model saved to: backend\models\sentinel_model.h5
Metadata saved to: backend\models\model_metadata.json

Verifying model can be loaded...




✓ Model loaded successfully!
Model parameters: 2,430,273
