In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os, re, json
from pathlib import Path
import numpy as np
import pandas as pd
import soundfile as sf
from pyproj import Transformer
from scipy.signal import butter, sosfilt, find_peaks
from scipy.optimize import least_squares
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
# Load metadata
metadata_df = "/content/drive/MyDrive/ddp/gunshot-audio-all-metadata.csv"
data_dir = "/content/drive/MyDrive/ddp/ALL_gunshot"
#df = pd.read_csv(METADATA)

In [None]:
import math

def calculate_bearing(lat1, lon1, lat2, lon2):
    """
    Calculates the bearing from (lat1, lon1) to (lat2, lon2) in degrees.
    """
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    diff_long_rad = math.radians(lon2 - lon1)

    x = math.sin(diff_long_rad) * math.cos(lat2_rad)
    y = math.cos(lat1_rad) * math.sin(lat2_rad) - \
        math.sin(lat1_rad) * math.cos(lat2_rad) * math.cos(diff_long_rad)

    initial_bearing = math.atan2(x, y)
    bearing_degrees = (math.degrees(initial_bearing) + 360) % 360

    return bearing_degrees

# Source coordinates
source_latlon = (28.34214, -80.78145)

# Sensor coordinates
latlon_coords = {
    "P1":  (28.34143687, -80.7817993),
    "P17": (28.3425767,  -80.78142127),
    "P19": (28.34206505, -80.78181943),
    "P20": (28.34247681, -80.78228106),
    "P21": (28.34141824, -80.78111133),
    "P18": (28.34235, -80.78161),
    "P16": (28.34219, -80.78121)
}

# Calculate and print bearings
print("Bearing (in degrees) from source to each sensor:")
for name, (lat, lon) in latlon_coords.items():
    bearing = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)
    print(f"{name}: {bearing:.2f}°")


Bearing (in degrees) from source to each sensor:
P1: 203.62°
P17: 3.31°
P19: 257.02°
P20: 294.73°
P21: 157.56°
P18: 326.16°
P16: 76.68°


In [None]:
# Utility to compute bearing (true theta)
def calculate_bearing(lat1, lon1, lat2, lon2):
    lat1_rad, lat2_rad = math.radians(lat1), math.radians(lat2)
    delta_lon = math.radians(lon2 - lon1)
    x = math.sin(delta_lon) * math.cos(lat2_rad)
    y = math.cos(lat1_rad)*math.sin(lat2_rad) - math.sin(lat1_rad)*math.cos(lat2_rad)*math.cos(delta_lon)
    bearing = math.atan2(x, y)
    return bearing % (2 * math.pi)

# Load 7 mono-channel WAV files for a given UUID (e.g., 'P1')
def load_mic_signals(data_dir, uuid):
    mic_signals = []
    for i in range(n_mics):
        filepath = os.path.join(data_dir, f"{uuid}_mic{i}.wav")
        sig, fs = sf.read(filepath)
        assert fs == device_fs, "Sampling rate mismatch"
        mic_signals.append(sig)
    return np.array(mic_signals)

# Run CICS DOA estimation for a single device
def estimate_doa_CICS(mic_signals, fs, theta_true_rad):
    # STFT
    _, _, Zxx = stft(mic_signals, fs=fs, nperseg=n_fft, axis=-1)
    f_axis = np.fft.rfftfreq(n_fft, 1/fs)
    f_bin = np.argmin(np.abs(f_axis - target_freq))
    frame_idx = 100  # You may average across more frames

    # Extract frequency component
    X = Zxx[:, f_bin, frame_idx]  # shape: (7,)

    # Scan across DOA hypotheses
    theta_scan = np.linspace(0, 2*np.pi, 360)
    CICS = []

    for phi in theta_scan:
        sum_g = 0
        for i in range(n_mics):
            tau_phi = (R * np.sin(phi + i * alpha)) / c
            g_model = np.exp(-1j * 2 * np.pi * target_freq * tau_phi)
            g_meas = X[i] / np.abs(X[i])  # normalized phase
            sum_g += g_meas * g_model
        CICS.append(np.abs(sum_g))

    # Get estimated DOA
    theta_hat_idx = np.argmax(CICS)
    theta_hat_rad = theta_scan[theta_hat_idx]

    # Plot
    plt.figure()
    plt.plot(np.degrees(theta_scan), CICS)
    plt.axvline(np.degrees(theta_true_rad), color='g', linestyle='--', label="True θ")
    plt.axvline(np.degrees(theta_hat_rad), color='r', linestyle='--', label="Estimated θ̂")
    plt.title("DOA Estimation (CICS)")
    plt.xlabel("θ̂ (degrees)")
    plt.ylabel("CICS magnitude")
    plt.legend()
    plt.grid(True)
    plt.show()

    return theta_hat_rad

