In [None]:
import sys
import os
sys.path.insert(0, os.path.abspath("../cli"))
from simulation.sonar import Sonar
from simulation.utils import EllipsisBottom, FlatBottom, positions_line
from simulation.sources import GaborSource
from simulation.plotting import plot_velocity, plot_snapshot_and_signal
import simulation.utils as utils
import numpy as np
import numpy.typing as npt
import matplotlib.pyplot as plt
import seaborn as sns
import math


def get_coord_line(num_points: int, point_distance: float, alpha: float):
    """
    Build the coordinates of equally spaced points on a line through the origin.

    Args:
        num_points: Number of points on the line.
        point_distance: Spacing between neighbouring points.
        alpha: Angle of the line in degrees; the first column scales with
            sin(alpha) and the second column with cos(alpha).

    Returns:
        Array of shape (num_points, 2) whose points are centred on (0, 0).
    """
    # Signed offsets along the line, symmetric around zero.
    half_span = (num_points - 1) * point_distance / 2.0
    offsets = np.linspace(-half_span, half_span, num_points)

    angle_rad = np.deg2rad(alpha)
    first_axis = offsets * np.sin(angle_rad)
    second_axis = offsets * np.cos(angle_rad)
    return np.vstack((first_axis, second_axis)).T

Define the domain parameters

In [None]:
# Shared configuration for the experiments below.
# domain_size[0] is used as the horizontal extent when centring the source and
# receiver arrays in the setup cell.
domain_size = (3, 6) # (m, m)
# Passed to Sonar(...) unchanged. Note that iters2dist() further down has its
# own default of 1500 (m/s) and does not read this variable.
v_env = 1.5 # km/s

# ns is also used as the number of receiver points ("npoint" in rec_args).
ns = 128 # number of sources
f0 = 50 # kHz
source_cy = .5 # center depth of sources (m)
# NOTE(review): rec_cy is not referenced anywhere in the visible cells; the
# receiver line is centred with a literal depth instead - confirm intent.
rec_cy = 2 # center depth of receivers (m)

spatial_dist = 0.002 # m
# Spacing of both the source array and the receiver line.
source_distance = 0.002 # m

# Receiver-line angles (deg); one Sonar instance is created per entry.
angles = [45, 60, 75, 90, 105, 120, 135] 

Create sonars for each angle

In [None]:
# One identically configured Sonar per receiver angle; only the receiver
# geometry assigned in rec_args below differs between them.
# NOTE(review): tn=4 is presumably the simulated duration and nbl=100 the
# boundary-layer width - confirm units against simulation.sonar.Sonar.
sonars = {
    a: Sonar(
        domain_size,
        f0,
        v_env,
        FlatBottom(),
        dt=None,
        spatial_dist=spatial_dist,
        tn=4,
        nbl=100,
    )
    for a in angles
}

# Source array: ns points offset so that a span of ns * source_distance is
# centred horizontally in the domain, at depth source_cy.
# NOTE(review): presumably positions_line returns n (x, y) points running from
# x = 0 to stop_x at y = posy - confirm in simulation.utils.
src_coord = np.array(
    [(domain_size[0] - source_distance * ns) / 2, source_cy]
) + positions_line(stop_x=ns * source_distance, posy=0, n=ns)

# Source keyword arguments per angle; the source coordinates are the same for
# every angle, only the grid and time range come from the matching Sonar.
src_args = {
    a: {
    "name": "src",
    "grid": sonars[a].model.grid,
    "npoint": ns,
    "f0": f0,
    "time_range": sonars[a].time_range,
    "coordinates": src_coord,
    }  
    for a in angles 
}

# Receiver keyword arguments per angle: ns points spaced source_distance apart
# on a line rotated by the angle `a`, centred at (domain_size[0] / 2, 4).
# NOTE(review): the centre depth is the literal 4, not rec_cy (= 2) from the
# configuration cell - confirm which value is intended.
rec_args = {
    a: {
    "name": "rec",
    "grid": sonars[a].model.grid,
    "time_range": sonars[a].time_range,
    "npoint": ns,
    "coordinates": get_coord_line(ns, source_distance, a) + np.array([domain_size[0] / 2, 4]),
    }
    for a in angles
}

