In [None]:
import sys
from collections import deque
from pathlib import Path
from typing import Deque, Optional

import IPython.display as ipd
import ipywidgets as widgets
import librosa
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
from ipywidgets import Layout, interact

IN_COLAB = "google.colab" in sys.modules
if not IN_COLAB:
    import pyaudio
SR = 16000
plt.style.use("dark_background")

In [None]:
%%html
<style>
/*overwrite hard coded write background by vscode for ipywidges */
.cell-output-ipywidget-background {
   background-color: transparent !important;
}

/*set widget foreground text and color of interactive widget to vs dark theme color */
:root {
    --jp-widgets-color: var(--vscode-editor-foreground);
    --jp-widgets-font-size: var(--vscode-editor-font-size);
}
</style>

In [None]:
%%bash -s "$IN_COLAB" 
if [ $1 = True ]; then
    mkdir -p ./data/{aecs,rirs}
    repo_prefix=https://raw.githubusercontent.com/georgygospodinov/speech_course/main/week02/data
    wget -q $repo_prefix/salut_time_query.wav -P data/
    wget -q $repo_prefix/aecs/{doubletalk,salut}_{mic,spk}.wav -P data/aecs
    wget -q $repo_prefix/rirs/{classroom,greathall,octagon}.wav -P data/rirs
fi

# Signals

In [None]:
def create_stream(
    pyaudio_manager,
    sample_rate: int = 16_000,
    frames_per_buffer: int = 160,
):
    return pyaudio_manager.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=sample_rate,
        input=True,
        frames_per_buffer=frames_per_buffer,
    )


def record_audio(stream, duration_seconds: float = 0.1) -> np.ndarray:
    frames_to_read = int(stream._rate * duration_seconds)
    byte_frames: bytes = stream.read(frames_to_read, exception_on_overflow=False)
    audio_frame = np.frombuffer(byte_frames, dtype=np.int16)
    float_audio_frame = audio_frame / (1 << 15)
    return float_audio_frame


def spectral_power(signal: np.ndarray) -> np.ndarray:
    spectral_power = np.abs(np.fft.rfft(signal))
    return spectral_power


def start_recording(chunks: int, sr: int = 16_000, fourier: bool = False) -> deque:
    frame_history: Deque = deque(maxlen=chunks)

    pa_manager = pyaudio.PyAudio()
    stream = create_stream(pa_manager, sample_rate=sr)

    while True:
        try:
            audio_frames = record_audio(stream)
            frame_history.append(audio_frames)
            data = np.concatenate(list(frame_history))
            last_data = np.concatenate(list(frame_history)[-2:])

            plt.figure(figsize=(12, 2))
            plt.subplot(121)
            plt.axis("off")
            plt.grid(False)
            plt.ylim(-0.05, 0.05)
            plt.plot(data)

            if fourier:
                plt.subplot(122)
                spectrum = spectral_power(last_data)
                hz = np.arange(spectrum.size) * (sr / last_data.size)
                plt.xlabel("Hz")
                plt.xlim(0, 8000)
                plt.ylim(0, 4)
                plt.plot(hz, spectrum)

            plt.show()
            ipd.clear_output(wait=True)
        except (OSError, KeyboardInterrupt):
            pa_manager.terminate()
            stream.stop_stream()
            stream.close()
            break

    return frame_history


if not IN_COLAB:
    last_audio_chunks = start_recording(chunks=10, sr=SR, fourier=True)
    res = np.concatenate(list(last_audio_chunks))

    plt.figure(figsize=(12, 2))
    plt.plot(res)
    plt.show()
    ipd.display(ipd.Audio(res, rate=SR))

In [None]:
plt.figure(figsize=(12, 1))
plt.plot(res)
plt.axis("off")
plt.show()
ipd.display(ipd.Audio(res, rate=SR))

In [None]:
wav_signal, sr = librosa.load("./data/salut_time_query.wav", sr=SR)

time_interval = 0.1

n_frames = int(sr * time_interval)
target_sr = 1_000
scale_value = 1 << 12

signal_frame = wav_signal[:n_frames]

plt.figure(figsize=(20, 12))

plt.subplot(221)
plt.title("Analog Signal")
plt.plot(np.arange(n_frames) * time_interval / sr, signal_frame)

resampled_signal = librosa.resample(signal_frame, orig_sr=sr, target_sr=target_sr)

plt.subplot(222)
plt.title("Discrete Signal")
plt.stem(np.arange(resampled_signal.size) * time_interval / target_sr, resampled_signal, basefmt=" ")

plt.subplot(223)
plt.title("Quantized Signal")
plt.plot(
    np.arange(n_frames) * time_interval / sr,
    (signal_frame * scale_value).astype("int") / scale_value,
)
plt.subplot(224)
plt.title("Digital Signal")
plt.stem(
    np.arange(resampled_signal.size) * time_interval / target_sr,
    (resampled_signal * scale_value).astype("int") / scale_value,
    basefmt=" ",
)
plt.show()