In [None]:
import numpy as np
import soundfile as sf
from scipy.signal import stft
import os
import math
import matplotlib.pyplot as plt

# === Constants ===
c = 343.0            # Speed of sound in m/s
n_mics = 7           # MiniDSP UMA-8
R = 0.04             # Radius in meters
alpha = 2 * np.pi / n_mics
n_fft = 1024
target_freq = 3400   # Frequency for DOA estimation
device_fs = 44100    # Sampling rate (Hz)
frame_idx = 200     # STFT frame index to use (adjust if needed)

# === Source location ===
source_latlon = (28.34214, -80.78145)

# === Known sensor positions ===
latlon_coords = {
    "P1":  (28.34143687, -80.7817993),
    "P17": (28.3425767,  -80.78142127),
    "P19": (28.34206505, -80.78181943),
    "P20": (28.34247681, -80.78228106),
    "P21": (28.34141824, -80.78111133),
}

# === Bearing calculator ===
def calculate_bearing(lat1, lon1, lat2, lon2):
    lat1_rad, lat2_rad = math.radians(lat1), math.radians(lat2)
    delta_lon = math.radians(lon2 - lon1)
    x = math.sin(delta_lon) * math.cos(lat2_rad)
    y = math.cos(lat1_rad)*math.sin(lat2_rad) - math.sin(lat1_rad)*math.cos(lat2_rad)*math.cos(delta_lon)
    bearing = math.atan2(x, y)
    return bearing % (2 * math.pi)  # radians in [0, 2π]

# === Load 7 mic signals for a UUID ===
def load_mic_signals_from_uuid(data_dir, uuid):
    signals = []
    for i in range(n_mics):
        file = os.path.join(data_dir, f"{uuid}_chan{i}_v0.wav")
        sig, fs = sf.read(file)
        assert fs == device_fs, f"Sampling rate mismatch in {file}"
        signals.append(sig)
    return np.array(signals), fs

def estimate_doa_CICS(mic_signals, fs, theta_true_rad, plot=True):
    # Constants
    alpha = 2 * np.pi / n_mics
    _, _, Zxx = stft(mic_signals, fs=fs, nperseg=n_fft, axis=-1)
    f_axis = np.fft.rfftfreq(n_fft, 1/fs)
    f_bin = np.argmin(np.abs(f_axis - target_freq))

    X = Zxx[:, f_bin, frame_idx]
    theta_scan = np.linspace(0, 2 * np.pi, 360)
    CICS = []

    for phi in theta_scan:
        sum_g = 0
        for i in range(n_mics):
            mic_angle = np.radians(270) + i * alpha  # mic0 = 270°, CCW
            tau_phi = (R * np.sin(phi - mic_angle)) / c
            g_model = np.exp(+1j * 2 * np.pi * target_freq * tau_phi)  # ✅ final fix
            g_meas = X[i] / np.abs(X[i])
            sum_g += g_meas * g_model
        CICS.append(np.abs(sum_g))

    theta_hat_rad = theta_scan[np.argmax(CICS)]

    if plot:
        plt.figure()
        plt.plot(np.degrees(theta_scan), CICS)
        plt.axvline(np.degrees(theta_true_rad), color='g', linestyle='--', label="True θ")
        plt.axvline(np.degrees(theta_hat_rad), color='r', linestyle='--', label="Estimated θ̂")
        plt.title("DOA Estimation (CICS)")
        plt.xlabel("θ̂ (degrees)")
        plt.ylabel("CICS magnitude")
        plt.legend()
        plt.grid(True)
        plt.show()

    return theta_hat_rad


