In [22]:
import os
import numpy as np
import ggwave
import soundfile as sf

class ToneLengthTestSuite:
    """Generate ggwave-encoded waveforms of a fixed phrase and save them as WAV files.

    Attributes are set from the constructor arguments:
        phrase:       text payload that is encoded into audio.
        out_dir:      directory that receives the generated WAV files (created if missing).
        sample_rate:  sample rate used both for the ggwave instance and for the WAV header.
        protocol_id:  ggwave transmission protocol id handed to the encoder.

    NOTE(review): the ggwave Python bindings do not appear to expose a
    symbol/tone-length parameter. The ``tone_length_ms`` arguments below are
    therefore used for labelling (file names, progress output) only and do not
    change the encoded waveform — confirm against the installed ggwave version
    before drawing conclusions from a "tone length" sweep.
    """

    def __init__(self,
                 phrase="Hello, I would like to place an order at the restaurant.",
                 out_dir="tone_length_test_wavs",
                 sample_rate=48000,
                 protocol_id=0):
        """Store the configuration and make sure the output directory exists."""
        self.phrase = phrase
        self.out_dir = out_dir
        self.sample_rate = sample_rate
        self.protocol_id = protocol_id

        # exist_ok=True already covers the "directory is there" case.
        os.makedirs(out_dir, exist_ok=True)

    @staticmethod
    def _length_label(tone_length_ms) -> str:
        """Return a file-name friendly label that is unique per tone length.

        Integral values keep the historical ``4`` / ``48`` form; fractional
        values keep their fraction (``0.05``) so that sub-millisecond lengths
        no longer all collapse onto ``0``.
        """
        value = float(tone_length_ms)
        return str(int(value)) if value.is_integer() else f"{value:g}"

    def save_sequence(self, tone_length_ms: float, extension='wav') -> str:
        """Generate one sequence and write it to ``<out_dir>/tone_<length>ms.<extension>``.

        Args:
            tone_length_ms: tone length used to label the file (see class note).
            extension: file extension appended to the file name.

        Returns:
            Path of the written file.
        """
        audio = self.generate_single_sequence(tone_length_ms)
        label = self._length_label(tone_length_ms)
        out_path = os.path.join(self.out_dir, f"tone_{label}ms.{extension}")
        sf.write(out_path, audio, self.sample_rate, subtype='FLOAT')
        return out_path

    def generate_single_sequence(self, tone_length_ms: float) -> np.ndarray:
        """Encode ``self.phrase`` with ggwave and return it as a float32 sample array.

        Args:
            tone_length_ms: requested tone length in milliseconds.
                NOTE(review): not forwarded to ggwave (see class note).

        Returns:
            1-D ``np.float32`` array viewing the bytes returned by ``ggwave.encode``.
        """
        # Start from the library defaults so every required field is present and
        # the output sample format stays whatever ggwave uses by default
        # (presumably 32-bit float, which is what the frombuffer call below
        # assumes — TODO confirm). Only the sample rates are overridden.
        params = dict(ggwave.getDefaultParameters())
        params.update({
            "sampleRate": self.sample_rate,
            "sampleRateInp": self.sample_rate,
            "sampleRateOut": self.sample_rate,
        })
        instance = ggwave.init(params)
        try:
            # The second positional argument of ggwave.encode is the protocol id;
            # the instance has to be passed by keyword, otherwise the instance
            # handle is taken as the protocol id and the configured instance is
            # never used.
            waveform = ggwave.encode(self.phrase, self.protocol_id, instance=instance)
        finally:
            ggwave.free(instance)
        return np.frombuffer(waveform, dtype=np.float32)

    def generate_combined_sequence(self, tone_lengths_ms: list, silence_duration_sec: float = 0.5) -> np.ndarray:
        """Concatenate one sequence per tone length, each followed by a block of silence.

        Args:
            tone_lengths_ms: iterable of tone lengths (ms) to generate, in order.
            silence_duration_sec: length of the zero-filled gap appended after each sequence.

        Returns:
            1-D ``np.float32`` array; empty when ``tone_lengths_ms`` is empty.
        """
        combined_audio = []
        silence = np.zeros(int(self.sample_rate * silence_duration_sec), dtype=np.float32)

        for tl in tone_lengths_ms:
            print(f"Generating tone for {tl:.3f} ms")
            combined_audio.append(self.generate_single_sequence(tl))
            combined_audio.append(silence)

        if not combined_audio:
            # np.concatenate raises on an empty list; an empty sweep is just empty audio.
            return np.zeros(0, dtype=np.float32)
        return np.concatenate(combined_audio)

    def save_combined_sequence(self, tone_lengths_ms: list, filename: str = "combined_sweep.wav",
                               silence_duration_sec: float = 0.5):
        """Generate the combined sweep and write it to ``<out_dir>/<filename>``.

        Args:
            tone_lengths_ms: iterable of tone lengths (ms) to include.
            filename: name of the WAV file inside ``out_dir``.
            silence_duration_sec: gap between consecutive sequences, forwarded to
                ``generate_combined_sequence``.

        Returns:
            Path of the written file.
        """
        audio = self.generate_combined_sequence(tone_lengths_ms, silence_duration_sec)
        out_path = os.path.join(self.out_dir, filename)
        sf.write(out_path, audio, self.sample_rate, subtype='FLOAT')
        print(f"Saved combined sweep to {out_path}")
        return out_path

