In [10]:
!pip install speechbrain==1.0
!pip install faster_whisper
!pip install pyannote.audio
!pip install whisper



In [11]:
import librosa
import traceback
from faster_whisper import WhisperModel
import torch
import whisper
import datetime
from pathlib import Path
import pandas as pd
import re
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio
from pyannote.core import Segment
import speechbrain
from scipy.spatial.distance import cdist

In [12]:
# Path of the input recording (presumably the file passed to speech_to_text — confirm against the calling cell).
# NOTE(review): machine-specific absolute path; adjust before running on another machine.
audio_file_path = '/Users/hanama/Desktop/AEOS_WORK/labs/Speaker-Diarization/inputs/audios/audio-2.wav'

In [13]:
# Whisper model size names (presumably the values accepted as `whisper_model`
# by speech_to_text — the list itself is not read anywhere in this cell).
whisper_models = [
    "tiny",
    "base",
    "small",
    "medium",
    "large-v1",
    "large-v2",
]

# Speaker-embedding model shared by all diarization calls; placed on the GPU
# when CUDA is available, otherwise on the CPU.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"),
)

# Helper: express a duration given in seconds as a whole-second timedelta
def convert_time(secs):
    """Return ``secs`` as a ``datetime.timedelta`` after rounding with ``round()``.

    Args:
        secs: Duration in seconds (int or float).

    Returns:
        datetime.timedelta: The duration with a whole number of seconds; its
        ``str()`` form is ``H:MM:SS`` for durations under one day.
    """
    whole_seconds = round(secs)
    return datetime.timedelta(seconds=whole_seconds)

# Main function to perform speech-to-text and diarization
def speech_to_text(audio_file, whisper_model, num_speakers=2):
    """Segment ``audio_file`` with faster-whisper and assign a speaker to each turn.

    Whisper is used to find the speech segments; each segment is embedded with
    the module-level ``embedding_model`` and the embeddings are clustered into
    ``num_speakers`` groups. Consecutive segments with the same speaker are
    merged into one row. Only timing and speaker labels are returned — the
    transcribed text itself is not kept.

    Args:
        audio_file: Path of the audio file to process.
        whisper_model: Model size or path passed to ``WhisperModel``.
        num_speakers: Number of speakers to cluster into (default 2, the value
            that was previously hard-coded). Capped at the number of segments.

    Returns:
        pandas.DataFrame with columns ``Start``, ``End`` (``str`` of a
        whole-second ``timedelta``) and ``Speaker`` (``'SPEAKER <n>'``), one
        row per speaker turn. Empty when no speech segment was found.

    Raises:
        ValueError: If ``num_speakers`` is smaller than 1.
    """
    if num_speakers < 1:
        raise ValueError("num_speakers must be at least 1")

    model = WhisperModel(whisper_model, compute_type="int8")

    # The audio is decoded here only to learn its duration, which is used
    # below to clamp segment end times.
    audio_data, sample_rate = librosa.load(audio_file, mono=True, sr=16000)
    duration = len(audio_data) / sample_rate

    # Transcribe audio to get segments (the segment iterator is consumed here)
    segments_raw, _info = model.transcribe(
        audio_file, task="transcribe", language="en", beam_size=5, best_of=5
    )
    segments = [{'start': s.start, 'end': s.end} for s in segments_raw]

    columns = ['Start', 'End', 'Speaker']
    if not segments:
        # Nothing was detected: return an empty frame instead of failing on segments[-1].
        return pd.DataFrame(columns=columns)

    # One reader for all segments. Downmix to mono and resample to 16 kHz so
    # that `waveform[None]` is always (1, 1, samples) at the same rate librosa
    # used above, regardless of the file's own channel count / sample rate.
    audio = Audio(sample_rate=16000, mono="downmix")

    def segment_embedding(segment):
        """Return the speaker embedding for one segment, end clamped to the file duration."""
        clip = Segment(segment["start"], min(duration, segment["end"]))
        waveform, _ = audio.crop(audio_file, clip)
        return embedding_model(waveform[None])

    # Stack the per-segment embeddings; the width comes from the model output
    # rather than a hard-coded dimension.
    embeddings = np.vstack([segment_embedding(segment) for segment in segments])
    embeddings = np.nan_to_num(embeddings)

    # Perform clustering. Clustering needs at least two samples and cannot
    # produce more clusters than samples.
    n_clusters = min(num_speakers, len(segments))
    if n_clusters < 2:
        labels = np.zeros(len(segments), dtype=int)
    else:
        labels = AgglomerativeClustering(n_clusters=n_clusters).fit(embeddings).labels_

    # Merge consecutive segments of the same speaker into a single turn whose
    # end is the end of the last segment in the run.
    turns = []
    for segment, label in zip(segments, labels):
        speaker = 'SPEAKER ' + str(label + 1)
        end = str(convert_time(segment["end"]))
        if turns and turns[-1]['Speaker'] == speaker:
            turns[-1]['End'] = end
        else:
            turns.append({
                'Start': str(convert_time(segment["start"])),
                'End': end,
                'Speaker': speaker,
            })

    return pd.DataFrame(turns, columns=columns)