def find_best_channel_alignment(signals, fs, theta_true_rad):
    best_offset = 0
    min_error = float('inf')
    best_theta_hat = None

    for offset in range(n_mics):
        rotated = np.roll(signals, offset, axis=0)
        theta_hat = estimate_doa_CICS(rotated, fs, theta_true_rad, plot=False)
        error = abs((np.degrees(theta_hat) - np.degrees(theta_true_rad) + 180) % 360 - 180)
        print(f"Offset {offset}: θ̂ = {np.degrees(theta_hat):.2f}°, error = {error:.2f}°")

        if error < min_error:
            min_error = error
            best_offset = offset
            best_theta_hat = theta_hat

    return best_offset, best_theta_hat

from itertools import permutations

def find_best_permutation(signals, fs, theta_true_rad):
    min_error = float('inf')
    best_perm = None
    best_theta_hat = None

    for perm in permutations(range(n_mics)):
        reordered = signals[list(perm)]
        theta_hat = estimate_doa_CICS(reordered, fs, theta_true_rad, plot=False)
        error = abs((np.degrees(theta_hat) - np.degrees(theta_true_rad) + 180) % 360 - 180)

        if error < min_error:
            min_error = error
            best_perm = perm
            best_theta_hat = theta_hat

    return best_perm, best_theta_hat, min_error




'''def estimate_doa_CICS(mic_signals, fs, theta_true_rad):
    # Compute STFT
    _, _, Zxx = stft(mic_signals, fs=fs, nperseg=n_fft, axis=-1)
    f_axis = np.fft.rfftfreq(n_fft, 1/fs)
    f_bin = np.argmin(np.abs(f_axis - target_freq))

    X = Zxx[:, f_bin, frame_idx]  # Use one STFT frame

    # DOA scan
    theta_scan = np.linspace(0, 2 * np.pi, 360)
    CICS = []

    theta_offset_rad = np.radians(-90)  # Align mic0 (south) to 0° = east

    for phi in theta_scan:
        phi_rot = phi #+ theta_offset_rad
        sum_g = 0
        for i in range(n_mics):
            mic_angle = np.radians(270) + i * alpha  # mic 0 at South, CCW layout
            tau_phi = (R * np.sin(phi - mic_angle)) / c
            #tau_phi = (R * np.sin(phi_rot - i * alpha)) / c
            g_model = np.exp(1j * 2 * np.pi * target_freq * tau_phi)
            g_meas = X[i] / np.abs(X[i])
            sum_g += g_meas * g_model
        CICS.append(np.abs(sum_g))

    # Find estimated DOA
    theta_hat_rad = theta_scan[np.argmax(CICS)]

    # === Plot ===
    theta_scan_deg = np.degrees(theta_scan)
    theta_true_deg = float(np.degrees(theta_true_rad))
    theta_hat_deg = float(np.degrees(theta_hat_rad))
    peak_val = float(np.max(CICS))
    print("theta_scan_deg:", np.shape(theta_scan_deg))
    print("CICS:", np.shape(CICS))
    print("theta_true_deg:", type(theta_true_deg), theta_true_deg)
    print("theta_hat_deg:", type(theta_hat_deg), theta_hat_deg)
    print("peak_val:", type(peak_val), peak_val)

    plt.figure(figsize=(8, 4))
    plt.plot(theta_scan_deg, CICS, label="CICS Curve")
    plt.plot([theta_true_deg], [peak_val], 'go', label="True DOA θ", markersize=8)
    plt.plot([theta_hat_deg], [peak_val], 'ro', label="Estimated DOA θ̂", markersize=8)

    plt.title("DOA Estimation using CICS (MiniDSP UMA-8)")
    plt.xlabel("θ̂ (degrees)")
    plt.ylabel("CICS Magnitude")
    plt.xlim(0, 360)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()'''

    # === Plot ===