# Sampling theorem

In [None]:
@interact(
    t0=widgets.FloatSlider(
        value=1, min=0.1, max=4, layout=Layout(width="960px"), description=r"$$t_0$$:"
    ),
    f0=widgets.FloatSlider(
        value=30, min=1, max=1000, layout=Layout(width="960px"), description=r"$$f_0$$:"
    ),
    dt=widgets.FloatSlider(
        value=0.4,
        min=0.01,
        step=0.01,
        max=1,
        layout=Layout(width="960px"),
        description=r"$$\Delta t$$:",
    ),
)
def guassian_pulse_discretization(t0: float, f0: float, dt: float):
    def x_time(t):
        return np.exp(-((t / t0) ** 2)) * np.cos(f0 * t)

    def x_freq(f):
        return np.pi**0.5 * t0 * np.exp(-((np.pi * (f - f0) * t0) ** 2))

    t = np.linspace(-2, 2, num=4 * 8000)
    f = np.linspace(-2.5 * f0, 2.5 * f0, num=4 * 8000)
    x_t = x_time(t)
    x_f = x_freq(f)

    t_steps = np.arange(0, t.max() + dt, step=dt)
    t_steps = np.concatenate([-t_steps[:0:-1], t_steps])
    x_t_steps = x_time(t_steps)
    x_t_reconstructed = sum(
        x_t_step * np.sinc((t - t_step) / dt)
        for t_step, x_t_step in zip(t_steps, x_t_steps)
    )

    plt.figure(figsize=(12, 8))
    plt.subplot(221)
    plt.plot(t, x_t)
    plt.title("original signal")
    plt.xlabel("time")
    plt.subplot(222)
    plt.title("spectrum")
    plt.plot(f, x_f)
    plt.xlabel("frequency")

    plt.subplot(223)
    plt.title("sampled signal points")
    plt.stem(t_steps, x_t_steps)
    plt.xlabel("time")
    plt.subplot(224)
    plt.title("reconstruction")
    plt.plot(t, x_t, label="original")
    plt.plot(t, x_t_reconstructed, "--", label="reconstructed")
    plt.scatter(t_steps, x_t_steps, c="#feffb3", label="sampled")
    plt.legend()
    plt.xlabel("time")
    plt.show()

    ipd.display(ipd.Audio(x_t, rate=8000))

# Frequency Aliasing

In [None]:
t1 = np.linspace(0, 1, 16)
t2 = np.linspace(0, 1, 800)


def sin_n(x: np.ndarray, n: int) -> np.ndarray:
    return np.sin(2 * np.pi * n * x)


plt.figure(figsize=(16, 5))
plt.scatter(t1, sin_n(t1, 1), label="discretization points", color="w", s=50)
plt.plot(t2, sin_n(t2, 1), label=r"$\sin(\omega t)$")
plt.plot(t2, sin_n(t2, 16), label=r"$\sin(16 \omega t)$")
plt.legend(fontsize=12)
plt.show()

# Spectral Filtering