ImportError: 'speechbrain' must be installed to use 'speechbrain/spkrec-ecapa-voxceleb' embeddings. Visit https://speechbrain.github.io for installation instructions.

In [16]:
!pip install moviepy pandas pillow



In [17]:
import pandas as pd
from moviepy.editor import VideoFileClip, ImageClip, CompositeVideoClip
from PIL import Image, ImageDraw, ImageFont

# Load the diarization results and the video
# No cell in this notebook assigns `transcription_results`, so on a fresh
# kernel it is computed here; an existing value is reused untouched.
# NOTE(review): the "base" model is an assumption — pick the size you need.
if 'transcription_results' not in globals():
    transcription_results = speech_to_text(audio_file_path, "base")
df_results = transcription_results
video_path = '/Users/hanama/Desktop/AEOS_WORK/labs/Speaker-Diarization/inputs/videos/video-2.mp4'
video = VideoFileClip(video_path)

# Function to create an image with text
def create_text_image(text, font_size=70, img_size=(640, 80), bg_color=(0,0,0), text_color=(255,255,255)):
    """Render ``text`` centered on a solid-color image.

    Args:
        text: The string to draw.
        font_size: Point size used when "arial.ttf" can be loaded.
        img_size: ``(width, height)`` of the image in pixels.
        bg_color: RGB background color.
        text_color: RGB text color.

    Returns:
        PIL.Image.Image: A new RGB image of size ``img_size``.
    """
    img = Image.new('RGB', img_size, color=bg_color)
    draw = ImageDraw.Draw(img)
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except IOError:
        # Arial is not available: use Pillow's built-in font. It is requested
        # without a size, so `font_size` is not applied in this fallback.
        font = ImageFont.load_default()

    if hasattr(draw, "textbbox"):
        # `textsize` no longer exists in current Pillow; `textbbox` is its replacement.
        left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
    else:
        # Older Pillow without `textbbox`.
        width, height = draw.textsize(text, font=font)
        left, top, right, bottom = 0, 0, width, height

    text_width = right - left
    text_height = bottom - top
    # Subtract the bbox origin so the drawn glyphs, not the anchor point, are centered.
    position = (
        (img_size[0] - text_width) / 2 - left,
        (img_size[1] - text_height) / 2 - top,
    )
    draw.text(position, text, fill=text_color, font=font)
    return img

# Create and overlay text clips for each speaker segment
clips = [video]
for start, end, speaker in zip(df_results['Start'], df_results['End'], df_results['Speaker']):
    # Start/End are duration strings such as '0:01:05' (the format
    # speech_to_text emits), so parse them as timedeltas rather than as
    # clock times on today's date.
    start_seconds = pd.to_timedelta(start).total_seconds()
    end_seconds = pd.to_timedelta(end).total_seconds()

    label_duration = end_seconds - start_seconds
    if label_duration <= 0:
        # Times are rounded to whole seconds, so a very short turn can end up
        # with zero length; there is nothing to display for it.
        continue

    # Pass the rendered label to moviepy in memory (np comes from the
    # notebook's import cell) instead of via a temporary PNG at a fixed
    # '/content/...' path, which only exists on Colab and was overwritten on
    # every iteration.
    text_frame = np.array(create_text_image(speaker))

    txt_clip = (ImageClip(text_frame)
                .set_position(('center', 'bottom'))
                .set_start(start_seconds)
                .set_duration(label_duration))
    clips.append(txt_clip)

# Combine clips and save the final video
final_video = CompositeVideoClip(clips)
final_video_path = '/Users/hanama/Desktop/AEOS_WORK/labs/Speaker-Diarization/inputs/videoplayback_label.mp4'
final_video.write_videofile(final_video_path, codec='libx264')

ModuleNotFoundError: No module named 'moviepy.editor'