In [None]:

# !pip install numpy scipy matplotlib soundfile tqdm pyfar aic-sdk ipywidgets
import os
import numpy as np
import pyfar as pf
import matplotlib.pyplot as plt
from IPython.display import Audio, display
import aic_sdk as aic
import scipy

# Directory that process_single() scans for already-present model files.
# It is created here so that listing it does not fail when it does not exist yet.
MODELS_PATH = "aic_models"
os.makedirs(MODELS_PATH, exist_ok=True)

# Models exercised by run_demo(): model id -> sample rate in Hz. The sample
# rate is used both to generate the test signals and to configure the processor.
# Uncomment an entry below to include that model in the demo run.
ai_coustics_models = {
    # L/16kHz models:
    "sparrow-l-16khz": 16000,
    "quail-vf-l-16khz": 16000,
    "quail-l-16khz": 16000,

    # "sparrow-s-16khz": 16000,
    # "quail-s-16khz": 16000,

    # "quail-l-8khz": 8000,
    # "quail-s-8khz": 8000,
    # "sparrow-s-8khz": 8000,
    # "sparrow-l-8khz": 8000,

    # "sparrow-l-48khz": 48000,
    # "sparrow-s-48khz": 48000,
    # "sparrow-xs-48khz": 48000,
    # "sparrow-xxs-48khz": 48000,
}

In [None]:
import os
import ipywidgets as widgets
from IPython.display import display, HTML

# Link to the dashboard where a license key can be generated.
# (The leading emoji was mis-encoded as "ðŸ‘‰"; it is restored here.)
display(HTML('👉 <a href="https://developers.ai-coustics.io/dashboard/sdk/keys" target="_blank">Generate your AIC_SDK_LICENSE here</a>'))

# Input field for the key. A Textarea is used instead of a single-line Text.
license_widget = widgets.Textarea(
    value='',
    placeholder='Paste your AIC_SDK_LICENSE here',
    description='License:',
    disabled=False,
    layout=widgets.Layout(width='50%', height='40px')  # Adjusted height
)

# Button that triggers storing the pasted key.
submit_button = widgets.Button(
    description="Set License",
    button_style='primary'
)

# Captures the status message printed by the click handler.
output_area = widgets.Output()

def on_submit(b):
    """Store the pasted license key in ``os.environ["AIC_SDK_LICENSE"]``.

    The key is read from ``license_widget`` and stripped of surrounding
    whitespace *before* it is validated, so a whitespace-only entry is
    rejected instead of being saved as an empty license. Feedback is printed
    into ``output_area`` in both the success and the rejection case.

    Args:
        b: Object passed in by the click callback; not used.
    """
    license_key = license_widget.value.strip()

    with output_area:
        output_area.clear_output()

        if not license_key:
            # Nothing usable was entered: leave the environment untouched
            # and tell the user instead of failing silently.
            print("⚠️ No license key entered. Paste your AIC_SDK_LICENSE and try again.")
            return

        os.environ["AIC_SDK_LICENSE"] = license_key
        license_widget.value = ""  # Clear for security
        license_widget.placeholder = "License key saved in memory"
        print("✅ AIC_SDK_LICENSE has been set successfully.")

# Register on_submit as the click handler of the "Set License" button.
submit_button.on_click(on_submit)

# Render the key field and the button together in one HBox, followed by the
# output area that receives the handler's status message.
display(widgets.HBox([license_widget, submit_button]), output_area)

In [None]:
async def process_chunk(
    processor: aic.ProcessorAsync,
    vad_ctx: aic.VadContext,
    chunk: np.ndarray,
    buffer_size: int,
    num_channels: int,
) -> tuple[np.ndarray, bool] :
    """Enhance one chunk by running it through a fixed-size, zero-padded frame.

    Args:
        processor: Object whose ``process_async`` coroutine does the enhancement.
        vad_ctx: Object queried via ``is_speech_detected()`` after processing.
        chunk: 2-D array; its second axis holds the samples of this chunk.
        buffer_size: Number of samples per channel in the frame that is
            handed to the processor.
        num_channels: Number of rows in the frame handed to the processor.

    Returns:
        A tuple ``(enhanced, is_speech)``: the processed frame trimmed back to
        the number of samples in ``chunk``, and the value reported by
        ``vad_ctx.is_speech_detected()`` after processing.
    """
    n_valid = chunk.shape[1]

    # Float32 frame of the size the processor is fed; whatever the chunk does
    # not fill stays zero.
    frame = np.zeros((num_channels, buffer_size), dtype=np.float32)
    frame[:, :n_valid] = chunk

    enhanced = await processor.process_async(frame)

    # Drop the zero-padded tail so the caller only sees real samples.
    trimmed = enhanced[:, :n_valid]
    return trimmed, vad_ctx.is_speech_detected()