In [None]:
@interact(
    window_type=widgets.ToggleButtons(
        options=["uniform", "gaussian"], disabled=False, description="window type:"
    ),
    window_size=widgets.IntSlider(
        value=3,
        min=3,
        max=51,
        step=2,
        layout=Layout(width="960px"),
        description="window_size:",
    ),
    power=widgets.IntSlider(
        value=10,
        min=2,
        max=20,
        step=2,
        layout=Layout(width="960px"),
        description="power:",
    ),
)
def smoothing_as_low_frequency_filter(window_type: str, window_size: int, power: int):
    t = np.arange(0, 3, step=0.005)
    y = np.exp(-(((t - 1) / 0.05) ** power))

    if window_type == "uniform":
        window = np.ones(window_size)
    else:
        window = scipy.signal.gaussian(M=window_size, std=window_size // 5 + 1)

    window /= window.sum()

    y_smoothed = np.convolve(
        np.pad(y, pad_width=(window.size // 2,), constant_values=(0,)),
        window,
        mode="valid",
    )

    plt.figure(figsize=(16, 5))
    plt.subplot(121)
    plt.title("time domain")
    plt.plot(t, y, label="original")
    plt.plot(t, y_smoothed, label="smoothed")
    plt.legend()

    plt.subplot(122)
    plt.title("frequency domain")
    plt.plot(spectral_power(y), label="original")
    plt.plot(spectral_power(y_smoothed), label="smoothed")
    plt.show()

# Impulse Response

In [None]:
def display_wav(wav: np.ndarray, sr: int, title: Optional[str] = None) -> None:
    ipd.display(ipd.Audio(wav, rate=sr))
    if title:
        plt.title(title)
    plt.plot(np.arange(wav.size) / sr, wav)
    plt.xlabel("seconds")
    plt.show()


music, _ = librosa.load("data/aecs/salut_spk.wav", sr=SR)
display_wav(music, SR, title="original music")

for rir_path in Path("data/rirs").iterdir():

    rir, _ = librosa.load(rir_path, sr=SR)

    room_music = np.convolve(music, rir, mode="valid")

    ipd.display(ipd.Audio(rir, rate=SR))
    ipd.display(ipd.Audio(room_music, rate=SR))

    plt.figure(figsize=(12, 5))
    plt.subplot(121)
    plt.title("Room Impulse Response")
    plt.plot(np.arange(rir.size) / SR, rir)
    plt.xlabel("seconds")
    plt.subplot(122)
    plt.title(f"{rir_path.stem} music")
    plt.plot(np.arange(room_music.size) / SR, room_music)
    plt.xlabel("seconds")
    plt.show()

# Acoustic Echo Cancellation

In [None]:
def lms(
    mic_signal: np.ndarray,
    spk_signal: np.ndarray,
    window_size: int = 2500,
    mu: float = 0.02,
) -> np.ndarray:

    x = np.pad(spk_signal, pad_width=(window_size - 1, 0), constant_values=(0,))
    y = mic_signal[:]
    w = np.zeros(window_size)
    e = np.zeros(x.size - window_size + 1)

    for i in range(x.size - window_size):
        x_i = x[i : i + window_size]
        e[i] = y[i] - np.dot(x_i, w)
        w += mu * e[i] * x_i
    return e

In [None]:
parent_dir = Path("./data/aecs")
signal_names = ("salut", "doubletalk")

for name in signal_names:
    spk_signal, _ = librosa.load(parent_dir / f"{name}_spk.wav", sr=SR)
    mic_signal, _ = librosa.load(parent_dir / f"{name}_mic.wav", sr=SR)

    display_wav(spk_signal, SR, title="spk signal")
    display_wav(mic_signal, SR, title="mic signal")

    lms_result = lms(mic_signal, spk_signal)

    ipd.display(ipd.Audio(lms_result, rate=SR))
    plt.figure(figsize=(12, 4))
    plt.plot(mic_signal, label="mic signal")
    plt.plot(lms_result, label="enhanced signal")
    plt.legend()
    plt.show()

# Bonus

In [None]:
SR = 3000

c = 261
d = 294
e = 329
f = 349
g = 391
gS = 415
a = 440
aS = 455
b = 466
cH = 523
cSH = 554
dH = 587
dSH = 622
eH = 659
fH = 698
fSH = 740
gH = 784
gSH = 830
aH = 880


def beep(frequency: int, duration: float, sr: int = 3000) -> np.ndarray:
    t = np.linspace(0, duration, int(duration * sr))
    return np.sin(2 * np.pi * frequency * t)


def delay(duration: float, sr: int = 3000) -> np.ndarray:
    t = np.linspace(0, duration, int(duration * sr))
    return np.zeros_like(t)

In [None]:
composition = np.hstack(
    (
        beep(a, 0.500, SR),
        beep(a, 0.500, SR),
        beep(a, 0.500, SR),
        beep(f, 0.350, SR),
        beep(cH, 0.150, SR),
        beep(a, 0.500, SR),
        beep(f, 0.350, SR),
        beep(cH, 0.150, SR),
        beep(a, 0.650, SR),
        delay(0.500, SR),
        beep(eH, 0.500, SR),
        beep(eH, 0.500, SR),
        beep(eH, 0.500, SR),
        beep(fH, 0.350, SR),
        beep(cH, 0.150, SR),
        beep(gS, 0.500, SR),
        beep(f, 0.350, SR),
        beep(cH, 0.150, SR),
        beep(a, 0.650, SR),
        beep(aH, 0.500, SR),
        beep(a, 0.300, SR),
        beep(a, 0.150, SR),
        beep(aH, 0.500, SR),
        beep(gSH, 0.325, SR),
        beep(gH, 0.175, SR),
        beep(fSH, 0.125, SR),
        beep(fH, 0.125, SR),
        beep(fSH, 0.250, SR),
        delay(0.325, SR),
        beep(aS, 0.250, SR),
        beep(dSH, 0.500, SR),
        beep(dH, 0.325, SR),
        beep(cSH, 0.175, SR),
        beep(cH, 0.125, SR),
        beep(b, 0.125, SR),
        beep(cH, 0.250, SR),
        delay(0.350, SR),
        beep(f, 0.250, SR),
        beep(gS, 0.500, SR),
        beep(f, 0.350, SR),
        beep(a, 0.125, SR),
        beep(cH, 0.500, SR),
        beep(a, 0.375, SR),
        beep(cH, 0.125, SR),
        beep(eH, 0.650, SR),
    )
)

ipd.Audio(composition, rate=SR)