<a href="https://colab.research.google.com/github/Shimmer0523/voice-diarizer/blob/main/voice_diarizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
def is_colab():
    try:
        import google.colab
        return True
    except:
        return False

if is_colab():
    !pip install -q pyannote.audio pydub noisereduce

In [19]:
import os
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
from pyannote.core import Segment, Annotation
from transformers import AutoFeatureExtractor, AutoModelForAudioXVector
import numpy as np
from pydub import AudioSegment
import noisereduce as nr
import soundfile as sf
from moviepy.editor import VideoFileClip
import torch
import torchaudio

In [20]:
def mp4_to_audio(mp4_file):
    """mp4 to audio

    Args:
        mp4_file (string): file path of mp4 file
        output_audio (number[]): extracted audio data
    """
    video = VideoFileClip(mp4_file)
    audio = video.audio
    file_body = os.path.splitext(os.path.basename(mp4_file))[0]
    audio_file = "audio_" + file_body + ".wav"
    print("export: " + audio_file)
    audio.write_audiofile(audio_file, codec="pcm_s16le")
    audio.close()
    video.close()

    return audio_file


class Section:
    def __init__(self, section_start, section_end, noise_start, noise_end) -> None:
        self.section_start = section_start
        self.section_end = section_end
        self.noise_start = noise_start
        self.noise_end = noise_end


def reduce_noise(waveform: np.ndarray, sample_rate, sections: list[Section]) -> torch.Tensor:
    output = waveform

    for s in sections:
        section_start = int(s.section_start * sample_rate)
        section_end = int(s.section_end * sample_rate)
        noise_start = int(s.noise_start * sample_rate)
        noise_end = int(s.noise_end * sample_rate)

        y = waveform[section_start : section_end]
        y_noise = waveform[noise_start : noise_end]

        output = np.concatenate((
            waveform[:section_start],
            nr.reduce_noise(y=y, y_noise=y_noise, sr=sample_rate, stationary=True),
            waveform[section_end:]
        ))
    return torch.from_numpy(output).unsqueeze(0)

def standarize_pydub(audio: AudioSegment) -> np.ndarray:
    audio.set_channels(1)
    waveform = np.array(audio.get_array_of_samples()).astype(np.float32)
    if audio.sample_width == 2:
        waveform = waveform / 32768.0
    elif audio.sample_width == 4:
        waveform = waveform / 2147483648.0

    return waveform


In [21]:
# MODE = "AUDIO" # AUDIO or MP4
# AUDIO_FILE = "g22.wav"
# SECTIONS = [Section(section_start=0, section_end=-0.1, noise_start=0, noise_end=0.1)]

In [22]:
MODE = "MP4"
MP4_FILE = "bb02.mp4"
SECTIONS = [
        Section(section_start=0, section_end=37.5, noise_start=1, noise_end=3),
        Section(section_start=37.5, section_end=90.0, noise_start=37.5, noise_end=38.0),
        Section(section_start=90.0, section_end=-0.1, noise_start=127, noise_end=129),
    ]


In [23]:
if is_colab():
    from google.colab import userdata
    HUGGING_FACE_TOKEN = userdata.get("HUGGING_FACE_TOKEN")
else:
    from dotenv import load_dotenv
    import os

    load_dotenv()
    HUGGING_FACE_TOKEN = os.getenv("HUGGING_FACE_TOKEN")

In [24]:
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1", use_auth_token=HUGGING_FACE_TOKEN
)
pipeline.to(torch.device("cuda"))

if MODE == "MP4":
    audio_file = mp4_to_audio(MP4_FILE)
else:
    audio_file = AUDIO_FILE

audio = AudioSegment.from_wav(audio_file).set_channels(1)
waveform = standarize_pydub(audio)
sampling_rate = audio.frame_rate

denoised_waveform = reduce_noise(
    waveform,
    sampling_rate,
    sections=SECTIONS,
)

with ProgressHook() as hook:
    diarization: Annotation = pipeline(
        {"waveform": denoised_waveform, "sample_rate": sampling_rate}, num_speakers=2, hook=hook
    )

for segment, track_name, label in diarization.itertracks(yield_label=True):
    print(f"{segment.start=:.1f}, {segment.end=:.1f}, {track_name=}, {label=}")

remaining_speakers = ["SPEAKER_00", "SPEAKER_01"]

for speaker in remaining_speakers:
    print("open: " + audio_file)
    audio = AudioSegment.from_wav(audio_file)

    for segment, track_name, label in diarization.itertracks(yield_label=True):
        if label != speaker:
            mute_start = segment.start * 1000
            mute_end = segment.end * 1000
            muted_section = AudioSegment.silent(duration=(mute_end - mute_start), frame_rate=sampling_rate)
            audio = audio[:mute_start] + muted_section + audio[mute_end:]
            print(f"mute: {mute_start} - {mute_end}")

    export_file = "muted_" + speaker + "_" + os.path.basename(audio_file)

    print("export: " + export_file)
    audio.export(export_file, format="wav")


export: audio_bb02.wav
MoviePy - Writing audio in audio_bb02.wav


                                                                        

MoviePy - Done.


segment.start=3.4, segment.end=4.2, track_name='A', label='SPEAKER_00'
segment.start=6.1, segment.end=7.7, track_name='B', label='SPEAKER_00'
segment.start=8.7, segment.end=8.8, track_name='C', label='SPEAKER_00'
segment.start=8.8, segment.end=12.0, track_name='D', label='SPEAKER_01'
segment.start=12.6, segment.end=14.0, track_name='E', label='SPEAKER_01'
segment.start=14.3, segment.end=14.4, track_name='F', label='SPEAKER_01'
segment.start=14.6, segment.end=15.3, track_name='G', label='SPEAKER_01'
segment.start=15.6, segment.end=18.3, track_name='H', label='SPEAKER_00'
segment.start=16.0, segment.end=16.2, track_name='I', label='SPEAKER_01'
segment.start=16.6, segment.end=16.8, track_name='J', label='SPEAKER_01'
segment.start=18.7, segment.end=21.0, track_name='K', label='SPEAKER_01'
segment.start=21.1, segment.end=21.2, track_name='L', label='SPEAKER_00'
segment.start=22.3, segment.end=22.3, track_name='M', label='SPEAKER_00'
segment.start=22.3, segment.end=23.5, track_name='N', labe