# Attach source and receiver (selected by type name) to each sonar, then
# finalize it. NOTE(review): finalize() presumably has to run after both
# set_* calls - keep this order unless the Sonar API says otherwise.
for a, s in sonars.items():
    s.set_source("GaborSource", src_args[a])
    s.set_receiver("Receiver", rec_args[a])
    s.finalize()

Run simulation for each angle

In [None]:
# Receiver data per angle, keyed like `sonars`.
recordings = {}

# The beam is always run with alpha = 90 here; the angle `a` printed in the
# progress message is the angle of the receiver line, not of the beam.
for a, s in sonars.items():
    print(f"Running simulation at {a} deg")
    s.run_beam(alpha = 90)
    recordings[a] = s.rec.data

In [None]:
def correlate(recording: npt.NDArray, ideal_signal: npt.NDArray, start_iter: int = 5000) -> int:
    """
    Correlate the recording with the ideal signal and return the index of the peak.

    Samples before ``start_iter`` are skipped, so a peak located there can
    never be reported. The returned index refers to the full recording, not
    to the truncated slice.

    Args:
        recording: The recording of a single receiver (1-D) to correlate with the ideal signal.
        ideal_signal: The ideal signal (1-D template).
        start_iter: The index to start the search from.

    Returns:
        The index of the correlation peak within ``recording``.

    Raises:
        ValueError: If ``start_iter`` is negative or not smaller than the
            number of samples in ``recording``.
    """
    num_samples = recording.shape[0]
    # Validate against the actual start index instead of a hard-coded 5000, so
    # short recordings work with a smaller start_iter and an out-of-range
    # start_iter fails with a clear message instead of an empty-slice error.
    if not 0 <= start_iter < num_samples:
        raise ValueError(
            f"start_iter ({start_iter}) must be in [0, {num_samples}) for a recording of {num_samples} samples"
        )

    # mode="same" keeps the output aligned with the sliced recording.
    xcorr = np.correlate(recording[start_iter:], ideal_signal, mode="same")
    return start_iter + int(xcorr.argmax())

def get_peaks(recording: npt.NDArray, ideal_signal: npt.NDArray) -> npt.NDArray[np.int_]:
    """
    Find the correlation peak index for every receiver column of a recording.

    Args:
        recording: 2-D array indexed as [time iteration, receiver].
        ideal_signal: The ideal signal each column is correlated with.

    Returns:
        Integer array with one peak index per receiver column.
    """
    num_receivers = recording.shape[1]
    peak_indices = [
        int(correlate(recording[:, col], ideal_signal))
        for col in range(num_receivers)
    ]
    return np.array(peak_indices, dtype=int)

# Peak index per receiver for every simulated angle, using the source's signal
# packet of the matching sonar as the correlation template.
peaks = {}
for a, r in recordings.items():
    peaks[a] = get_peaks(r, sonars[a].src.signal_packet)

In [None]:
def iters2dist(stop_iter: int, dt: float, v_env: float = 1500, start_iter: int = 0) -> float:
    """
    Translate a span of time iterations into the distance travelled.

    Args:
        stop_iter: The index of the time iteration at which the signal arrives.
        dt: The time step (s).
        v_env: The velocity of the environment (m/s).
        start_iter: The index when the signal is sent.

    Returns:
        The distance (m) covered between ``start_iter`` and ``stop_iter``.
    """
    elapsed_iters = stop_iter - start_iter
    elapsed_time = elapsed_iters * dt
    return elapsed_time * v_env