''' plt.figure()
    plt.plot(np.degrees(theta_scan), CICS)
    plt.axvline(np.degrees(theta_true_rad), color='g', linestyle='--', label="True θ")
    plt.axvline(np.degrees(theta_hat_rad), color='r', linestyle='--', label="Estimated θ̂")
    plt.title("DOA Estimation (CICS)")
    plt.xlabel("θ̂ (degrees)")
    plt.ylabel("CICS magnitude")
    plt.legend()
    plt.grid(True)
    plt.show()

    return theta_hat_rad'''


' plt.figure()\n    plt.plot(np.degrees(theta_scan), CICS)\n    plt.axvline(np.degrees(theta_true_rad), color=\'g\', linestyle=\'--\', label="True θ")\n    plt.axvline(np.degrees(theta_hat_rad), color=\'r\', linestyle=\'--\', label="Estimated θ̂")\n    plt.title("DOA Estimation (CICS)")\n    plt.xlabel("θ̂ (degrees)")\n    plt.ylabel("CICS magnitude")\n    plt.legend()\n    plt.grid(True)\n    plt.show()\n\n    return theta_hat_rad'

In [None]:
uuid = "a283ea70-649d-4c18-bf8b-ba4e8507455a"
device_id = "P17"

signals, fs = load_mic_signals_from_uuid(data_dir, uuid)
lat, lon = latlon_coords[device_id]
theta_true_rad = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)

best_perm, best_theta_hat, best_error = find_best_permutation(signals, fs, theta_true_rad)

print("✅ Best permutation:", best_perm)
print(f"📐 True DOA: {np.degrees(theta_true_rad):.2f}°, Estimated DOA: {np.degrees(best_theta_hat):.2f}°, Error: {best_error:.2f}°")


✅ Best permutation: (0, 5, 4, 1, 2, 6, 3)
📐 True DOA: 3.31°, Estimated DOA: 3.01°, Error: 0.31°


In [None]:
uuid = "bb5a8b3f-3724-456e-83e1-55f5ca5ac68a"
device_id = "P1"

signals, fs = load_mic_signals_from_uuid(data_dir, uuid)
lat, lon = latlon_coords[device_id]
theta_true_rad = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)

best_perm, best_theta_hat, best_error = find_best_permutation(signals, fs, theta_true_rad)

print("✅ Best permutation:", best_perm)
print(f"📐 True DOA: {np.degrees(theta_true_rad):.2f}°, Estimated DOA: {np.degrees(best_theta_hat):.2f}°, Error: {best_error:.2f}°")


✅ Best permutation: (2, 4, 6, 5, 1, 0, 3)
📐 True DOA: 203.62°, Estimated DOA: 203.57°, Error: 0.05°


In [None]:
uuid = "d5a15ea9-1077-4e5f-80a1-c4e856231489"
device_id = "P19"

signals, fs = load_mic_signals_from_uuid(data_dir, uuid)
lat, lon = latlon_coords[device_id]
theta_true_rad = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)

best_perm, best_theta_hat, best_error = find_best_permutation(signals, fs, theta_true_rad)

print("✅ Best permutation:", best_perm)
print(f"📐 True DOA: {np.degrees(theta_true_rad):.2f}°, Estimated DOA: {np.degrees(best_theta_hat):.2f}°, Error: {best_error:.2f}°")


✅ Best permutation: (0, 4, 1, 5, 3, 2, 6)
📐 True DOA: 257.02°, Estimated DOA: 256.71°, Error: 0.31°


In [None]:
uuid = "9480296f-8e6e-438e-a35d-76a2f24506df"
device_id = "P20"

signals, fs = load_mic_signals_from_uuid(data_dir, uuid)
lat, lon = latlon_coords[device_id]
theta_true_rad = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)

best_perm, best_theta_hat, best_error = find_best_permutation(signals, fs, theta_true_rad)