# ---- Example Usage ----

# 50 geometrically (log-uniformly) spaced tone lengths from 0.05 ms up to 50 ms.
tone_lengths_ms = np.geomspace(start=0.05, stop=50.0, num=50)

# Build a suite with its default settings and write the whole sweep into one WAV file.
suite = ToneLengthTestSuite()
combined_wav_path = suite.save_combined_sequence(
    tone_lengths_ms=tone_lengths_ms,
    filename="full_sweep.wav",
)


Generating tone for 0.050 ms
Generating tone for 0.058 ms
Generating tone for 0.066 ms
Generating tone for 0.076 ms
Generating tone for 0.088 ms
Generating tone for 0.101 ms
Generating tone for 0.116 ms
Generating tone for 0.134 ms
Generating tone for 0.154 ms
Generating tone for 0.178 ms
Generating tone for 0.205 ms
Generating tone for 0.236 ms
Generating tone for 0.271 ms
Generating tone for 0.313 ms
Generating tone for 0.360 ms
Generating tone for 0.414 ms
Generating tone for 0.477 ms
Generating tone for 0.549 ms
Generating tone for 0.632 ms
Generating tone for 0.728 ms
Generating tone for 0.838 ms
Generating tone for 0.965 ms
Generating tone for 1.111 ms
Generating tone for 1.280 ms
Generating tone for 1.474 ms
Generating tone for 1.697 ms
Generating tone for 1.953 ms
Generating tone for 2.249 ms
Generating tone for 2.590 ms
Generating tone for 2.982 ms
Generating tone for 3.433 ms
Generating tone for 3.953 ms
Generating tone for 4.551 ms
Generating tone for 5.241 ms
Generating ton

In [25]:
import os
import time
import ggwave
import numpy as np
import matplotlib.pyplot as plt
import pyaudio
import soundfile as sf
from scipy.interpolate import UnivariateSpline

def compute_cer(ref, hyp):
    """
    Character Error Rate: edit distance between `ref` and `hyp`, divided by
    the reference length (an empty reference is treated as length 1).
    """
    ref_len, hyp_len = len(ref), len(hyp)

    # table[r, c] = edit distance between the first r chars of ref and the first c chars of hyp.
    table = np.zeros((ref_len + 1, hyp_len + 1), dtype=int)
    table[:, 0] = np.arange(ref_len + 1)   # turn a prefix of ref into "" by deletions
    table[0, :] = np.arange(hyp_len + 1)   # turn "" into a prefix of hyp by insertions

    for row, ref_char in enumerate(ref, start=1):
        for col, hyp_char in enumerate(hyp, start=1):
            deletion = table[row - 1, col] + 1
            insertion = table[row, col - 1] + 1
            substitution = table[row - 1, col - 1] + (ref_char != hyp_char)
            table[row, col] = min(deletion, insertion, substitution)

    return table[ref_len, hyp_len] / max(1, ref_len)

def _play_and_record(playback, timeout, sample_rate, buffer_size, min_silence_sec):
    """Play `playback` (mono float32 bytes) while recording the microphone; return the recorded byte chunks."""
    bytes_per_chunk = buffer_size * 4  # 4 bytes per mono float32 frame
    recorded = []

    p = pyaudio.PyAudio()
    output_stream = None
    input_stream = None
    try:
        output_stream = p.open(format=pyaudio.paFloat32,
                               channels=1,
                               rate=sample_rate,
                               output=True)
        input_stream = p.open(format=pyaudio.paFloat32,
                              channels=1,
                              rate=sample_rate,
                              input=True,
                              frames_per_buffer=buffer_size)

        t_start = time.monotonic()
        offset = 0
        playback_done_at = None

        while True:
            now = time.monotonic()
            if now - t_start > timeout:
                break
            if offset < len(playback):
                # Record `min_silence_sec` of lead-in first, then feed the
                # output one chunk per iteration so that reading and writing
                # stay interleaved for the whole playback.
                if now - t_start >= min_silence_sec:
                    output_stream.write(playback[offset:offset + bytes_per_chunk])
                    offset += bytes_per_chunk
            else:
                if playback_done_at is None:
                    playback_done_at = now
                # Keep recording a tail of silence after the last chunk was queued.
                if now - playback_done_at >= min_silence_sec:
                    break
            recorded.append(input_stream.read(buffer_size, exception_on_overflow=False))
    finally:
        # Release whatever was opened, even if opening the second stream failed
        # or the loop was interrupted.
        try:
            for stream in (output_stream, input_stream):
                if stream is not None:
                    stream.stop_stream()
                    stream.close()
        finally:
            p.terminate()

    return recorded