def _load_model(model_id: str):
    """Load the model for ``model_id`` from ``MODELS_PATH``, downloading it on a miss.

    A file in ``MODELS_PATH`` counts as a cache hit when its name contains
    ``model_id`` with dashes replaced by underscores. On a miss the model is
    downloaded *into* ``MODELS_PATH`` (the same directory that is searched),
    so later calls find it instead of downloading again.
    """
    cache_key = model_id.replace("-", "_")
    for file_name in os.listdir(MODELS_PATH):
        if cache_key in file_name:
            return aic.Model.from_file(os.path.join(MODELS_PATH, file_name))

    print(f"Downloading model {model_id}...")
    downloaded_path = aic.Model.download(model_id, MODELS_PATH)
    return aic.Model.from_file(downloaded_path)


async def process_single(
    audio_input: np.ndarray,
    sample_rate: int,
    num_channels: int,
    enhancement_level: float | None,
    model_id: str,
) -> tuple[np.ndarray, np.ndarray]:
    """Enhance a whole signal chunk by chunk and compensate the processor delay.

    Args:
        audio_input: 2-D array whose second axis is time; it is concatenated
            with a ``(num_channels, delay)`` block of zeros along that axis.
        sample_rate: Sample rate passed to the processor configuration.
        num_channels: Channel count passed to the processor configuration.
        enhancement_level: Value for ``ProcessorParameter.EnhancementLevel``,
            or ``None`` to leave the processor's setting untouched.
        model_id: Model identifier, used for the cache lookup / download.

    Returns:
        A tuple ``(audio, vad)`` of arrays with the same shape as
        ``audio_input``: the enhanced signal and a mask holding ``1.0`` for
        samples of chunks in which speech was reported and ``0.0`` otherwise.
        Both are shifted by the processor's output delay.
    """
    model = _load_model(model_id)

    config = aic.ProcessorConfig.optimal(
        model, sample_rate=sample_rate, num_channels=num_channels
    )
    processor = aic.ProcessorAsync(
        model,
        license_key=os.getenv("AIC_SDK_LICENSE"),
        config=config,
    )

    proc_ctx = processor.get_processor_context()
    vad_ctx = processor.get_vad_context()

    if enhancement_level is not None:
        try:
            proc_ctx.set_parameter(
                aic.ProcessorParameter.EnhancementLevel, enhancement_level
            )
        except Exception as exc:
            # Best effort: processing continues with the processor's current
            # setting, but the failure is reported instead of being hidden.
            # ``Exception`` (not a bare ``except``) lets cancellation and
            # KeyboardInterrupt propagate.
            print(
                f"Warning: could not set enhancement level {enhancement_level} "
                f"for model {model_id}: {exc}"
            )

    # --- Latency compensation ---
    # 1. Ask the processor for its output delay (used as a sample count).
    latency_samples = proc_ctx.get_output_delay()

    # 2. Append that many zeros to the input so the delayed tail of the
    #    signal is pushed out of the processor.
    padding = np.zeros((num_channels, latency_samples), dtype=np.float32)
    padded_input = np.concatenate([audio_input, padding], axis=1)

    # Output buffers cover the padded length; they are cropped in step 4.
    output_buffer = np.zeros_like(padded_input)
    vad_buffer = np.zeros_like(padded_input)

    # 3. Feed the signal in frames of ``config.num_frames`` samples.
    num_frames = config.num_frames
    total_len = padded_input.shape[1]

    for start in range(0, total_len, num_frames):
        end = min(start + num_frames, total_len)

        # process_chunk zero-pads a short final chunk to ``num_frames`` and
        # returns only the valid samples, so no extra padding is needed here.
        processed_chunk, is_speech = await process_chunk(
            processor, vad_ctx, padded_input[:, start:end], num_frames, num_channels
        )

        output_buffer[:, start:end] = processed_chunk
        vad_buffer[:, start:end] = 1.0 if is_speech else 0.0

    # 4. Drop the first ``latency_samples`` outputs so the result lines up
    #    with (and has the same length as) the input.
    final_audio = output_buffer[:, latency_samples:]
    final_vad = vad_buffer[:, latency_samples:]

    return final_audio, final_vad