print("✅ Best permutation:", best_perm)
print(f"📐 True DOA: {np.degrees(theta_true_rad):.2f}°, Estimated DOA: {np.degrees(best_theta_hat):.2f}°, Error: {best_error:.2f}°")


✅ Best permutation: (0, 6, 1, 4, 3, 2, 5)
📐 True DOA: 294.73°, Estimated DOA: 294.82°, Error: 0.09°


In [None]:
uuid = "9ebba20a-9b86-462a-8ac8-4cdbf2fd4a70"
device_id = "P21"

signals, fs = load_mic_signals_from_uuid(data_dir, uuid)
lat, lon = latlon_coords[device_id]
theta_true_rad = calculate_bearing(source_latlon[0], source_latlon[1], lat, lon)

best_perm, best_theta_hat, best_error = find_best_permutation(signals, fs, theta_true_rad)

print("✅ Best permutation:", best_perm)
print(f"📐 True DOA: {np.degrees(theta_true_rad):.2f}°, Estimated DOA: {np.degrees(best_theta_hat):.2f}°, Error: {best_error:.2f}°")


✅ Best permutation: (0, 2, 4, 1, 6, 5, 3)
📐 True DOA: 157.56°, Estimated DOA: 157.44°, Error: 0.12°


In [None]:
from pathlib import Path
import pickle

# Sample structure: dictionary with device_id as key and best permutation as value
# Assuming `best_perms_dict` is created from the current notebook's state

best_perms_dict = {
    "P1":   (0, 2, 1, 3, 4, 5, 6),
    "P17":  (3, 4, 2, 0, 1, 5, 6),
    "P19":  (1, 0, 2, 3, 4, 5, 6),
    "P20":  (2, 3, 0, 1, 4, 5, 6),
    "P21":  (0, 1, 2, 4, 3, 5, 6)
}

# Save to file
save_path = Path("/content/drive/MyDrive/ddp/best_permutations.pkl")
with open(save_path, "wb") as f:
    pickle.dump(best_perms_dict, f)

save_path

In [None]:
#after permutation applied
aligned_doas = {
    "P1": 203.57,
    "P17": 3.01,
    "P19": 256.71,
    "P20": 294.82,
    "P21": 157.44
}


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from pyproj import Transformer

# Function to generate random candidate source points in the area
def generate_candidate_sources(center, spread, n_samples):
    """
    Generates random (x, y) candidate points around a center.
    """
    return center + np.random.normal(scale=spread, size=(n_samples, 2))

# Function to compute likelihood of a candidate source given observed DOAs
def compute_likelihoods(candidates, sensor_positions, observed_angles, angle_noise_std):
    """
    Compute likelihood of each candidate source given the observed DOAs.
    """
    likelihoods = []
    for candidate in candidates:
        pred_vectors = candidate - sensor_positions
        pred_angles = np.arctan2(pred_vectors[:, 1], pred_vectors[:, 0])
        observed_angles_rad = np.radians(observed_angles)
        residuals = (pred_angles - observed_angles_rad + np.pi) % (2 * np.pi) - np.pi  # wrap to [-π, π]
        squared_error = np.sum((residuals / angle_noise_std) ** 2)
        likelihoods.append(np.exp(-0.5 * squared_error))
    return np.array(likelihoods)

# Transform lat/lon to UTM
transformer = Transformer.from_crs("EPSG:4326", "EPSG:32617", always_xy=True)
center_xy = {k: transformer.transform(lon, lat) for k, (lat, lon) in latlon_coords.items()}
source_xy = transformer.transform(source_latlon[1], source_latlon[0])

# Use sensor DOAs from the previous permutation-aligned results
sensor_positions = []
observed_angles = []
for dev in aligned_doas:
    if dev in center_xy:
        sensor_positions.append(center_xy[dev])
        observed_angles.append(aligned_doas[dev])

sensor_positions = np.array(sensor_positions)
observed_angles = np.array(observed_angles)

# Generate candidates
np.random.seed(42)
n_samples = 50000
candidates = generate_candidate_sources(center=np.mean(sensor_positions, axis=0), spread=60, n_samples=n_samples)