def _decode_frames(input_frames, reference_string):
    """Feed recorded chunks to a ggwave decoder in order; return the exact match or the longest message seen."""
    best = ""
    instance = ggwave.init()  # default parameters for decoding
    try:
        # The chunks are passed contiguously and without overlap, in recording order.
        for frame_bytes in input_frames:
            result = ggwave.decode(instance, frame_bytes)
            if not result:
                continue
            try:
                msg = result.decode('utf-8')
            except UnicodeDecodeError:
                continue
            if msg == reference_string:
                return msg
            if len(msg) > len(best):
                best = msg
    finally:
        # Always give the instance back, including on errors / KeyboardInterrupt.
        ggwave.free(instance)
    return best


def run_single_test(wav_path: str,
                    reference_string: str,
                    timeout: float = 10.0,
                    sample_rate: int = 48000,
                    buffer_size: int = 4096,
                    min_silence_sec: float = 0.5,
                    verbose: bool = False):
    """
    Plays the provided wav file, records from microphone, decodes using GGWave,
    and evaluates CER. Returns (CER, decoded_string, success:bool).

    Args:
        wav_path: WAV file to play; it must have been written at `sample_rate`.
        reference_string: text the recording is expected to decode to.
        timeout: upper bound in seconds for the play/record phase. The phase
            ends earlier once playback is finished and `min_silence_sec` of
            trailing audio has been recorded.
        sample_rate: sample rate used for both audio streams.
        buffer_size: number of frames read from / written to the streams per step.
        min_silence_sec: recorded lead-in before playback and tail after it.
        verbose: print the decode result.

    Returns:
        (1.0, '', False) when the file cannot be loaded or nothing is decoded;
        otherwise (cer, decoded_string, True).

    NOTE(review): both streams are opened with channels=1, so the file is
    assumed to be mono — confirm before using multi-channel input.
    """
    try:
        audio, sr = sf.read(wav_path, dtype='float32')
        if sr != sample_rate:
            raise ValueError("Sample rate mismatch.")
    except Exception as e:
        print(f"Failed to load {wav_path}: {e}")
        return 1.0, '', False

    input_frames = _play_and_record(audio.tobytes(), timeout, sample_rate,
                                    buffer_size, min_silence_sec)
    decoded_string = _decode_frames(input_frames, reference_string)

    if not decoded_string:
        if verbose:
            print("[Decode Fail] No GGWave message detected")
        return 1.0, '', False

    cer = compute_cer(reference_string, decoded_string)
    if verbose:
        print(f"Decoded: {decoded_string}\nCER: {cer:.4f}")
    return cer, decoded_string, True

def run_experiment(
    suite,
    tone_lengths_ms: list,
    repetitions: int = 2,
    timeout_per_trial: float = 10.0
):
    """
    Run `repetitions` play/record/decode trials for every tone length.

    For each entry of `tone_lengths_ms` a WAV file is produced with
    `suite.save_sequence`, then `run_single_test` is called `repetitions`
    times against `suite.phrase`, printing one status line per trial.

    Returns a dict of parallel lists, one entry per tone length:
        'tone_length_ms'   the tone length itself
        'CER_mean'         mean CER over the trials
        'CER_std'          standard deviation of the CER over the trials
        'success_rate'     fraction of trials reported as successful
        'decoded_strings'  list with the decoded text of every trial
    """
    columns = ('tone_length_ms', 'CER_mean', 'CER_std', 'success_rate', 'decoded_strings')
    result_dict = {column: [] for column in columns}

    for tone_ms in tone_lengths_ms:
        wav_path = suite.save_sequence(tone_ms)
        trial_cers, trial_texts, trial_flags = [], [], []

        for trial_no in range(1, repetitions + 1):
            cer, decoded, ok = run_single_test(wav_path, suite.phrase, timeout=timeout_per_trial)
            trial_cers.append(cer)
            trial_texts.append(decoded)
            trial_flags.append(int(ok))
            status = '(OK)' if ok else '[FAIL]'
            print(f"[{tone_ms:.1f} ms] Trial {trial_no}/{repetitions}: CER={cer:.3f} {status}")

        # Per-tone-length summary, appended column by column.
        result_dict['tone_length_ms'].append(tone_ms)
        result_dict['CER_mean'].append(np.mean(trial_cers))
        result_dict['CER_std'].append(np.std(trial_cers))
        result_dict['success_rate'].append(sum(trial_flags) / repetitions)
        result_dict['decoded_strings'].append(trial_texts)

    return result_dict