def peaks2alpha(
    peaks: npt.NDArray[np.int_],
    dt: float,
    ds: float,
    v_env: float = 1500,
    window: int = 10,
) -> float:
    """
    Compute the approximate direction of the incoming signal from the time
    steps at which the receivers detect their peak.

    For every receiver pair that lies ``window`` positions apart, the delay
    between the two peaks is converted into a path difference, and the angle is
    taken as ``arccos(path_difference / receiver_separation)``. Pairs whose
    ratio falls outside [-1, 1] yield NaN and are ignored in the mean.

    Args:
        peaks: The index of the time step at which each receiver detects its peak.
        dt: The time step in milliseconds (it is scaled by 1e-3 to seconds here).
        ds: The distance between neighbouring receivers (m).
        v_env: The velocity of the environment (m/s).
        window: Number of receiver positions between the two receivers of a pair.

    Returns:
        The approximate direction of the incoming signal (deg), averaged over
        all receiver pairs with a valid estimate.

    Raises:
        ValueError: If ``window`` is smaller than 1 or there are not more than
            ``window`` peaks, so that no receiver pair can be formed.
    """
    peaks = np.asarray(peaks)
    if window < 1:
        raise ValueError(f"window must be at least 1, got {window}")
    if peaks.shape[0] <= window:
        raise ValueError(
            f"need more than {window} peaks to form receiver pairs, got {peaks.shape[0]}"
        )

    # Delay (in iterations) between receiver i and receiver i + window.
    lag_iters = peaks[window:] - peaks[:-window]
    path_diff = lag_iters * (dt * 1e-3) * v_env

    # Out-of-range ratios are expected for noisy peaks; they become NaN and are
    # dropped by nanmean, so the invalid-value warning is silenced here.
    with np.errstate(invalid="ignore"):
        alphas = np.rad2deg(np.arccos(path_diff / (window * ds)))
    return float(np.nanmean(alphas))

def _pair_angles(peaks, start_idx, end_idx, dt, ds):
    """Angle (deg) from the peak delay between receiver pairs (start_idx[k], end_idx[k])."""
    path_diff = iters2dist(peaks[end_idx], dt=dt * 1e-3, start_iter=peaks[start_idx])
    # Ratios outside [-1, 1] have no valid angle; they become NaN on purpose.
    with np.errstate(invalid="ignore"):
        return np.rad2deg(np.arccos(path_diff / ((end_idx - start_idx) * ds)))


def plot_peaks_alpha(allpeaks, dt, ds, filename: str = "alpha.png") -> None:
    """
    Plot the detected peak indices and the angle estimates derived from them.

    The first panel shows the peak index per receiver for every entry of
    ``allpeaks``. Each following panel shows, for one entry, two per-receiver
    angle estimates: "neighbors" (delay to the next receiver) and "window10"
    (delay across a window reaching up to 5 receivers to either side, clipped
    at the ends of the array). The panel title lists the NaN-ignoring mean of
    both estimates.

    Args:
        allpeaks: Mapping from an angle label to the 1-D array of peak indices
            of the receivers.
        dt: The time step in milliseconds (it is scaled by 1e-3 to seconds here).
        ds: The distance between neighbouring receivers (m).
        filename: Path the figure is saved to before it is shown.
    """
    alphas = {}
    for alpha, peaks in allpeaks.items():
        peaks = np.asarray(peaks)
        last = peaks.shape[0] - 1
        idx = np.arange(last)
        alphas[alpha] = {
            "neighbors": _pair_angles(peaks, idx, idx + 1, dt, ds),
            "window10": _pair_angles(
                peaks, np.maximum(idx - 5, 0), np.minimum(idx + 5, last), dt, ds
            ),
        }

    # One slot for the raw peaks plus one per angle, laid out in two columns.
    # squeeze=False keeps `axes` 2-D even when the grid has a single row.
    n_rows = len(alphas) // 2 + 1
    _, axes = plt.subplots(n_rows, 2, figsize=(12, 12), squeeze=False)
    sns.lineplot(allpeaks, dashes=False, ax=axes[0, 0])
    for slot, (alpha, estimates) in enumerate(alphas.items(), start=1):
        ax = axes[divmod(slot, 2)]
        sns.lineplot(estimates, ax=ax)
        # nanmean keeps the summary meaningful when single pairs are invalid,
        # matching how peaks2alpha averages its estimates.
        ax.set_title(
            f"alpha = {alpha} deg {np.nanmean(estimates['neighbors']):.2f} deg {np.nanmean(estimates['window10']):.2f} deg"
        )
    plt.tight_layout()
    plt.savefig(filename)
    plt.show()

