# パッケージインストール

In [1]:
!pip install -q onnx onnxruntime

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.0/16.0 MB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.0/16.0 MB[0m [31m30.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.0/46.0 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h

# 重み、サンプル音声ダウンロード

In [2]:
!wget https://github.com/snakers4/silero-vad/raw/refs/heads/master/src/silero_vad/data/silero_vad.onnx -q
!wget https://models.silero.ai/vad_models/en.wav -O en_example.wav -q

# ONNX取り扱い用クラス、その他ユーティリティ

In [3]:
import warnings
import numpy as np
import onnxruntime

class OnnxWrapper:
    """Stateful NumPy wrapper around a Silero VAD ONNX model run with onnxruntime.

    Two pieces of streaming state are carried from one call to the next:

    * ``_state``: array of shape ``(2, batch, 128)`` fed to the model's
      ``state`` input and replaced by the second model output.
    * ``_context``: the last 64 (16 kHz) or 32 (8 kHz) samples of the previous
      model input, prepended to the next chunk.

    Call :meth:`reset_states` before processing an unrelated audio stream.
    """

    def __init__(self, path, force_onnx_cpu=False):
        """Create the inference session and initialise the streaming state.

        Args:
            path: Model location as ``str`` or path-like object. If the path
                contains ``'16k'`` the wrapper restricts itself to 16 kHz and
                emits a warning.
            force_onnx_cpu: When true and the CPU execution provider is
                available, the session is pinned to ``CPUExecutionProvider``;
                otherwise onnxruntime picks its default providers.
        """
        import os  # local import: only needed to normalise path-like arguments

        opts = onnxruntime.SessionOptions()
        opts.inter_op_num_threads = 1
        opts.intra_op_num_threads = 1

        # Normalise pathlib.Path & co. to a plain string so both the session
        # constructor and the '16k' substring check below can handle them.
        model_path = os.fspath(path)

        if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
            self.session = onnxruntime.InferenceSession(model_path, providers=['CPUExecutionProvider'], sess_options=opts)
        else:
            self.session = onnxruntime.InferenceSession(model_path, sess_options=opts)

        self.reset_states()
        if '16k' in model_path:
            warnings.warn('This model supports only 16000 sampling rate!')
            self.sample_rates = [16000]
        else:
            self.sample_rates = [8000, 16000]

    def _validate_input(self, x: np.ndarray, sr: int) -> tuple[np.ndarray, int]:
        """Normalise an audio chunk to 2-D and check it against the model limits.

        A 1-D array gets a leading axis of length 1. Sampling rates that are
        integer multiples of 16000 are reduced to 16000 by keeping every
        ``sr // 16000``-th sample (plain decimation, no filtering).

        Args:
            x: Audio as a 1-D or 2-D array; the last axis is the sample axis.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            The 2-D array and the (possibly reduced) sampling rate.

        Raises:
            ValueError: If ``x`` has more than two dimensions, the sampling
                rate is unsupported, or the chunk is empty / too short.
        """
        if x.ndim == 1:
            x = np.expand_dims(x, axis=0)
        if x.ndim > 2:
            raise ValueError(f"Too many dimensions for input audio chunk: {x.ndim}")

        if sr != 16000 and (sr % 16000 == 0):
            step = sr // 16000
            x = x[:, ::step]
            sr = 16000

        if sr not in self.sample_rates:
            raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)")

        # sr / n > 31.25 means fewer than sr / 31.25 samples (512 at 16 kHz,
        # 256 at 8 kHz). An empty chunk is rejected explicitly so it yields
        # the same ValueError instead of a ZeroDivisionError.
        n_samples = x.shape[1]
        if n_samples == 0 or sr / n_samples > 31.25:
            raise ValueError("Input audio chunk is too short")

        return x, sr

    def reset_states(self, batch_size: int = 1):
        """Zero the recurrent state and forget the context and last-call bookkeeping.

        Args:
            batch_size: Size of the batch axis of the fresh ``(2, batch, 128)`` state.
        """
        self._state = np.zeros((2, batch_size, 128), dtype=np.float32)
        self._context = None
        self._last_sr = 0
        self._last_batch_size = 0

    def __call__(self, x: np.ndarray, sr: int) -> np.ndarray:
        """Run the model on exactly one chunk and advance the streaming state.

        Args:
            x: One chunk of audio, 1-D or ``(batch, samples)``. After
                validation it must hold exactly 512 samples at 16 kHz or 256
                samples at 8 kHz.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            The first output of the ONNX session for this chunk (presumably a
            per-item speech probability; the exact shape is defined by the
            model, not by this wrapper).

        Raises:
            ValueError: If validation fails or the chunk length is wrong.
        """
        x, sr = self._validate_input(x, sr)
        num_samples = 512 if sr == 16000 else 256

        if x.shape[-1] != num_samples:
            raise ValueError(f"Provided number of samples is {x.shape[-1]} (Supported: 256 for 8kHz, 512 for 16kHz)")

        batch_size = x.shape[0]
        context_size = 64 if sr == 16000 else 32

        # Start from a clean state on the first call and whenever the sampling
        # rate or batch size differs from the previous call.
        if self._last_sr != sr or self._last_batch_size != batch_size or self._last_batch_size == 0:
            self.reset_states(batch_size)

        if self._context is None:
            self._context = np.zeros((batch_size, context_size), dtype=np.float32)

        # The model input is the previous tail followed by the new chunk.
        x = np.concatenate([self._context, x], axis=1)

        ort_inputs = {
            'input': x.astype(np.float32),
            'state': self._state.astype(np.float32),
            'sr': np.array(sr, dtype=np.int64)
        }

        out, new_state = self.session.run(None, ort_inputs)
        self._state = new_state
        self._context = x[:, -context_size:]
        self._last_sr = sr
        self._last_batch_size = batch_size

        return out

    def audio_forward(self, x: np.ndarray, sr: int) -> np.ndarray:
        """Run the model over a whole recording, chunk by chunk.

        The streaming state is reset first. The audio is zero-padded at the
        end to a multiple of the chunk size (512 at 16 kHz, 256 at 8 kHz) and
        each chunk is passed through :meth:`__call__` in order.

        Args:
            x: Full audio, 1-D or ``(batch, samples)``.
            sr: Sampling rate of ``x`` in Hz.

        Returns:
            The per-chunk outputs concatenated along axis 1.

        Raises:
            ValueError: If validation fails (including empty audio).
        """
        outs = []
        x, sr = self._validate_input(x, sr)
        self.reset_states()
        num_samples = 512 if sr == 16000 else 256

        if x.shape[1] % num_samples != 0:
            pad_num = num_samples - (x.shape[1] % num_samples)
            x = np.pad(x, ((0, 0), (0, pad_num)), mode='constant')

        for i in range(0, x.shape[1], num_samples):
            wavs_batch = x[:, i:i + num_samples]
            out_chunk = self.__call__(wavs_batch, sr)
            outs.append(out_chunk)

        return np.concatenate(outs, axis=1)

In [4]:
import numpy as np
from typing import Callable, List

class VADIterator:
    """Turns per-chunk speech probabilities into speech start/end events.

    Feed consecutive audio chunks to the instance; each call returns
    ``{'start': t}``, ``{'end': t}`` or ``None``. The wrapped ``model`` must be
    callable as ``model(chunk, sampling_rate)`` returning a single-element
    array, and must provide ``reset_states()``.
    """

    def __init__(self,
                 model,
                 threshold: float = 0.5,
                 sampling_rate: int = 16000,
                 min_silence_duration_ms: int = 100,
                 speech_pad_ms: int = 30):
        """Store the configuration and start from a clean state.

        Args:
            model: Chunk-level VAD model (see class docstring).
            threshold: Probability at or above which a chunk counts as speech.
            sampling_rate: Sampling rate of the incoming audio; 8000 or 16000.
            min_silence_duration_ms: Silence that must elapse before an
                ``'end'`` event is emitted.
            speech_pad_ms: Padding applied to reported start and end positions.

        Raises:
            ValueError: If ``sampling_rate`` is neither 8000 nor 16000.
        """
        self.model = model
        self.threshold = threshold
        self.sampling_rate = sampling_rate

        if sampling_rate not in (8000, 16000):
            raise ValueError('VADIterator supports only 8000 and 16000 Hz')

        # Durations are converted from milliseconds to whole samples.
        self.min_silence_samples = int(sampling_rate * min_silence_duration_ms / 1000)
        self.speech_pad_samples = int(sampling_rate * speech_pad_ms / 1000)
        self.reset_states()

    def reset_states(self):
        """Reset the wrapped model and this iterator's own tracking fields."""
        self.model.reset_states()
        self.triggered, self.temp_end, self.current_sample = False, 0, 0

    def _as_timestamp(self, sample_pos, return_seconds, time_resolution):
        """Express a sample position as an int sample index or as rounded seconds."""
        if return_seconds:
            return round(sample_pos / self.sampling_rate, time_resolution)
        return int(sample_pos)

    def __call__(self, x: np.ndarray, return_seconds=False, time_resolution: int = 1):
        """Consume one audio chunk and report a speech boundary, if any.

        Args:
            x: Audio chunk as a NumPy array of shape ``[N]`` or ``[1, N]``.
            return_seconds: Report positions in seconds rather than samples.
            time_resolution: Number of decimals kept when reporting seconds.

        Returns:
            ``{'start': pos}`` on the first chunk at or above the threshold
            while no segment is open, ``{'end': pos}`` once a chunk below
            ``threshold - 0.15`` arrives at least ``min_silence_samples`` after
            the silence began, otherwise ``None``.

        Raises:
            TypeError: If ``x`` is not a NumPy array.
            ValueError: If ``x`` is neither 1-D nor 2-D.
        """
        if not isinstance(x, np.ndarray):
            raise TypeError("Input audio must be a NumPy array")
        if x.ndim not in (1, 2):
            raise ValueError("Input must be 1D or 2D NumPy array")
        if x.ndim == 1:
            x = x[np.newaxis, :]

        chunk_len = x.shape[1]
        self.current_sample += chunk_len

        prob = self.model(x, self.sampling_rate).item()

        if prob >= self.threshold:
            # Speech resumed: discard any pending end-of-speech candidate.
            if self.temp_end:
                self.temp_end = 0
            if not self.triggered:
                self.triggered = True
                onset = max(0, self.current_sample - self.speech_pad_samples - chunk_len)
                return {'start': self._as_timestamp(onset, return_seconds, time_resolution)}

        # Only a clearly-silent chunk inside an open segment can close it.
        if not self.triggered or not (prob < self.threshold - 0.15):
            return None

        if not self.temp_end:
            self.temp_end = self.current_sample
        if self.current_sample - self.temp_end < self.min_silence_samples:
            return None

        offset = self.temp_end + self.speech_pad_samples - chunk_len
        self.temp_end = 0
        self.triggered = False
        return {'end': self._as_timestamp(offset, return_seconds, time_resolution)}

# モデル読み込み

In [5]:
# Load the downloaded Silero VAD weights; force_onnx_cpu=True requests the
# CPU execution provider when onnxruntime reports it as available.
MODEL_PATH = "silero_vad.onnx"
model = OnnxWrapper(MODEL_PATH, force_onnx_cpu=True)

# VAD

In [6]:
import soundfile as sf
from scipy.signal import resample

# Load the sample clip; multi-channel audio is averaged down to one channel
# and the samples are cast to float32.
wav, sr = sf.read("en_example.wav")
wav = (np.mean(wav, axis=1) if wav.ndim > 1 else wav).astype(np.float32)

# Resample to 16 kHz, the rate passed to VADIterator below. `sr` keeps the
# file's original rate.
TARGET_SR = 16000
if sr != TARGET_SR:
    wav = resample(wav, int(len(wav) * TARGET_SR / sr))

In [7]:
vad_iterator = VADIterator(model, sampling_rate=16000)
window_size_samples = 512  # chunk length OnnxWrapper accepts at 16 kHz

# Stream the waveform through the iterator one full window at a time and print
# every start/end event. A trailing partial window is never fed to the model,
# so a segment still open at the end of the clip gets no matching 'end'.
last_window_start = len(wav) - window_size_samples
for offset in range(0, last_window_start + 1, window_size_samples):
    event = vad_iterator(wav[offset:offset + window_size_samples], return_seconds=True)
    if event:
        print(event, end=' ')

# Clear model and iterator state so the next run starts fresh.
vad_iterator.reset_states()

{'start': 0.0} {'end': 2.1} {'start': 2.7} {'end': 4.9} {'start': 5.0} {'end': 6.8} {'start': 9.3} {'end': 13.4} {'start': 13.5} {'end': 15.2} {'start': 15.3} {'end': 15.8} {'start': 16.3} {'end': 17.9} {'start': 18.4} {'end': 19.6} {'start': 20.3} {'end': 37.6} {'start': 38.0} {'end': 38.9} {'start': 39.9} {'end': 43.3} {'start': 43.6} {'end': 44.6} {'start': 45.0} {'end': 46.8} {'start': 48.8} {'end': 50.0} {'start': 51.1} {'end': 53.4} {'start': 53.5} 