def plot_results(
    result_dict,
    min_tone_ms: int = 2,
    max_tone_ms: int = 50,
    smooth: bool = True,
    baseline_cer = None
):
    """
    Plots CER vs. Tone Length with error bars, spline fit, and optimal marker.

    Args:
        result_dict: mapping with the parallel lists 'tone_length_ms',
            'CER_mean' and 'CER_std' (as returned by `run_experiment`).
        min_tone_ms, max_tone_ms: x-axis limits; a 2 ms margin is added on each side.
        smooth: fit a cubic smoothing spline (needs more than 3 points) and
            take the optimum from the fitted curve instead of the raw points.
        baseline_cer: reference CER for the "90% reduction" summary line;
            defaults to the first measured CER.

    Prints a short message and returns without plotting when there are no results.
    """
    x = np.asarray(result_dict['tone_length_ms'], dtype=float)
    y = np.asarray(result_dict['CER_mean'], dtype=float)
    yerr = np.asarray(result_dict['CER_std'], dtype=float)

    if x.size == 0:
        # np.argmin would raise on empty input; there is simply nothing to draw.
        print("No results to plot.")
        return

    fig, ax = plt.subplots(figsize=(8,5))
    ax.errorbar(x, y, yerr=yerr, fmt='o', label="Measured CER", capsize=4)
    if smooth and len(x) > 3:
        # UnivariateSpline requires increasing x, so fit on a sorted view;
        # the measured points themselves are left in their original order.
        order = np.argsort(x, kind='stable')
        spl = UnivariateSpline(x[order], y[order], k=3, s=0.001)
        tx = np.linspace(x.min(), x.max(), 512)
        ty = spl(tx)
        ax.plot(tx, ty, alpha=0.7, label="Spline fit")
        best_idx = np.argmin(ty)
        best_ms = tx[best_idx]
        min_cer = ty[best_idx]
    else:
        best_idx = np.argmin(y)
        best_ms = x[best_idx]
        min_cer = y[best_idx]

    base_cer = baseline_cer if baseline_cer is not None else y[0]
    # CER level that corresponds to closing 90% of the gap between baseline and optimum.
    target_cer = min_cer + 0.1 * (base_cer - min_cer)

    ax.axvline(best_ms, linestyle="--", color="seagreen", alpha=0.5, label=f"Best Tone: {best_ms:.1f} ms")
    ax.set_title("Character Error Rate (CER) vs Tone Length (ms)")
    ax.set_xlabel("Tone/Symbol Length (ms)")
    ax.set_ylabel("Character Error Rate (CER)")
    ax.set_ylim(-0.05, 1.05)
    ax.set_xlim(min_tone_ms - 2, max_tone_ms + 2)
    # Minor grid lines are only drawn when minor ticks are enabled.
    ax.minorticks_on()
    ax.grid(True, which='major', linestyle='-', alpha=0.3)
    ax.grid(True, which='minor', linestyle=':', alpha=0.1)
    ax.legend()
    plt.tight_layout()
    plt.show()

    print(f"Best Tone Length: {best_ms:.1f} ms (CER={min_cer:.3f})")
    print(f"90% reduction from baseline ({base_cer:.2f} → {target_cer:.2f}) at or before {best_ms:.1f} ms")

# Example usage: three trials for each of 4, 8, ..., 48 ms, then plot the summary.
suite = ToneLengthTestSuite()
test_lengths = list(range(4, 48 + 1, 4))
results = run_experiment(
    suite=suite,
    tone_lengths_ms=test_lengths,
    repetitions=3,
    timeout_per_trial=8,
)
plot_results(result_dict=results, min_tone_ms=4, max_tone_ms=50, smooth=True)


[4.0 ms] Trial 1/3: CER=1.000 [FAIL]
[4.0 ms] Trial 2/3: CER=1.000 [FAIL]
[4.0 ms] Trial 3/3: CER=1.000 [FAIL]
[8.0 ms] Trial 1/3: CER=1.000 [FAIL]


KeyboardInterrupt: 