In [None]:
# Install the audio-augmentation dependencies into the running kernel's
# environment (%pip, unlike !pip, always targets the kernel). Versions are
# pinned to the ones this notebook was last executed with.
# NOTE(review): add_pitch_shift_rb below shells out to a `rubberband`
# executable, which this cell does not install — confirm it is available.
%pip install -q pyrubberband==0.3.0 torch-time-stretch==1.0.3

Collecting pyrubberband
  Downloading pyrubberband-0.3.0.tar.gz (4.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pysoundfile>=0.8.0 (from pyrubberband)
  Downloading PySoundFile-0.9.0.post1-py2.py3-none-any.whl (24 kB)
Building wheels for collected packages: pyrubberband
  Building wheel for pyrubberband (setup.py) ... [?25l[?25hdone
  Created wheel for pyrubberband: filename=pyrubberband-0.3.0-py3-none-any.whl size=4264 sha256=5f243b91968bfaae3569b42639f61440f513950f33a6f56ac177934f02e2c65f
  Stored in directory: /root/.cache/pip/wheels/b8/2d/f0/bb68fbfe67a42c858a79412321d28589218cbfe114c48ce664
Successfully built pyrubberband
Installing collected packages: pysoundfile, pyrubberband
Successfully installed pyrubberband-0.3.0 pysoundfile-0.9.0.post1
Collecting torch-time-stretch
  Downloading torch_time_stretch-1.0.3-py3-none-any.whl (4.6 kB)
Collecting primePy>=1.3 (from torch-time-stretch)
  Downloading primePy-1.3-py3-none-any.whl (4.0 kB)
Collecting nvidia-

In [None]:
import os
from scipy import signal
import pyrubberband as pyrb
import torch
import torchaudio
import torchaudio.transforms as T
from torch_time_stretch import time_stretch
import numpy as np
# import shutil
import tempfile
import subprocess
import soundfile as sf

def read_audio(filepath, fs=16000, mono=True, normalize=False, preemphasis=False):
    """
    Reads the audio file stored at <filepath>.

    Processing order: channel selection -> optional pre-emphasis ->
    resampling to <fs> (only when the file's rate differs) -> optional
    RMS normalization.

    Parameters:
        filepath (str): audio file path
        fs (int, optional): target sampling rate
        mono (boolean, optional): return a single channel (the first one)
        normalize (boolean, optional): RMS normalization of the signal
            (applies rms_normalize with its default level)
        preemphasis (boolean, optional): apply pre-emphasis filter
    Returns:
        waveform (tensor): audio signal, dim(N,). With mono=False and a
            multi-channel file the channels are flattened one after another
            into a single 1-D tensor.
        None is returned (and a warning is emitted) when the file cannot be
        loaded or processed.
    """
    import warnings

    assert isinstance(filepath, str), "filepath must be specified as string"
    assert os.path.exists(filepath), f"{filepath} does not exist."

    try:
        waveform, sr = torchaudio.load(filepath)
        # mono channel: keep the first channel for ANY multi-channel file
        # (previously only exactly-2-channel files were handled; other
        # channel counts were silently concatenated into one long signal)
        if mono and waveform.shape[0] > 1:
            waveform = waveform[0]
        else:
            waveform = waveform.reshape(-1)
        # preemphasis
        if preemphasis:
            waveform = pre_emphasis(waveform)
        # resample
        if sr != fs:
            resampler = T.Resample(sr, fs, dtype=waveform.dtype)
            waveform = resampler(waveform)
        # normalize
        if normalize:
            waveform = rms_normalize(waveform)
        return waveform
    except Exception as e:
        # Keep the best-effort "return None" contract, but do not hide the
        # reason: callers otherwise only see a None with no explanation.
        warnings.warn(f"read_audio: failed to load '{filepath}': {e!r}")
        return None


def peak_normalize(waveform):
    """
    Peak normalizes the <waveform> so that its largest absolute sample is 1.
    Parameter:
        waveform (tensor): waveform, dims: (N,)
    Returns:
        (tensor): scaled waveform, dims: (N,). An empty or all-zero waveform
            is returned as an unscaled copy instead of dividing by zero.
    """
    if waveform.numel() == 0:
        return waveform.clone()
    peak = torch.max(torch.abs(waveform))
    # A silent signal has no peak to normalize to; dividing would yield NaNs.
    if peak == 0:
        return waveform.clone()
    return waveform/peak


def rms_normalize(waveform, r=-10):
    """
    RMS-normalization of <waveform>
    Parameter:
        waveform (tensor): waveform, dims: (N,)
        r (float): target level; the output RMS is set to 10**(r/10)
            (so the default r=-10 gives an RMS of 0.1)
    Returns:
        (tensor): rescaled waveform, dims: (N,). A silent (all-zero) waveform
            is returned as an unscaled copy instead of producing NaNs.

    NOTE(review): for an amplitude quantity such as RMS, a level in dB would
    normally be converted with 10**(r/20). The existing 10**(r/10) conversion
    is kept so output levels do not change — confirm which one is intended.
    """
    current_rms = torch.pow(torch.mean(torch.pow(waveform,2)) ,0.5)
    # Zero RMS means there is nothing to scale; dividing would give inf/NaN.
    if current_rms == 0:
        return waveform.clone()
    scaling_factor = (10**(r/10))/current_rms
    return waveform*scaling_factor


def pre_emphasis(waveform, coeff=0.97):
    """
    Applies a first-order pre-emphasis filter: y[n] = x[n] - coeff*x[n-1],
    with the first sample passed through unchanged (y[0] = x[0]).
    Parameters:
        waveform (tensor): waveform, dims: (N,)
        coeff (float, optional): pre-emphasis coefficient
    Returns:
        filtered_sig (tensor): filtered waveform, same shape as <waveform>
    """
    filtered_sig = torch.empty_like(waveform)
    # An empty signal has no first sample to copy; return the empty result.
    if waveform.numel() == 0:
        return filtered_sig
    filtered_sig[1:] = waveform[1:] - coeff*waveform[:-1]
    filtered_sig[0] = waveform[0]
    return filtered_sig


def add_time_stretch(audio, fs, stretch_rate):
    """
    Time-stretches <audio> by calling torch_time_stretch's time_stretch with
    the reciprocal of <stretch_rate>.
    Parameters:
        audio (tensor): waveform, dims: (N,)
        fs (float): audio sample rate
        stretch_rate (float): playback rate
    Returns:
        audio_stretch (tensor): time stretched waveform with all singleton
            dimensions squeezed out.

    NOTE(review): the previous docstring gave the output length as
    N*<stretch_rate>, while a disabled sanity check compared it against
    N/<stretch_rate>; confirm the actual length against the library.
    """
    # Two leading singleton axes are prepended before the library call.
    batched = audio[None, None, ...]
    stretched = time_stretch(batched, 1/stretch_rate, fs)
    return stretched.squeeze_()


def add_pitch_shift_rb(y, sr, shift, tmpdir=None):
    """
    Adds pitch shift to <y> audio sampled at <sr> by <shift> semitones. It calls rubberband package directly; does not use pyrubberband package.
    Parameters:
        y (np.ndarray or tensor): waveform; anything that is not an ndarray
            is converted with .numpy()
        sr (int): audio sample rate
        shift (float): pitch shift in semitones (passed to `rubberband --pitch`)
        tmpdir (str, optional): directory for the temporary wav files; the
            default temporary directory is used when None
    Returns:
        (tensor): pitch shifted waveform as float32
    Raises:
        RuntimeError: if the rubberband command exits with a non-zero status
    """
    if not isinstance(y, np.ndarray):
        y = y.numpy()

    # Get the input and output tempfile. Passing dir= keeps the choice local
    # to this call instead of overwriting the process-wide tempfile.tempdir.
    fd, infile = tempfile.mkstemp(suffix='.wav', dir=tmpdir)
    os.close(fd)
    outfile = None
    try:
        fd, outfile = tempfile.mkstemp(suffix='.wav', dir=tmpdir)
        os.close(fd)

        sf.write(infile, y, sr)
        command = ["rubberband", "-q", "--pitch", str(shift), infile, outfile]
        result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        # Surface a failed run here; otherwise the only symptom is an obscure
        # error when reading the (empty) output file below.
        if result.returncode != 0:
            details = result.stderr.decode(errors="replace").strip()
            raise RuntimeError(
                f"rubberband failed with exit code {result.returncode}: {details}"
            )

        y_out, _ = sf.read(outfile, always_2d=True, dtype=y.dtype)
        if y.ndim == 1:
            y_out = np.squeeze(y_out)
    finally:
        # Always remove the temporary files, even when something above raised.
        for path in (infile, outfile):
            if path is not None and os.path.exists(path):
                os.unlink(path)
    return torch.from_numpy(y_out).type(torch.float32)


def add_pitch_shift(audio, fs, semitone_shift, use_rb=True):
    """
    Adds pitch shift to <audio> by <semitone_shift> semitones.
    Parameters:
        audio (tensor): clean waveform, dims: (N,)
        fs (float): audio sample rate
        semitone_shift (float): semitones (can be between -12 and 12, but keep it low for good sound quality)
        use_rb (boolean, optional): use pyrubberband when truthy, otherwise
            torchaudio.functional.pitch_shift
    Returns:
        audio_shift: pitch shifted added signal (tensor), dims: (N,)

    """
    # torchaudio path: tensor in, tensor out, no conversion needed.
    if not use_rb:
        return torchaudio.functional.pitch_shift(audio, fs, semitone_shift)

    # pyrubberband path: round-trip through numpy and cast back to float32.
    shifted_np = pyrb.pitch_shift(audio.numpy(), fs, semitone_shift)
    return torch.from_numpy(shifted_np).type(torch.float32)

def add_noise(audio, noise, snr):
    """
    Adds background <noise> to <audio> signal at desired <snr> level
    Parameters:
        audio (tensor): clean waveform, dims: (N,)
        noise (tensor): noise waveform, dims: (M,)
        snr (int): SNR level in dB
    Returns:
        noisy_audio: noisy signal (tensor), dims: (N,). If <audio> is empty,
            or either signal has zero power (the SNR is then undefined), an
            unmodified copy of <audio> is returned.
    Raises:
        ValueError: if <noise> is empty
    """
    n_audio, n_noise = len(audio), len(noise)
    if n_noise == 0:
        raise ValueError("noise signal is empty")
    if n_audio == 0:
        return audio.clone()

    # make equal lengths for clean and noise signals
    if n_audio >= n_noise:
        # Integer ceiling division: going through a float32 tensor can round
        # the ratio down for very long signals and yield too few repetitions.
        reps = -(-n_audio // n_noise)
        noise = torch.tile(noise, (reps,))[:n_audio]
    else:
        # random crop of the (longer) noise signal
        start_idx = int(torch.randint(n_noise - n_audio, (1,)))
        noise = noise[start_idx:start_idx + n_audio]

    assert len(noise) == len(audio), f"noise signal {len(noise)} and clean signal {len(audio)} length mismatch"

    # add noise at desired snr (these are mean powers, i.e. mean of squares)
    audio_power = torch.mean(torch.pow(audio, 2))
    noise_power = torch.mean(torch.pow(noise, 2))
    # With a silent clean or noise signal the scale factor would be 0 or
    # inf/NaN; leave the clean signal untouched instead.
    if audio_power == 0 or noise_power == 0:
        return audio.clone()

    factor = torch.pow((audio_power/noise_power)/torch.pow(torch.tensor(10), (snr/10)), 0.5)
    noise = factor*noise
    noisy_audio = audio + noise

    # Sanity check in both directions (the SNR may be neither too high nor too low).
    achieved_snr = 10*torch.log10(audio_power/torch.mean(torch.pow(noise, 2)))
    assert torch.abs(achieved_snr - snr) < 1e-4, f"snr mismatch: achieved {achieved_snr.item():.4f} dB, requested {snr} dB"
    return noisy_audio


def add_reverb(clean, rir):
    """
    Filters <clean> signal with <rir> to get reverberation effect
    Parameters:
        clean (tensor): clean waveform, dims: (N,)
        rir (tensor): room impulse response, dims: (M,)
    Returns:
        reverb added signal (tensor, float32), dims: (N,)
    """
    clean = clean.numpy()
    rir = rir.numpy()
    # unit-energy impulse response
    rir = rir/np.linalg.norm(rir)
    # filtering
    p_max = np.argmax(np.abs(rir))
    filtered_clean = signal.convolve(clean, rir, mode="full")

    # time offset: drop the first p_max samples so the strongest tap of the
    # RIR lines up with the clean signal, then keep N samples. The "full"
    # convolution has N+M-1 samples and p_max <= M-1, so this window always
    # fits. Slicing with an explicit stop also works for p_max == 0, where
    # negative-index slicing (`[:-0]`) selects nothing and used to crash.
    aligned = filtered_clean[p_max:p_max + len(clean)]
    aligned = np.ascontiguousarray(aligned, dtype=np.float32)
    assert(len(aligned)==len(clean))
    return torch.from_numpy(aligned)


def add_noise_reverb(audio, noise, snr, rir):
    """
    Applies the same room impulse response <rir> to both <audio> and <noise>,
    then mixes the two reverberant signals at the desired <snr> level.
    Parameters:
        audio (tensor): clean waveform, dims: (N,)
        noise (tensor): noise waveform, dims: (M,)
        snr (int): SNR level in dB
        rir (tensor): room impulse response, dims: (K,)
    Returns:
        noise and reverb added signal (tensor), dims: (N,)
    """
    # Reverberate the clean signal first, then the noise, with the same RIR.
    reverberant_audio, reverberant_noise = (add_reverb(sig, rir) for sig in (audio, noise))
    return add_noise(reverberant_audio, reverberant_noise, snr)

In [None]:
import torchaudio
# Build the LibriSpeech "test-clean" dataset rooted at the current working
# directory, with downloading enabled.
# NOTE(review): a later cell reads a file from /content/LibriSpeech/test-clean,
# so this presumably runs with /content as the working directory — confirm.
librispeech_test = torchaudio.datasets.LIBRISPEECH(".", url="test-clean", download=True)

100%|██████████| 331M/331M [00:21<00:00, 16.5MB/s]


In [None]:
# Install fairseq into the running kernel's environment, pinned to the
# version this notebook was last executed with.
%pip install -q fairseq==0.12.2

Collecting fairseq
  Downloading fairseq-0.12.2.tar.gz (9.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.6/9.6 MB[0m [31m39.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting hydra-core<1.1,>=1.0.7 (from fairseq)
  Downloading hydra_core-1.0.7-py3-none-any.whl (123 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m123.8/123.8 kB[0m [31m17.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting omegaconf<2.1 (from fairseq)
  Downloading omegaconf-2.0.6-py3-none-any.whl (36 kB)
Collecting sacrebleu>=1.4.12 (from fairseq)
  Downloading sacrebleu-2.4.1-py3-none-any.whl (106 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m106.6/106.6 kB[0m [31m14.8 MB/s[0m eta [36m0:00:00[0m
Collecting bitarray (from fairse

In [None]:
# Check if CUDA (GPU) is available
# NOTE(review): `device` is not referenced by any other cell visible in this
# notebook (the checkpoint below is loaded on CPU) — confirm whether the model
# and inputs are meant to be moved to it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
import torch
import fairseq
import torchaudio
import os

# Path to the pre-trained vq-wav2vec checkpoint
cp_path = '/content/vq-wav2vec_kmeans.pt'

# Define the path to the audio file
audio_file_path = '/content/LibriSpeech/test-clean/1089/134686/1089-134686-0000.flac'

# Fail early with a clear message when the checkpoint is missing.
# NOTE: an "unexpected EOF ... file might be corrupted" error while loading
# means the checkpoint file on disk is incomplete; download/upload it again.
if not os.path.isfile(cp_path):
    raise FileNotFoundError(f"checkpoint not found: {cp_path}")

# Load pre-trained model. load_model_ensemble_and_task takes a list of
# checkpoint *paths* (it reads the files itself), not an already-loaded
# state dict.
model, cfg, task = fairseq.checkpoint_utils.load_model_ensemble_and_task([cp_path])
model = model[0]
model.eval()

# Read and preprocess audio
wav_input_16khz = read_audio(audio_file_path, fs=16000, mono=True, normalize=False, preemphasis=False)
# read_audio returns None when the file could not be loaded
if wav_input_16khz is None:
    raise RuntimeError(f"could not read audio file: {audio_file_path}")

# Pass the preprocessed audio through the model. read_audio returns a 1-D
# (N,) waveform, so a leading batch dimension is added first; no gradients
# are needed for inference.
with torch.no_grad():
    z = model.feature_extractor(wav_input_16khz.unsqueeze(0))
    _, idxs = model.vector_quantizer.forward_idx(z)
# Shape is (batch, timesteps, groups); the number of timesteps depends on the
# length of the audio (the fairseq example reports [1, 60, 2] for 10000 samples).
print(idxs.shape)

RuntimeError: unexpected EOF, expected 4236767 more bytes. The file might be corrupted.