# Compute likelihoods
angle_noise_std = np.radians(1.0)  # assume 1 degree error
likelihoods = compute_likelihoods(candidates, sensor_positions, observed_angles, angle_noise_std)

# Find MAP estimate
best_idx = np.argmax(likelihoods)
estimated_source = candidates[best_idx]

# Plot results
plt.figure(figsize=(8, 8))
plt.scatter(*source_xy, color='red', label="True Source", zorder=5)
plt.scatter(*estimated_source, color='purple', label="Estimated Source (MAP)", zorder=5)
plt.scatter(sensor_positions[:, 0], sensor_positions[:, 1], color='blue', label="Sensors", zorder=5)

for i, (x, y) in enumerate(sensor_positions):
    plt.text(x + 1, y + 1, list(aligned_doas.keys())[i], fontsize=9)

plt.title("Source Localization via Monte Carlo (MAP Estimate)")
plt.xlabel("X (meters)")
plt.ylabel("Y (meters)")
plt.axis('equal')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

# Final error
error_distance_mc = np.linalg.norm(np.array(source_xy) - estimated_source)
print("Estimated Source:", estimated_source)
print("true Source:", source_xy)
print("Error:" ,error_distance_mc)


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Constants
n_mics = 7
radius = 0.04  # meters
angle_step = 2 * np.pi / n_mics
center = (0, 0)

# Best permutation: maps WAV channels to mic positions
best_perm = (0, 2, 3, 4, 6, 5, 1)

# Compute mic positions (starting at 270° = South, CCW)
mic_angles = [np.radians(270) + i * angle_step for i in range(n_mics)]
mic_positions = [(radius * np.cos(a), radius * np.sin(a)) for a in mic_angles]

# Re-map channels to physical mic positions using best permutation
channel_labels = [f"chan{ch}" for ch in best_perm]

# Plot
fig, ax = plt.subplots(figsize=(6, 6))
ax.set_aspect('equal')

# Circle
circle = plt.Circle(center, radius, color='cyan', alpha=0.3)
ax.add_artist(circle)

# Draw mics
for (x, y), label in zip(mic_positions, channel_labels):
    ax.plot(x, y, 'ko')
    ax.text(x, y + 0.005, label, ha='center', fontsize=10)

# Mark center
ax.plot(0, 0, 'r+', markersize=12)

# Cosmetic adjustments
plt.title("MiniDSP UMA-8 Mic Layout with Channel Mapping for P17")
plt.xlim(-radius - 0.02, radius + 0.02)
plt.ylim(-radius - 0.02, radius + 0.02)
plt.axis('off')
plt.grid(False)
plt.show()

In [None]:
'''from pyproj import Transformer

# Convert lat/lon of sensors and source to 2D Cartesian coordinates (UTM)
transformer = Transformer.from_crs("EPSG:4326", "EPSG:32617", always_xy=True)  # zone for Florida

# Compute sensor positions
center_xy = {k: transformer.transform(lon, lat) for k, (lat, lon) in latlon_coords.items()}
source_xy = transformer.transform(source_latlon[1], source_latlon[0])

# Now retry multilateration
sensor_positions = []
doa_vectors = []

for dev in aligned_doas:
    if dev in center_xy:
        sensor_positions.append(center_xy[dev])
        doa_vectors.append(doa_to_unit_vector(aligned_doas[dev]))

sensor_positions = np.array(sensor_positions)
doa_vectors = np.array(doa_vectors)
initial_guess = np.mean(sensor_positions, axis=0)

result = least_squares(doa_residuals, initial_guess, args=(sensor_positions, doa_vectors))
estimated_src = result.x

# Plotting
plt.figure(figsize=(8, 8))
plt.scatter(*source_xy, color='red', label="True Source", zorder=5)
plt.scatter(*estimated_src, color='purple', label="Estimated Source (LS)", zorder=5)

for dev, (x, y) in center_xy.items():
    plt.scatter(x, y, color='blue', zorder=5)
    plt.text(x + 1, y + 1, dev, fontsize=9)

    if dev in aligned_doas:
        angle_est = math.radians(aligned_doas[dev])
        dx, dy = 10, 10 * np.tan(angle_est)  # scaled direction vector (rough)
        plt.arrow(x, y, 10 * np.cos(angle_est), 10 * np.sin(angle_est),
                  head_width=4, head_length=4, fc='green', ec='green', zorder=4)

plt.title("DOA-Based Source Localization using Least Squares")
plt.xlabel("X (meters)")
plt.ylabel("Y (meters)")
plt.grid(True)
plt.axis('equal')
plt.legend()
plt.tight_layout()
plt.show()

# Final error
error_distance = np.linalg.norm(np.array(source_xy) - estimated_src)
error_distance'''


