# 

# Environment Setup

In [None]:
#!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [2]:
from typing import List, Union, Tuple, Optional
import subprocess
from pathlib import Path
import matplotlib.pyplot as plt
import librosa
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import math
from abc import abstractmethod

from boomspeaver.tools.data import get_repo_dir, load_numpy_file, save_numpy_file, pad_vector
from boomspeaver.tools.sampler import create_random_vector, shift_vector, mirror_vector
from boomspeaver.tools.dsp.dsp import fft_analysis, detect_peaks, get_resolution, detect_signal_bounds,calculate_spectrogram
from boomspeaver.tools.plot.configs import Axis, Line, Plotter, Points, Subplot
from boomspeaver.tools.plot.multi_plotter import MultiPlotter
from boomspeaver.tools.multiprocessing_runner import MultiprocessingRunner

In [3]:
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("Current CUDA device:", torch.cuda.current_device())
print("Device name:", torch.cuda.get_device_name(torch.cuda.current_device()))

PyTorch version: 2.5.1+cu121
CUDA available: True
Current CUDA device: 0
Device name: NVIDIA GeForce RTX 4060 Laptop GPU


# Generate dataset

In [None]:
repo_dir=get_repo_dir(run_type="python")
input_time_path = repo_dir / "examples/time_vector_10ms_48kHz.npy"
loudspeaker_param_path = repo_dir / "examples/prv_audio_6MB400_8ohm.json"

In [15]:
!cd "$repo_dir"
!pwd

/home/freetzz/repo/ls_wip/loudspeaker-solver


## Check fenics

In [16]:
!docker compose up -d

[1A[1B[0G[?25l[+] Running 0/1
 [33m⠋[0m Container fenics  Starting                                              [34m0.1s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠙[0m Container fenics  Starting                                              [34m0.2s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠹[0m Container fenics  Starting                                              [34m0.3s [0m
[?25h[1A[1A[0G[?25l[+] Running 0/1
 [33m⠸[0m Container fenics  Starting                                              [34m0.4s [0m
[?25h[1A[1A[0G[?25l[34m[+] Running 1/1[0m
 [32m✔[0m Container fenics  [32mStarted[0m                                               [34m0.4s [0m
[?25h

In [17]:
!./run_fenics.sh -c boomspeaver/acoustic/the_membrane.py --initial_condition_path examples/cosine_wave.npy

Real mode
Sensitivity: 93.1 dB
Resonance Frequency (fS): 117 Hz
QTS: 0.6
Xmax: 4 mm
Bl: 8.26 N/A
Moving Mass (MMS): 0.0098 kg
Info    : Meshing 1D...                                                                                                                        
Info    : [  0%] Meshing curve 2 (Ellipse)
Info    : [ 60%] Meshing curve 3 (Ellipse)
Info    : Done meshing 1D (Wall 0.00160696s, CPU 0.001407s)
Info    : Meshing 2D...
Info    : Meshing surface 1 (Plane, Frontal-Delaunay)
Info    : Done meshing 2D (Wall 14.8684s, CPU 14.665s)
Info    : 221179 nodes 442360 elements
Info    : Writing 'output/annulus.msh'...
Info    : Done writing 'output/annulus.msh'
Info    : Reading 'output/annulus.msh'...
Info    : 5 entities
Info    : 221179 nodes
Info    : 442358 elements                                                                                     
Info    : Done reading 'output/annulus.msh'                                                                      
2.0833333333333

To make sure that we won't miss any frequency, we should provide domain resolution: "At least 10–20 nodes per wavelength of the smallest (highest frequency) wave you want to resolve." Formula to calculate wavelength: `λₘᵢₙ = v / fₘₐₓ`.

### Initial membrane condition
Let's generate some random initial membrane conditions. Membrane is represented by 128 points. Scaled to 0 displacement on edges.

In [18]:
# n_points=[128, 64, 32, 16]
n_points=[128]
value_range=(-1.0, 1.0)
n_samples=20000
output_dir_name="output_surrogate"

In [None]:
def random_diaphragm_sample(output_dir: Path, n_samples: int, n_points: int, value_range: tuple[float, float], seed: int = 42) -> None:
    assert isinstance(output_dir, Path) and output_dir.exists()
    assert isinstance(n_points, int)
    assert isinstance(n_samples, int)
    assert isinstance(value_range, tuple)

    rng = np.random.default_rng(seed=seed)
    output_paths=[]
    for i in range(n_samples):
        output_path=output_dir / f"membrane_{i}.npy"
        output_paths.append(output_path)
        v = create_random_vector(n_points, rng=rng, value_range=value_range)
        v_shift = shift_vector(v, to_idx=-1, to_val=0.0)
        # v_mirror = mirror_vector(v_shift)
        save_numpy_file(output_path=output_path, data=v_shift)
    return output_paths

In [None]:
membrane_paths=[]
for i in n_points:
    output_dir = repo_dir / output_dir_name / f"dataset_first_{i}"
    output_dir.mkdir(parents=True, exist_ok=False)
    output_dir.mkdir(parents=True, exist_ok=True)
    output_dir = output_dir.relative_to(repo_dir)

    membrane_paths_ = random_diaphragm_sample(
        output_dir=output_dir,
        n_samples=n_samples,
        n_points=i,
        value_range=value_range,
        )
    membrane_paths+=membrane_paths_

In [21]:
print(len(membrane_paths))

20000


### Simulate

In [22]:
def run_simulation(membrane_path):
    output_path = membrane_path.with_suffix("") / f"{membrane_path.stem}_simulated.xdmf"
    if not output_path.with_suffix(".npy").exists():
        cmd = [
            "./run_fenics.sh",
            "-c", "boomspeaver/acoustic/the_membrane.py",
            "--initial_condition_path", str(membrane_path),
            "--output_path", str(output_path),
            "--listen"
        ]
        subprocess.run(cmd, check=True)
        # TODO only temporary for saving only numpy arrays
        # Safe cleanup with checks
        for path in [
            output_path,
            output_path.with_suffix(".h5"),
            output_path.parent / "annulus.msh",
        ]:
            try:
                if path.exists():
                    subprocess.run(["docker", "exec", "fenics", "rm", "-rf", str(path)], check=False)
            except Exception as e:
                print(f"[WARN] Could not remove {path}: {e}")

In [None]:
runner = MultiprocessingRunner(run_simulation, membrane_paths, num_workers=6)
runner.run()