In [1]:
import torch as T
import silero_vad
import soundfile as sf

In [2]:
T.set_num_threads(1)
vad_model, vad_utils = T.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad')
print(f"model size = {sum([p.numel() for p in vad_model.parameters()]) * 4 / 1.024e6} MB")

model size = 1.8070078125 MB


Using cache found in C:\Users\Freja Holthaus/.cache\torch\hub\snakers4_silero-vad_master


In [3]:
test_audio_file = "C:\\datasets\\OpenSpeechAndLanguageResource\\test-clean\\test-clean\\61\\70970\\61-70970-0007.flac"
test_audio = silero_vad.read_audio(test_audio_file)
print(test_audio.shape)

torch.Size([71760])


In [5]:
from device_capture_system import deviceIO

In [None]:
@T.no_grad()
def get_speech_timestamps(
    audio: T.Tensor, 
    model: T.nn.Module,
    window_siwze = 512):

    # sampling_rate: int = 16000,
    # threshold: float = 0.5,
    # min_speech_duration_ms: int = 250,
    # max_speech_duration_s: float = float('inf'),
    # min_silence_duration_ms: int = 100,
    # speech_pad_ms: int = 30,
    # return_seconds: bool = False,
    # visualize_probs: bool = False,
    # progress_tracking_callback: Callable[[float], None] = None,
    # neg_threshold: float = None):

    """
        ! This code is taken from https://github.com/snakers4/silero-vad and changed

        ! sample rate has to be 16000, upsample or downsample the audio to 16000


    Parameters
    ----------
    audio: torch.Tensor, one dimensional
        One dimensional float torch.Tensor, other types are casted to torch if possible

    model: preloaded .jit/.onnx silero VAD model

    threshold: float (default - 0.5)
        Speech threshold. Silero VAD outputs speech probabilities for each audio chunk, probabilities ABOVE this value are considered as SPEECH.
        It is better to tune this parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets.

    sampling_rate: int (default - 16000)
        Currently silero VAD models support 8000 and 16000 (or multiply of 16000) sample rates

    min_speech_duration_ms: int (default - 250 milliseconds)
        Final speech chunks shorter min_speech_duration_ms are thrown out

    max_speech_duration_s: int (default -  inf)
        Maximum duration of speech chunks in seconds
        Chunks longer than max_speech_duration_s will be split at the timestamp of the last silence that lasts more than 100ms (if any), to prevent agressive cutting.
        Otherwise, they will be split aggressively just before max_speech_duration_s.

    min_silence_duration_ms: int (default - 100 milliseconds)
        In the end of each speech chunk wait for min_silence_duration_ms before separating it

    speech_pad_ms: int (default - 30 milliseconds)
        Final speech chunks are padded by speech_pad_ms each side

    return_seconds: bool (default - False)
        whether return timestamps in seconds (default - samples)

    visualize_probs: bool (default - False)
        whether draw prob hist or not

    progress_tracking_callback: Callable[[float], None] (default - None)
        callback function taking progress in percents as an argument

    neg_threshold: float (default = threshold - 0.15)
        Negative threshold (noise or exit threshold). If model's current state is SPEECH, values BELOW this value are considered as NON-SPEECH.

    Returns
    ----------
    speeches: list of dicts
        list containing ends and beginnings of speech chunks (samples or seconds based on return_seconds)

    
    """

    assert type(audio) == T.Tensor, "Audio must be a tensor"
    assert audio.dim() == 1, "Audio must be one-dimensional"


    model.reset_states()


    """
        Variable Initialization:

        min_speech_samples, 
        speech_pad_samples, 
        max_speech_samples, 
        min_silence_samples, 
        min_silence_samples_at_max_speech: 
        Speech Probability Calculation:

        speech_probs: An empty list to store the speech probabilities for each audio chunk.
        The for loop iterates over the audio in chunks of window_siwze. Each chunk is padded if necessary and passed to the model to get the speech probability, which is then appended to speech_probs.
        Progress is calculated and sent to a callback function if provided.
        Speech Segmentation:

        triggered, speeches, current_speech: Variables to manage the state of speech detection. triggered indicates if speech is currently being detected. speeches is a list to store detected speech segments. current_speech holds the current speech segment being processed.
        neg_threshold: A threshold for detecting the end of speech, set to a value slightly lower than the main threshold.
        temp_end, prev_end, next_start: Variables to manage potential segment ends and tolerate some silence.
        Processing Speech Probabilities:

        The for loop iterates over speech_probs to detect speech segments based on the thresholds and durations.
        If speech probability exceeds the threshold and triggered is False, a new speech segment starts.
        If the speech segment exceeds max_speech_samples, it is split, and the current segment is saved.
        If speech probability drops below neg_threshold, potential segment ends are managed, and segments are saved if they meet the minimum speech duration.
        Final Adjustments to Speech Segments:

        If there is an ongoing speech segment at the end of the audio, it is finalized and added to speeches.
        The for loop adjusts the start and end times of speech segments to include padding and manage overlaps.
        Return Format Adjustments:

        If return_seconds is True, the start and end times of speech segments are converted from samples to seconds.
        If step is greater than 1, the start and end times are multiplied by step.
        Visualization:

        If visualize_probs is True, a visualization of the speech probabilities is created.
        Return:

        The function returns the list of detected speech segments (speeches).
    """

    """
        These variables are calculated based on the sampling rate and various duration parameters. 
        They represent the minimum number of samples for speech, 
        padding samples, maximum number of samples for speech, 
        minimum number of samples for silence, 
        and minimum silence samples at maximum speech, 
        respectively.
    """
    min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
    speech_pad_samples = sampling_rate * speech_pad_ms / 1000
    max_speech_samples = sampling_rate * max_speech_duration_s - window_siwze - 2 * speech_pad_samples
    min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
    min_silence_samples_at_max_speech = sampling_rate * 98 / 1000

    audio_length_samples = len(audio)

    speech_probs = []
    for current_start_sample in range(0, audio_length_samples, window_siwze):
        chunk = audio[current_start_sample: current_start_sample + window_siwze]
        if len(chunk) < window_siwze:
            chunk = torch.nn.functional.pad(chunk, (0, int(window_siwze - len(chunk))))
        speech_prob = model(chunk, sampling_rate).item()
        speech_probs.append(speech_prob)
        # caculate progress and seng it to callback function
        progress = current_start_sample + window_siwze
        if progress > audio_length_samples:
            progress = audio_length_samples
        progress_percent = (progress / audio_length_samples) * 100
        if progress_tracking_callback:
            progress_tracking_callback(progress_percent)

    triggered = False
    speeches = []
    current_speech = {}

    if neg_threshold is None:
        neg_threshold = max(threshold - 0.15, 0.01)
    temp_end = 0  # to save potential segment end (and tolerate some silence)
    prev_end = next_start = 0  # to save potential segment limits in case of maximum segment size reached

    for i, speech_prob in enumerate(speech_probs):
        if (speech_prob >= threshold) and temp_end:
            temp_end = 0
            if next_start < prev_end:
                next_start = window_siwze * i

        if (speech_prob >= threshold) and not triggered:
            triggered = True
            current_speech['start'] = window_siwze * i
            continue

        if triggered and (window_siwze * i) - current_speech['start'] > max_speech_samples:
            if prev_end:
                current_speech['end'] = prev_end
                speeches.append(current_speech)
                current_speech = {}
                if next_start < prev_end:  # previously reached silence (< neg_thres) and is still not speech (< thres)
                    triggered = False
                else:
                    current_speech['start'] = next_start
                prev_end = next_start = temp_end = 0
            else:
                current_speech['end'] = window_siwze * i
                speeches.append(current_speech)
                current_speech = {}
                prev_end = next_start = temp_end = 0
                triggered = False
                continue

        if (speech_prob < neg_threshold) and triggered:
            if not temp_end:
                temp_end = window_siwze * i
            if ((window_siwze * i) - temp_end) > min_silence_samples_at_max_speech:  # condition to avoid cutting in very short silence
                prev_end = temp_end
            if (window_siwze * i) - temp_end < min_silence_samples:
                continue
            else:
                current_speech['end'] = temp_end
                if (current_speech['end'] - current_speech['start']) > min_speech_samples:
                    speeches.append(current_speech)
                current_speech = {}
                prev_end = next_start = temp_end = 0
                triggered = False
                continue

    if current_speech and (audio_length_samples - current_speech['start']) > min_speech_samples:
        current_speech['end'] = audio_length_samples
        speeches.append(current_speech)

    for i, speech in enumerate(speeches):
        if i == 0:
            speech['start'] = int(max(0, speech['start'] - speech_pad_samples))
        if i != len(speeches) - 1:
            silence_duration = speeches[i+1]['start'] - speech['end']
            if silence_duration < 2 * speech_pad_samples:
                speech['end'] += int(silence_duration // 2)
                speeches[i+1]['start'] = int(max(0, speeches[i+1]['start'] - silence_duration // 2))
            else:
                speech['end'] = int(min(audio_length_samples, speech['end'] + speech_pad_samples))
                speeches[i+1]['start'] = int(max(0, speeches[i+1]['start'] - speech_pad_samples))
        else:
            speech['end'] = int(min(audio_length_samples, speech['end'] + speech_pad_samples))

    if return_seconds:
        audio_length_seconds = audio_length_samples / sampling_rate
        for speech_dict in speeches:
            speech_dict['start'] = max(round(speech_dict['start'] / sampling_rate, 1), 0)
            speech_dict['end'] = min(round(speech_dict['end'] / sampling_rate, 1), audio_length_seconds)
    elif step > 1:
        for speech_dict in speeches:
            speech_dict['start'] *= step
            speech_dict['end'] *= step

    if visualize_probs:
        make_visualization(speech_probs, window_siwze / sampling_rate)

    return speeches


NameError: name 'Callable' is not defined