# The time step of the first sonar is used for every angle; all sonars in
# `sonars` were constructed with the same parameters.
# NOTE(review): critical_dt is presumably in milliseconds, since the helpers
# scale dt by 1e-3 before converting to distance - confirm.
plot_peaks_alpha(peaks, sonars[angles[0]].model.critical_dt, source_distance)

# Print the mean estimated angle next to the angle label of each run.
for alpha, p in peaks.items():
    print(
        f"alpha = {alpha} deg, mean = {peaks2alpha(p, sonars[angles[0]].model.critical_dt, source_distance):.2f} deg"
    )

Measure with actual echoes

In [None]:
# Second experiment: this cell rebinds domain_size, angles, sonars, src_coord,
# src_args and rec_args, replacing the values from the first experiment.
domain_size = (6, 3) # (m, m)
angles = [45, 60, 90, 105, 135] 

# One Sonar per beam angle, this time with an EllipsisBottom and without an
# explicit tn argument.
# NOTE(review): the meaning of the True argument of EllipsisBottom is not
# visible here - confirm in simulation.utils.
sonars = {
    a: Sonar(
        domain_size,
        f0,
        v_env,
        EllipsisBottom(True),
        dt=None,
        spatial_dist=spatial_dist,
        nbl=100,
    )
    for a in angles
}

# Source array centred horizontally in the new domain at depth source_cy.
src_coord = np.array(
    [(domain_size[0] - source_distance * ns) / 2, source_cy]
) + positions_line(stop_x=ns * source_distance, posy=0, n=ns)

src_args = {
    a: {
    "name": "src",
    "grid": sonars[a].model.grid,
    "npoint": ns,
    "f0": f0,
    "time_range": sonars[a].time_range,
    "coordinates": src_coord,
    }  
    for a in angles 
}

# Receivers share the source coordinates, so each receiver sits at the
# position of a source.
rec_args = {
    a: {
    "name": "rec",
    "grid": sonars[a].model.grid,
    "time_range": sonars[a].time_range,
    "npoint": ns,
    "coordinates": src_coord,
    }
    for a in angles
}

# Attach source and receiver (selected by type name) to each sonar, then
# finalize it.
for a, s in sonars.items():
    s.set_source("GaborSource", src_args[a])
    s.set_receiver("Receiver", rec_args[a])
    s.finalize()

# Summary of the discretization, taken from the first sonar.
print(f"Sonar domain: {domain_size}, spatial discretization: {spatial_dist} {sonars[angles[0]].model.vp.shape}, time discretization: {sonars[angles[0]].dt}, time: {sonars[angles[0]].time_range}")

In [None]:
# Receiver data per beam angle; rebinding `recordings` replaces the data of
# the first experiment.
recordings = {}

# Here the beam itself is run at each angle `a` (the first experiment used a
# fixed alpha = 90).
for a, s in sonars.items():
    print(f"Running simulation at {a} deg")
    s.run_beam(alpha = a)
    recordings[a] = s.rec.data


# Peak index per receiver for every beam angle.
peaks ={}
for a, r in recordings.items():
    peaks[a] = get_peaks(recordings[a], sonars[a].src.signal_packet)


# Writes the figure to the default output file of plot_peaks_alpha, which is
# the same file the earlier call wrote to.
plot_peaks_alpha(peaks, sonars[angles[0]].model.critical_dt, source_distance)

# Print the mean estimated angle next to the beam angle of each run.
for alpha, p in peaks.items():
    print(
        f"alpha = {alpha} deg, mean = {peaks2alpha(p, sonars[angles[0]].model.critical_dt, source_distance):.2f} deg"
    )