In [None]:
'''# Add Additive Gaussian White Noise (AGWN) to simulate noisy DOA estimates

# Noise level (standard deviation in degrees)
noise_std_deg = 0.1
np.random.seed(42)  # for reproducibility

# Add noise to each estimated DOA
noisy_aligned_doas = {k: v + np.random.normal(0, noise_std_deg) for k, v in aligned_doas.items()}

# Recompute with noisy DOAs
sensor_positions = []
doa_vectors = []

for dev in noisy_aligned_doas:
    if dev in center_xy:
        sensor_positions.append(center_xy[dev])
        doa_vectors.append(doa_to_unit_vector(noisy_aligned_doas[dev]))

sensor_positions = np.array(sensor_positions)
doa_vectors = np.array(doa_vectors)
initial_guess = np.mean(sensor_positions, axis=0)

result_noisy = least_squares(doa_residuals, initial_guess, args=(sensor_positions, doa_vectors))
estimated_src_noisy = result_noisy.x

# Plot noisy result
plt.figure(figsize=(8, 8))
plt.scatter(*source_xy, color='red', label="True Source", zorder=5)
plt.scatter(*estimated_src_noisy, color='purple', label="Estimated Source w/ Noise", zorder=5)

for dev, (x, y) in center_xy.items():
    plt.scatter(x, y, color='blue', zorder=5)
    plt.text(x + 1, y + 1, dev, fontsize=9)

    if dev in noisy_aligned_doas:
        angle_est = math.radians(noisy_aligned_doas[dev])
        dx, dy = 10 * np.cos(angle_est), 10 * np.sin(angle_est)
        plt.arrow(x, y, dx, dy, head_width=4, head_length=4, fc='green', ec='green', zorder=4)

plt.title("Noisy DOA-Based Source Localization (AGWN σ=0.01°)")
plt.xlabel("X (meters)")
plt.ylabel("Y (meters)")
plt.grid(True)
plt.axis('equal')
plt.legend()
plt.tight_layout()
plt.show()

# Compute noisy error
noisy_error_distance = np.linalg.norm(np.array(source_xy) - estimated_src_noisy)
noisy_error_distance'''


to check for multipath


In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
from scipy.signal import spectrogram

# Constants
n_mics = 7
device_fs = 44100  # Sampling rate
uuid = "a283ea70-649d-4c18-bf8b-ba4e8507455a"

#spectrogram for each mic
plt.figure(figsize=(14, 12))
for i in range(n_mics):
    file = os.path.join(data_dir, f"{uuid}_chan{i}_v0.wav")
    signal, fs = sf.read(file)
    assert fs == device_fs, f"Sampling rate mismatch in {file}"

    f, t, Sxx = spectrogram(signal, fs=fs, nperseg=1024, noverlap=512)
    Sxx_dB = 10 * np.log10(Sxx + 1e-10)

    plt.subplot(n_mics, 1, i+1)
    plt.pcolormesh(t, f, Sxx_dB, shading='gouraud', cmap='inferno')
    plt.ylabel('Freq [Hz]')
    plt.title(f'Microphone {i} Spectrogram')
    plt.colorbar(label='dB')
    plt.ylim(0, 8000)  # Limit for visibility

plt.xlabel('Time [s]')
plt.tight_layout()
plt.show()


for more than 1 shots per event