def generate_test_signals(target_fs, offset_seconds=2.0, interferer_gain=0.1):
    """Build two noisy test mixtures from pyfar's bundled example signals.

    The female speech sample is the target in both mixtures. It is extended
    with ``offset_seconds`` of silence at the end, and the male speech sample
    is delayed by the same amount, so the male voice starts later.

    Args:
        target_fs: Sampling rate requested from pyfar for every signal.
        offset_seconds: Silence appended to the female speech and prepended
            to the male speech, in seconds. Must not be negative
            (``np.pad`` rejects negative widths). Defaults to 2.0.
        interferer_gain: Linear gain applied to the male speech after it has
            been convolved with the room impulse response. Defaults to 0.1.

    Returns:
        A tuple ``(female_drums, female_male)`` of 2-D arrays with the length
        of the padded female speech along the second axis:
        female speech + drums, and female speech + scaled, convolved
        male speech.
    """
    offset_samples = int(round(offset_seconds * target_fs))

    # The ``.time`` arrays are treated as 2-D with time on the second axis.
    female = pf.signals.files.speech(voice="female", sampling_rate=target_fs).time
    female = np.pad(female, ((0, 0), (0, offset_samples)))
    male = pf.signals.files.speech(voice="male", sampling_rate=target_fs).time
    male = np.pad(male, ((0, 0), (offset_samples, 0)))
    drums = pf.signals.files.drums(sampling_rate=target_fs).time
    room_ir = pf.signals.files.room_impulse_response(sampling_rate=target_fs).time

    # Every mixture is aligned to the padded female speech.
    target_len = female.shape[1]

    def force_len(sig, length):
        """Truncate or zero-pad ``sig`` along its second axis to ``length``."""
        current = sig.shape[1]
        if current > length:
            return sig[:, :length]
        if current < length:
            return np.pad(sig, ((0, 0), (0, length - current)))
        return sig

    female_drums = female + force_len(drums, target_len)

    # "full" convolution is longer than the dry signal; the excess is cut
    # off (or the result zero-padded) by force_len.
    male_convolved = scipy.signal.fftconvolve(male, room_ir, mode="full")
    male_convolved = male_convolved * interferer_gain
    female_male = female + force_len(male_convolved, target_len)

    return female_drums, female_male


In [None]:
from IPython.display import HTML

def _plot_enhancement(noisy, enhanced, vad, fs, title):
    """Plot channel 0 of the noisy input, the enhanced output and the VAD mask."""
    num_samples = noisy.shape[1]
    duration = num_samples / fs
    # Sample k is taken at k / fs. (linspace(0, duration, N) would include the
    # endpoint and stretch the spacing to duration / (N - 1).)
    time_axis = np.arange(num_samples) / fs

    _, ax = plt.subplots(figsize=(14, 4))
    ax.plot(time_axis, noisy[0], label="Original (Noisy)", alpha=0.6, color="silver")
    ax.plot(time_axis, enhanced[0], label="Enhanced (Processed)", alpha=0.9, color="#007acc", linewidth=1)
    # Scale the 0/1 mask to the peak of the enhanced signal so it is visible
    # on the same axis.
    ax.plot(time_axis, vad[0] * np.max(enhanced[0]), label="VAD Output", linestyle="--", color="red")
    ax.set_title(title)
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Amplitude")
    ax.set_xlim(0, duration)
    ax.grid(True, linestyle="--", alpha=0.5)
    ax.legend(loc="upper right")
    plt.tight_layout()
    plt.show()


async def run_demo():
    """Run every model in ``ai_coustics_models`` on both test mixtures.

    For each model this displays, in order: a header, audio players for the
    two noisy inputs, one titled plot per mixture (input, enhanced output and
    VAD mask), and audio players for the two enhanced outputs.
    """
    print(f"Starting processing for {len(ai_coustics_models)} models...\n")

    for model_id, fs in ai_coustics_models.items():

        # Section Header
        display(HTML(f"<h2>Model: {model_id} ({fs} Hz)</h2>"))

        # 1. Generate Signals
        female_drums, female_male = generate_test_signals(fs)
        scenarios = {
            "Female speech + drums": female_drums,
            "Female speech + male speech (convolved with room impulse response)": female_male,
        }

        # 2. Display Input Audio (Before Processing)
        print("Input Signal (Noisy):")
        for noisy in scenarios.values():
            display(Audio(noisy[0], rate=fs))

        # 3. Process each mixture; results are (enhanced_audio, vad_mask).
        results = {}
        for label, noisy in scenarios.items():
            results[label] = await process_single(
                audio_input=noisy.astype(np.float32),
                sample_rate=fs,
                num_channels=1,
                enhancement_level=1.0,
                model_id=model_id,
            )

        # 4. Plotting (one figure per mixture, titled so they can be told apart)
        for label, noisy in scenarios.items():
            enhanced, vad = results[label]
            _plot_enhancement(noisy, enhanced, vad, fs, f"{model_id}: {label}")

        # 5. Display Output Audio (After Processing)
        print("Output Signal (Enhanced):")
        for enhanced, _ in results.values():
            display(Audio(enhanced[0], rate=fs))

        # Separator
        print("-" * 80)

# Run the demo. NOTE(review): top-level ``await`` assumes the cell is executed
# by a kernel that supports it (e.g. IPython/Jupyter); in a plain Python
# script this would need asyncio.run(run_demo()) instead.
await run_demo()