# Voice Model

References:

[Speaker Recognition - pyannote-audio](https://github.com/pyannote/pyannote-audio)

[Building a Speaker Identification System](https://medium.com/analytics-vidhya/building-a-speaker-identification-system-from-scratch-with-deep-learning-f4c4aa558a56)

[상담사 통화녹음 화자분리](https://youngseo-computerblog.tistory.com/120)

_

Implementations:

[Faster Whisper](https://github.com/systran/faster-whisper)

[Audio Embedding - wespeaker-voxceleb-resnet34-LM](https://huggingface.co/pyannote/wespeaker-voxceleb-resnet34-LM) | [resnet293-LM](https://huggingface.co/Wespeaker/wespeaker-voxceleb-resnet293-LM/tree/main)

[whisper_streaming](https://github.com/ufal/whisper_streaming)

[whisper live](https://github.com/collabora/WhisperLive)

## 0. Streaming

In [11]:
import pyaudio
import wave
import os

In [12]:
# Capture settings for the continuous microphone recorder below.
FORMAT = pyaudio.paInt16  # sample format handed to PyAudio (16-bit integer samples)
CHANNELS = 1  # number of input channels (mono)
RATE = 44100  # sampling rate in Hz
CHUNK = 1024  # frames requested per stream.read() call / per buffer
RECORD_SECONDS = 2  # length of each saved WAV file, in seconds
OUTPUT_DIR = "streaming_audio"  # directory the WAV files are written to
WAVE_OUTPUT_FILENAME = "output"  # file name prefix; files are "<prefix>_<index>.wav"

In [ ]:
# Continuously capture microphone audio and save it as consecutive
# RECORD_SECONDS-long WAV files until the user interrupts the cell.

# wave.open() does not create missing directories, so make sure the target exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)

audio = pyaudio.PyAudio()
stream = None

try:
    stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)

    print("Recording...")

    # Number of CHUNK-sized reads that make up one output file.
    chunks_per_file = int(RATE / CHUNK * RECORD_SECONDS)

    i = 0
    while True:
        frames = [stream.read(CHUNK) for _ in range(chunks_per_file)]

        filename = f"{WAVE_OUTPUT_FILENAME}_{i}.wav"
        # The context manager closes the file even if writing fails midway.
        with wave.open(os.path.join(OUTPUT_DIR, filename), 'wb') as wavefile:
            wavefile.setnchannels(CHANNELS)
            wavefile.setsampwidth(audio.get_sample_size(FORMAT))
            wavefile.setframerate(RATE)
            wavefile.writeframes(b''.join(frames))

        print(f"Saved {filename}")
        i += 1
except KeyboardInterrupt:
    print("Recording stopped by user")
finally:
    # Always release the audio device, including on unexpected errors.
    if stream is not None:
        stream.stop_stream()
        stream.close()
    audio.terminate()

print("Recording finished.")

## 1. Import Libraries

In [1]:
!pip install faster-whisper
!pip install pyannote-audio



In [2]:
import torch
import torchaudio

import numpy as np

from pyannote.audio import Audio
from pyannote.core import Segment

from faster_whisper import WhisperModel

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Report the library versions and the device that was selected.
print(f"PyTorch: {torch.__version__}")
print(f"TorchAudio: {torchaudio.__version__}")
print(f"Uses Device: {device.upper()}")

PyTorch: 2.2.2+cu118
TorchAudio: 2.2.2+cu118
Uses Device: CPU


## 2. Whisper

In [3]:
import gc

# Drop a previously loaded `model` (useful when GPU memory is tight). If no
# model exists yet, the NameError message is printed instead.
try:
    del model
except NameError as err:
    print(err)
else:
    print("Model Deleted.")

# Reclaim unreachable Python objects, then release PyTorch's cached CUDA memory.
gc.collect()
torch.cuda.empty_cache()

name 'model' is not defined


In [4]:
# Hyperparameters
# `language` is forwarded to model.transcribe(); None presumably lets Whisper
# auto-detect the language (the run below reports a detected language) -- confirm.
language = None
# Whisper checkpoint size passed to WhisperModel.
model_size = "medium"  #@param ['tiny', 'base', 'small', 'medium', 'large', 'large-v2', 'large-v3']
# Compute type passed to WhisperModel.
compute_type = "int8"  #@param ['float16', 'int8']

In [5]:
# Load Whisper on the device selected earlier (`device` is "cuda" or "cpu")
# with the chosen model size and compute type.
model = WhisperModel(model_size, device=device, compute_type=compute_type)

tokenizer.json:   0%|          | 0.00/2.20M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/2.26k [00:00<?, ?B/s]

vocabulary.txt:   0%|          | 0.00/460k [00:00<?, ?B/s]

model.bin:   0%|          | 0.00/1.53G [00:00<?, ?B/s]

In [7]:
# Choose the audio file to transcribe: upload one when running in Colab,
# otherwise fall back to the bundled sample conversation.
import subprocess
from os.path import isfile, splitext

try:
    from google.colab import files
    uploaded = files.upload()
    audio_path = next(iter(uploaded))
except ModuleNotFoundError:
    # google.colab is unavailable, so we are running locally.
    audio_path = "./sample_conversation/kor/conversation_kor_2_1.wav"
    if not isfile(audio_path):
        raise FileNotFoundError(f"Sample audio not found: {audio_path}")

# Convert anything that is not already a WAV file (extension compared
# case-insensitively). check=True makes a failed ffmpeg run raise instead of
# silently leaving `audio_path` pointing at a file that was never written.
if splitext(audio_path)[1].lower() != ".wav":
    subprocess.run(["ffmpeg", "-y", "-i", audio_path, "audio.wav"], check=True)
    audio_path = "audio.wav"
audio_path

'./sample_conversation/kor/conversation_kor_2_1.wav'

In [8]:
# IPython's Audio widget is aliased so it does not clash with pyannote's `Audio`.
from IPython.display import Audio as AudioDisplay
# pyannote loader; note this rebinds `audio`, which the streaming section used
# for the PyAudio instance.
audio = Audio()
# Calling the loader on a path yields the waveform and its sample rate.
waveform, sample_rate = audio(audio_path)
# Last expression of the cell: shown as the cell's output (an audio player).
AudioDisplay(waveform, rate=sample_rate)

In [9]:
# Transcribe the selected file with beam search (beam_size=5) and without
# word-level timestamps.
# NOTE(review): faster-whisper presumably returns `segments` as a lazy generator
# that can be iterated only once -- confirm before reusing it in later cells.
segments, info = model.transcribe(audio_path, beam_size=5, language=language, word_timestamps=False)

In [10]:
# Report the detected language, then list every segment with its time span.
print(f"Detected language '{info.language}' with probability {info.language_probability:f}")

for segment in segments:
    print(f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

Detected language 'ko' with probability 0.969776
[0.00s -> 1.00s]  저요?
[1.00s -> 2.00s]  왜요?
[2.00s -> 3.00s]  이번 주 토요일 시간돼?
[3.00s -> 4.00s]  저 당직이요.
[4.00s -> 7.00s]  ICU도 돌아야 해서 이번 주말엔 복권 당첨돼서 찾으러 오라고 해도
[7.00s -> 8.00s]  못 가요.
[8.00s -> 9.00s]  성민이는?
[9.00s -> 10.00s]  아!
[10.00s -> 11.00s]  성민이 시험이다.
[11.00s -> 12.00s]  2차 시험 있지?
[12.00s -> 13.00s]  이번 주 토요일?
[13.00s -> 14.00s]  네.
[14.00s -> 15.00s]  지금 용성민 똥줄이요.
[15.00s -> 16.00s]  근데 왜요 교수님?
[16.00s -> 18.00s]  아니야.


## 3. Speaker Diarization

<img src="https://raw.githubusercontent.com/b-re-w/2024-1_BPL_STalk_Model_Research/main/SpeakerDiarization/res/1_V6kstNiDGG3knzsZ-DcFyw.jpg"/>

<img src="https://raw.githubusercontent.com/b-re-w/2024-1_BPL_STalk_Model_Research/main/SpeakerDiarization/res/1_cGMVhv0dNZTM6gPua4uzAA.jpg"/>

<img src="https://raw.githubusercontent.com/b-re-w/2024-1_BPL_STalk_Model_Research/main/SpeakerDiarization/res/1_yzq0c8tEruvTEf1UlVezSA.jpg"/>

## streaming

In [ ]:
import pyaudio
import wave
import os

In [ ]:
FORMAT = pyaudio.paInt16  # 16-bit resolution
CHANNELS = 1  # 1 channel
RATE = 44100  # 44.1kHz sampling rate
CHUNK = 1024  # 2^10 samples for buffer
RECORD_SECONDS = 5  # recording length in seconds
OUTPUT_DIR = "speaker"  # directory the recordings are saved to
NUM = 2  # number of recordings to make (read as num_recordings by the recorder cell)

In [ ]:
# Record NUM reference clips (one per speaker) of RECORD_SECONDS each and save
# them as speaker_1.wav, speaker_2.wav, ... inside OUTPUT_DIR.

# wave.open() does not create missing directories, so make sure the target exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Create the PyAudio object.
audio = pyaudio.PyAudio()
stream = None

try:
    # Open the recording stream.
    stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)

    # Number of recordings to make.
    num_recordings = NUM
    # Number of CHUNK-sized reads that make up one recording.
    chunks_per_recording = int(RATE / CHUNK * RECORD_SECONDS)

    for recording_num in range(num_recordings):
        print(f"Recording {recording_num + 1}/{num_recordings}...")

        frames = [stream.read(CHUNK) for _ in range(chunks_per_recording)]

        # File names are 1-based so they match the progress message and the
        # speaker_1.wav / speaker_2.wav paths that the next cell loads.
        output_filename = f"speaker_{recording_num + 1}.wav"
        # The context manager closes the file even if writing fails midway.
        with wave.open(os.path.join(OUTPUT_DIR, output_filename), 'wb') as wavefile:
            wavefile.setnchannels(CHANNELS)
            wavefile.setsampwidth(audio.get_sample_size(FORMAT))
            wavefile.setframerate(RATE)
            wavefile.writeframes(b''.join(frames))

        print(f"Saved {output_filename}")

    print("Recording finished.")

except KeyboardInterrupt:
    print("Recording stopped by user")
finally:
    # Always release the audio device, including on unexpected errors.
    if stream is not None:
        stream.stop_stream()
        stream.close()
    audio.terminate()


In [None]:
# Choose the two reference recordings (one per speaker): upload them when
# running in Colab, otherwise use the files written by the recorder cell.
import subprocess
from os.path import isfile, splitext

try:
    from google.colab import files
    uploaded = iter(files.upload())
    speaker1 = next(uploaded)
    speaker2 = next(uploaded)
except ModuleNotFoundError:
    # google.colab is unavailable, so we are running locally.
    speaker1 = "./speaker/speaker_1.wav"
    speaker2 = "./speaker/speaker_2.wav"
    for required in (speaker1, speaker2):
        if not isfile(required):
            raise FileNotFoundError(f"Speaker recording not found: {required}")

# Convert each reference file on its own: the decision is based on that file's
# extension (not on the unrelated `audio_path`), and each output is produced
# from its own input. check=True makes a failed ffmpeg run raise.
wav_paths = []
for source, target in ((speaker1, "speaker1.wav"), (speaker2, "speaker2.wav")):
    if splitext(source)[1].lower() != ".wav":
        subprocess.run(["ffmpeg", "-y", "-i", source, target], check=True)
        source = target
    wav_paths.append(source)
speaker1, speaker2 = wav_paths
speaker1, speaker2

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [ ]:
# # upload audio file
# try:
#     from google.colab import files
#     uploaded = iter(files.upload())
#     speaker1 = next(uploaded)
#     speaker2 = next(uploaded)
# except ModuleNotFoundError:
#     speaker1 = "./sample_conversation/kor/kor_2_1.wav"
#     speaker2 = "./sample_conversation/kor/kor_2_2.wav"
#     from os.path import isfile
#     assert isfile(speaker1) and isfile(speaker2)
# 
# if audio_path[-3:] != "wav":
#     import subprocess
#     subprocess.call(["ffmpeg", "-i", speaker1, "speaker1.wav", "-y"])
#     subprocess.call(["ffmpeg", "-i", speaker1, "speaker2.wav", "-y"])
#     speaker1 = "speaker1.wav"
#     speaker2 = "speaker2.wav"
# speaker1, speaker2

In [None]:
# Preview the first speaker's reference recording.
from IPython.display import Audio as AudioDisplay
# pyannote loader; rebinds `audio`, which the recorder cell used for PyAudio.
audio = Audio()
waveform, sample_rate = audio(speaker1)
# Last expression of the cell: shown as the cell's output (an audio player).
AudioDisplay(waveform, rate=sample_rate)

In [None]:
# Preview the second speaker's reference recording.
from IPython.display import Audio as AudioDisplay
# pyannote loader; later cells call `audio.crop(...)` on this object.
audio = Audio()
waveform, sample_rate = audio(speaker2)
# Last expression of the cell: shown as the cell's output (an audio player).
AudioDisplay(waveform, rate=sample_rate)

### Using ResNet - 293

In [None]:
!pip install git+https://github.com/wenet-e2e/wespeaker.git

In [None]:
# instantiate pretrained model
from huggingface_hub import hf_hub_download
import wespeaker

# Candidate checkpoints; each entry is passed to hf_hub_download as the repo id.
model_list = [
    "Wespeaker/wespeaker-voxceleb-resnet34-LM",
    "Wespeaker/wespeaker-voxceleb-resnet152-LM",
    "Wespeaker/wespeaker-voxceleb-resnet221-LM",
    "Wespeaker/wespeaker-voxceleb-resnet293-LM",
    "Wespeaker/wespeaker-ecapa-tdnn512-LM"
]

# Index 3 selects the ResNet-293 checkpoint.
model_id = model_list[3]

# Derive the ONNX file name from the repo id,
# e.g. "Wespeaker/wespeaker-voxceleb-resnet293-LM" -> "voxceleb_resnet293_LM.onnx".
model_binary = model_id.replace("Wespeaker/wespeaker-", "").replace("-", "_")+".onnx"
# Strip the file name from the downloaded path to get the directory that holds it.
root_dir = hf_hub_download(model_id, filename=model_binary).replace(model_binary, "")
# Return values are discarded; presumably these files land next to the ONNX file
# so that load_model_local() can find them in root_dir -- confirm.
hf_hub_download(model_id, filename="avg_model.pt")
hf_hub_download(model_id, filename="config.yaml")
resnet = wespeaker.load_model_local(root_dir)

In [None]:
# Enroll the two reference recordings under the speakers' names.
# Presumably register() stores each embedding in `resnet.table`, which
# recognize() iterates over -- confirm against the wespeaker implementation.
resnet.register('민서', speaker1)
resnet.register('연우', speaker2)

In [None]:
def extract_embedding(self, pcm, sample_rate):
    """Compute a speaker embedding for an in-memory waveform.

    Written as a free function that takes the model object as ``self``
    (``recognize`` calls it that way) and works on samples instead of a path.

    Args:
        self: Object providing ``apply_vad``, ``vad``, ``resample_rate``,
            ``compute_fbank``, ``device`` and ``model``.
        pcm: Waveform tensor. The VAD branch indexes it as
            ``pcm[0, start:end]``, so presumably (channel, samples) -- confirm.
        sample_rate: Sampling rate of ``pcm`` (samples per second).

    Returns:
        ``outputs[0]`` of the model moved to the CPU, or ``None`` when VAD is
        enabled and reports no speech segments.
    """
    if self.apply_vad:
        # TODO(Binbin Zhang): Refine the segments logic, here we just
        # suppose there is only silence at the start/end of the speech
        # NOTE(review): `audio_path` is not a parameter; it resolves to the
        # notebook-level variable, so the timestamps describe that whole file
        # rather than the `pcm` passed in. For a cropped `pcm` the offsets
        # below would not line up -- verify before enabling apply_vad.
        segments = self.vad.get_speech_timestamps(audio_path, return_seconds=True)
        pcmTotal = torch.Tensor()
        if len(segments) > 0:  # remove all the silence
            for segment in segments:
                # Timestamps are in seconds (return_seconds=True); convert to sample indices.
                start = int(segment['start'] * sample_rate)
                end = int(segment['end'] * sample_rate)
                # Only the first row of `pcm` is kept.
                pcmTemp = pcm[0, start:end]
                pcmTotal = torch.cat([pcmTotal, pcmTemp], 0)
            # Restore the leading dimension dropped by the `pcm[0, ...]` slice.
            pcm = pcmTotal.unsqueeze(0)
        else:  # all silence, nospeech
            return None
    pcm = pcm.to(torch.float)
    # Resample only when the input rate differs from the rate the model expects.
    if sample_rate != self.resample_rate:
        pcm = torchaudio.transforms.Resample(
            orig_freq=sample_rate, new_freq=self.resample_rate)(pcm)
    feats = self.compute_fbank(
        pcm,
        sample_rate=self.resample_rate,
        cmn=True
    )
    # Add a leading (batch) dimension before running the model.
    feats = feats.unsqueeze(0)
    feats = feats.to(self.device)
    self.model.eval()
    with torch.no_grad():
        outputs = self.model(feats)
        # When the model returns a tuple, use its last element.
        outputs = outputs[-1] if isinstance(outputs, tuple) else outputs
    # First item of the batch, moved to the CPU.
    embedding = outputs[0].to(torch.device('cpu'))
    return embedding

In [None]:
def recognize(self, pcm, sample_rate):
    """Identify which enrolled speaker best matches an audio clip.

    Args:
        self: Model object providing ``table`` (name -> enrolled embedding)
            and ``cosine_similarity``.
        pcm: Waveform of the clip, forwarded to ``extract_embedding``.
        sample_rate: Sampling rate of ``pcm``, forwarded as well.

    Returns:
        A dict ``{'name': ..., 'confidence': ...}`` for the enrolled speaker
        with the highest similarity above 0.0. ``name`` is ``''`` and
        ``confidence`` is ``0.0`` when nobody scores above 0.0, when no
        speaker is enrolled, or when no embedding could be extracted.
    """
    best_score = 0.0
    best_name = ''
    q = extract_embedding(self, pcm, sample_rate)
    # extract_embedding() returns None when VAD finds no speech; there is
    # nothing to compare then, so report "no match" instead of passing None
    # on to cosine_similarity().
    if q is not None:
        for name, e in self.table.items():
            score = self.cosine_similarity(q, e)
            if best_score < score:
                best_score = score
                best_name = name
    result = {'name': best_name, 'confidence': best_score}
    return result

In [None]:
# Transcribe the conversation again and tag every segment with the enrolled
# speaker that recognize() picks for that segment's audio.
segments, info = model.transcribe(
    audio_path,
    beam_size=5,
    language=language,
    word_timestamps=False,
)
for segment in segments:
    span = Segment(segment.start, segment.end)
    # crop() yields the waveform of the span together with its sample rate.
    clip, clip_rate = audio.crop(audio_path, span)
    speaker = recognize(resnet, clip, clip_rate)
    print(f"[{speaker['name']}] [{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

## Visualizing Speaker Diarization

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Your existing code for clustering and labeling segments...
# NOTE(review): `embeddings` (one row per segment) and `labels` (one cluster id
# per segment) must be produced by that clustering step; this notebook does not
# define them.

# Perform PCA to reduce the dimensionality of embeddings to 2D
pca = PCA(n_components=2, random_state=42)
embeddings_2d = pca.fit_transform(embeddings)
cluster_ids = np.asarray(labels)

# Plot the clusters, one scatter call per speaker. This gives a single legend
# entry and a single colour per speaker, and it no longer depends on
# `segments`, which is presumably a generator already consumed by an earlier
# loop (iterating it again would plot nothing) -- confirm.
plt.figure(figsize=(10, 8))
for cluster_id in np.unique(cluster_ids):
    members = cluster_ids == cluster_id
    plt.scatter(
        embeddings_2d[members, 0],
        embeddings_2d[members, 1],
        label=f'SPEAKER {cluster_id + 1}',
    )

plt.title("Speaker Diarization Clusters (PCA Visualization)")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.legend()
plt.show()

### Using ResNet - 34

In [None]:
from scipy.spatial.distance import cdist

In [None]:
# instantiate pretrained model
# Note: this rebinds `resnet`, replacing the wespeaker ResNet-293 object loaded
# in the previous section.
from pyannote.audio import Model
resnet = Model.from_pretrained("pyannote/wespeaker-voxceleb-resnet34-LM")

In [None]:
from pyannote.audio import Inference
# window="whole": one embedding per file or excerpt rather than a sliding window
# (the next cell relies on getting a single vector per input).
inference = Inference(resnet, window="whole")
# Run inference on the device selected in the "Import Libraries" section.
inference.to(torch.device(device))

In [None]:
# Embed each reference recording; expand_dims adds a leading axis so the
# result is 2-D, as cdist() requires.
embedding1 = np.expand_dims(inference(speaker1), axis=0)
embedding2 = np.expand_dims(inference(speaker2), axis=0)
print(embedding1.shape, embedding2.shape)
# `embeddingX` is (1 x D) numpy array extracted from the file as a whole.

In [None]:
# Transcribe the conversation and label each segment with whichever reference
# speaker is closer in cosine distance.
segments, info = model.transcribe(
    audio_path,
    beam_size=5,
    language=language,
    word_timestamps=False,
)
for segment in segments:
    span = Segment(segment.start, segment.end)
    # Embed only this segment's audio and make it 2-D for cdist().
    embedding = np.expand_dims(inference.crop(audio_path, span), axis=0)
    distance1 = cdist(embedding, embedding1, metric="cosine")[0, 0]
    distance2 = cdist(embedding, embedding2, metric="cosine")[0, 0]
    if distance1 < distance2:
        speaker = "SPEAKER 1"
    else:
        speaker = "SPEAKER 2"
    print(f"[{speaker}] [{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

### Using SpeechBrain Model

In [None]:
# If you're going to use SpeechBrain model
!pip install git+https://github.com/speechbrain/speechbrain.git@65c0113

In [None]:
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding

# Load the SpeechBrain ECAPA speaker-embedding model on the device selected in
# the "Import Libraries" section (`device` is "cuda" or "cpu"). A hard-coded
# "cuda" fails on machines without a GPU.
sb_model = PretrainedSpeakerEmbedding(
    embedding="speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device(device),
)

In [None]:
# Build a loader that downmixes audio to mono, using the sample rate that the
# existing `audio` loader reports for speaker1 (the waveform itself is discarded).
_, sample_rate = audio(speaker1)
mono = Audio(sample_rate, mono="downmix")

In [None]:
# Load each reference recording as mono and embed it with the SpeechBrain model.
# `mono(path)[0]` is the waveform of the (waveform, sample_rate) pair and
# `[None]` adds a leading axis before it is passed to the model.
embedding1 = sb_model(mono(speaker1)[0][None])
embedding2 = sb_model(mono(speaker2)[0][None])
print(embedding1.shape, embedding2.shape)
# `embeddingX` is (1 x D) numpy array extracted from the file as a whole.

In [None]:
# calculate cosine distance between two embeddings just for testing
# (`cdist` is imported in the "Using ResNet - 34" section; run that import first.)
distance = cdist(embedding1, embedding2, metric="cosine")[0,0]
distance
# `distance` is a `float` describing how dissimilar speakers 1 and 2 are.

In [None]:
# Transcribe the conversation and label each segment with whichever reference
# speaker is closer in cosine distance, using the SpeechBrain embeddings.
segments, info = model.transcribe(
    audio_path,
    beam_size=5,
    language=language,
    word_timestamps=False,
)
for segment in segments:
    span = Segment(segment.start, segment.end)
    # Mono audio of this segment; element 0 is the waveform.
    sound = mono.crop(audio_path, span)
    embedding = sb_model(sound[0][None])
    distance1 = cdist(embedding, embedding1, metric="cosine")[0, 0]
    distance2 = cdist(embedding, embedding2, metric="cosine")[0, 0]
    if distance1 < distance2:
        speaker = "SPEAKER 1"
    else:
        speaker = "SPEAKER 2"
    print(f"[{speaker}] [{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

## Miscellaneous

In [None]:
import wave
import contextlib

In [None]:
# Read the frame count and frame rate from the WAV header to get its length.
# NOTE(review): `path` is not defined anywhere in this notebook -- set it (for
# example to `audio_path`) before running this cell.
with wave.open(path, 'rb') as wav_file:
    frames = wav_file.getnframes()
    rate = wav_file.getframerate()
# Duration in seconds.
duration = frames / float(rate)

In [None]:
from whisper_streaming.whisper_online import FasterWhisperASR, OnlineASRProcessor

src_lan = "en"  # source language
#tgt_lan = "en"  # target language  -- same as source for ASR, "en" if translate task is used

# Second argument is the Whisper model size to load.
asr = FasterWhisperASR(src_lan, "large-v2")  # loads and wraps Whisper model
# set options:
# asr.set_translate_task()  # it will translate from lan into English
# asr.use_vad()  # set using VAD

In [None]:
online = OnlineASRProcessor(asr)  # create processing object with default buffer trimming option

# Simulate a live stream by feeding the file to the online processor in
# consecutive fixed-size chunks. The loop ends when the file is exhausted, each
# pass reads the next window, and only the samples (not the
# (waveform, sample_rate) pair) are handed to the processor.
# NOTE(review): `path` is not defined anywhere in this notebook -- set it (for
# example to `audio_path`) before running this cell.
chunk_seconds = 0.2
# NOTE(review): whisper_streaming presumably expects 16 kHz mono float samples
# as a 1-D numpy array -- confirm against its README.
stream_audio = Audio(sample_rate=16000, mono="downmix")
total_seconds = stream_audio.get_duration(path)

offset = 0.0
while offset < total_seconds:   # processing loop:
    # Clamp the last window so it never reaches past the end of the file.
    chunk_end = min(offset + chunk_seconds, total_seconds)
    waveform, _ = stream_audio.crop(path, Segment(offset, chunk_end))
    a = waveform.squeeze(0).numpy()  # (1, samples) tensor -> 1-D numpy array
    online.insert_audio_chunk(a)
    o = online.process_iter()
    print(o)  # do something with current partial output
    offset = chunk_end
# at the end of this audio processing
o = online.finish()
print(o)   # do something with the last output


online.init()  # refresh if you're going to re-use the object for the next audio