In [None]:
%pip install -r ../requirements.txt

In [2]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import librosa
import pandas as pd
from sklearn.model_selection import train_test_split
from scipy import signal
import matplotlib.pyplot as plt
from qbstyles import mpl_style

mpl_style()

# Constants
SPEED_OF_SOUND = 343.0  # m/s
MIC_POSITIONS = np.array([  # Hexagonal array (6 mics, 3D coordinates)
    [0.0, 0.0, 0.0],      # Mic 1 (center)
    [1.5, 0.0, 0.0],      # Mic 2 (right)
    [0.75, 1.299, 0.0],   # Mic 3 (top-right)
    [-0.75, 1.299, 0.0],  # Mic 4 (top-left)
    [-1.5, 0.0, 0.0],     # Mic 5 (left)
    [-0.75, -1.299, 0.0]  # Mic 6 (bottom-left)
]).T  # Shape: (3, 6)

DATA_DIR = "../data/simulations/"

#### Data loading & pre-processing

In [3]:
def load_waveforms(sim_dir, max_length=44100):
    """Load and pad/truncate 6-mic waveforms to max_length."""
    recordings = []
    for mic in range(1, 7):
        audio, _ = librosa.load(f"{sim_dir}/mic_{mic}_recording.wav", sr=None)
        if len(audio) > max_length:
            audio = audio[:max_length]
        else:
            audio = np.pad(audio, (0, max_length - len(audio)))
        recordings.append(audio)
    return np.stack(recordings, axis=1)  # Shape: (max_length, 6)

# Load all simulations
labels = pd.read_csv(f"{DATA_DIR}/labels.csv")
X = np.array([load_waveforms(f"{DATA_DIR}/gunshot_{i}") for i in range(len(labels))])
y = labels[["distance", "azimuth", "elevation"]].values

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")

Train shape: (80, 44100, 6), Test shape: (20, 44100, 6)


#### Physics Informed Loss function

In [4]:
def physics_loss(y_true, y_pred):
    """Penalize deviations from TDoA physics."""
    # Predicted polar coordinates
    distance, azimuth, elevation = y_pred[:, 0], y_pred[:, 1], y_pred[:, 2]
    
    # Convert to Cartesian (relative to Mic1)
    x = distance * np.cos(azimuth) * np.cos(elevation)
    y = distance * np.sin(azimuth) * np.cos(elevation)
    z = distance * np.sin(elevation)
    source_pos = tf.stack([x, y, z], axis=1)  # Shape: (batch, 3)
    
    # Calculate expected TDoA (mic1 as reference)
    mic1_pos = tf.constant(MIC_POSITIONS[:, 0], dtype=tf.float32)
    mic_positions = tf.constant(MIC_POSITIONS, dtype=tf.float32)  # (3, 6)
    distances = tf.norm(mic_positions - tf.expand_dims(source_pos, 2), axis=1)  # (batch, 6)
    tdoa_pred = (distances - tf.expand_dims(distances[:, 0], 1)) / SPEED_OF_SOUND  # (batch, 6)
    
    # Ground truth TDoA (from waveforms)
    tdoa_true = tf.py_function(
        lambda x: np.array([[
            np.argmax(signal.correlate(x[i, :, 0], x[i, :, j])) - len(x[i, :, 0])
            for j in range(6)] for i in range(x.shape[0])]) / SAMPLE_RATE,
        [X], tf.float32
    )
    
    return tf.reduce_mean((tdoa_true[:, 1:] - tdoa_pred[:, 1:])**2)  # Skip mic1

#### PINN model architecture

In [6]:
def build_pinn(input_shape):
    """Waveform-based PINN for gunshot localization."""
    model = models.Sequential([
        layers.Input(shape=input_shape),  # (None, 6)
        layers.Conv1D(64, 15, activation='relu'),
        layers.MaxPooling1D(4),
        layers.Conv1D(128, 7, activation='relu'),
        layers.GlobalAveragePooling1D(),
        layers.Dense(128, activation='relu'),
        layers.Dense(3)  # distance, azimuth, elevation
    ])
    return model

model = build_pinn((X_train.shape[1], 6))
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss='mse',
    metrics=['mae']
)
model.summary()

#### Custom training loop

In [None]:
class PINNTrainer(tf.keras.Model):
    def train_step(self, data):
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            mse_loss = self.compiled_loss(y, y_pred)
            phys_loss = physics_loss(y, y_pred)
            total_loss = mse_loss + 0.1 * phys_loss  # Weighted sum
        grads = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        return {"loss": total_loss, "mse": mse_loss, "physics_loss": phys_loss}

trainer = PINNTrainer(model)
history = trainer.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=32
)

#### Evaluation metrics

In [None]:
def evaluate_model(model, X_test, y_test):
    # Predictions
    y_pred = model.predict(X_test)
    
    # Distance error (meters)
    distance_error = np.abs(y_pred[:, 0] - y_test[:, 0])
    
    # Angular error (degrees)
    azimuth_error = np.degrees(np.abs(y_pred[:, 1] - y_test[:, 1]))
    elevation_error = np.degrees(np.abs(y_pred[:, 2] - y_test[:, 2]))
    
    # 3D position error (meters)
    def polar_to_cartesian(r, theta, phi):
        x = r * np.cos(theta) * np.cos(phi)
        y = r * np.sin(theta) * np.cos(phi)
        z = r * np.sin(phi)
        return np.stack([x, y, z], axis=1)
    
    pos_true = polar_to_cartesian(y_test[:, 0], y_test[:, 1], y_test[:, 2])
    pos_pred = polar_to_cartesian(y_pred[:, 0], y_pred[:, 1], y_pred[:, 2])
    position_error = np.linalg.norm(pos_true - pos_pred, axis=1)
    
    print(f"Mean Distance Error: {np.mean(distance_error):.2f} m")
    print(f"Mean Azimuth Error: {np.mean(azimuth_error):.2f}°")
    print(f"Mean Elevation Error: {np.mean(elevation_error):.2f}°")
    print(f"Mean 3D Position Error: {np.mean(position_error):.2f} m")

evaluate_model(model, X_test, y_test)

In [None]:
# Plot training history
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['mse'], label='Train MSE')
plt.plot(history.history['val_mse'], label='Val MSE')
plt.xlabel('Epoch'); plt.ylabel('MSE'); plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['physics_loss'], label='Physics Loss')
plt.xlabel('Epoch'); plt.ylabel('Loss'); plt.legend()