## This notebook runs the entire process: download, transcribe, diarize, and name the speakers

It uses:
- [pytube](https://pytube.io/en/latest/index.html)
- [moviepy](https://pypi.org/project/moviepy/)
- [transformers](https://github.com/huggingface/transformers)
- [pyannote](https://pypi.org/project/pyannote.audio/)
- [onnx-directml](https://pypi.org/project/onnxruntime-genai-directml/)

You will need to download microsoft/Phi-3-medium-4k-instruct-onnx-directml and update `model_path` below to point at it. If you do not have a GPU or are not using Windows, see the Phi-3 docs and set up the matching model build instead.
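
Before loading anything heavy, it can help to confirm that `model_path` points at a usable folder. The sketch below is only a sanity check, not part of the original notebook. The helper name `check_model_folder` is hypothetical, and it assumes that ONNX Runtime GenAI model folders contain a `genai_config.json` file.

```python
from pathlib import Path

def check_model_folder(model_path):
    """Return a list of problems found with the model folder (empty if none)."""
    folder = Path(model_path)
    problems = []
    if not folder.is_dir():
        problems.append(f"{folder} is not a directory")
    # Assumption: ONNX Runtime GenAI model folders ship a genai_config.json file.
    elif not (folder / "genai_config.json").is_file():
        problems.append(f"{folder} has no genai_config.json")
    return problems

# Report any problems with the path used later in this notebook.
for problem in check_model_folder("./directml/Phi-3-medium-4k-instruct"):
    print("Model folder problem:", problem)
```

An empty result only means the folder looks plausible; loading the model can still fail for other reasons.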

The pyannote.audio pipeline is gated on Hugging Face and requires an account and an access key. See https://huggingface.co/pyannote/speaker-diarization-3.1 for instructions.
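
One way to avoid pasting the access key into the notebook is to read it from an environment variable. This is a minimal sketch rather than what the notebook itself does. The helper name `load_hf_key` and the variable name `HF_TOKEN` are assumptions chosen for illustration.

```python
import os

def load_hf_key(env=os.environ, name="HF_TOKEN"):
    """Read the Hugging Face access key from an environment-style mapping."""
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {name} environment variable to your Hugging Face access key."
        )
    return key

# Example with an explicit mapping so the sketch runs without a real key.
print(load_hf_key({"HF_TOKEN": "hf_example_not_a_real_key"}))
```

In the notebook, `hfkey = load_hf_key()` would then replace the hard-coded string.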

In [None]:
%pip install pytube
%pip install moviepy
%pip install --upgrade git+https://github.com/huggingface/transformers.git accelerate
%pip install --upgrade pyannote.audio
%pip install numpy
%pip install --pre onnxruntime-genai-directml


In [1]:
hfkey = "YOUR HUGGINGFACE KEY HERE"
model_path = './directml/Phi-3-medium-4k-instruct'  # Tested with medium-4k and medium-128k

## Setup and load openai/whisper-large-v3, pyannote/speaker-diarization-3.1, and Phi-3

In [2]:

from pyannote.audio import Pipeline
from moviepy.editor import VideoFileClip
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from pytube import YouTube
import os
import shutil
import torch
import json
import re
import onnxruntime_genai as og
import time

device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=False, use_safetensors=True
)
model.to(device)

processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=30,
    batch_size=16,
    return_timestamps=True,
    torch_dtype=torch_dtype,
    device=device,
)

diarization_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=hfkey)

# Use the device selected above so this also works on machines without CUDA
diarization_pipeline.to(torch.device(device))

# Define search options
search_options = {
    'do_sample': True,
    'max_length': 3578,
    'top_p': 0.9,
    'top_k': 5,
    'temperature': 0.5,
    'repetition_penalty': 1.2
}
chat_template = '<|user|>\n{input} <|end|>\n<|assistant|>'

# Initialize the Phi-3 model and tokenizer.
# Note: this rebinds `model`; the Whisper pipeline above keeps its own reference.
model = og.Model(model_path)
tokenizer = og.Tokenizer(model)
tokenizer_stream = tokenizer.create_stream()

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
The torchaudio backend is switched to 'soundfile'. Note that 'sox_io' is not supported on Windows.
The torchaudio backend is switched to 'soundfile'. Note that 'sox_io' is not supported on Windows.
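
To make the role of `chat_template` concrete, the short sketch below formats a message the same way `process_llm_message` does before tokenizing it. The question text is a made-up example.

```python
# Same template string as in the setup cell.
chat_template = '<|user|>\n{input} <|end|>\n<|assistant|>'

# Hypothetical message; the notebook passes a longer prompt with transcript context.
prompt = chat_template.format(input="Who is speaking in this excerpt?")
print(repr(prompt))
```

The prompt ends with the assistant tag, so the model's generated tokens form the reply.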


## Define all the necessary functions

There are some extras here for processing entire folders instead of individual files.

In [3]:

def download_video(video_url, output_path='./video'):
    yt = YouTube(video_url)
    # Get the highest resolution stream available
    stream = yt.streams.get_highest_resolution()
    # Download the video to the specified folder
    stream.download(output_path) 

def extract_audio(video_file, output_audio_file):
    # Create the output folder if it does not exist yet
    os.makedirs(os.path.dirname(output_audio_file) or '.', exist_ok=True)
    video = VideoFileClip(video_file)
    audio = video.audio
    # Downmix to mono by passing "-ac 1" through to ffmpeg
    audio.write_audiofile(output_audio_file, codec='mp3', ffmpeg_params=["-ac", "1"])
    audio.close()
    video.close()

def process_video_folder(input_folder, output_folder):
    for file in os.listdir(input_folder):
        if file.endswith(".mp4"):
            video_path = os.path.join(input_folder, file)
            audio_path = os.path.join(output_folder, os.path.splitext(file)[0] + '.mp3')
            extract_audio(video_path, audio_path)
            print(f"Processed {file}")

def transcribe_and_save(audio_file_path, output_folder):
    # Make sure the output folder exists before writing to it
    os.makedirs(output_folder, exist_ok=True)
    # Run the Whisper pipeline on the audio file
    result = pipe(audio_file_path, return_timestamps=True)
    # Extract the base name without the extension and create new file names
    base_name = os.path.basename(os.path.splitext(audio_file_path)[0])
    text_file_name = os.path.join(output_folder, f"{base_name}.txt")
    json_file_name = os.path.join(output_folder, f"{base_name}.json")

    # Write the transcription to a text file with timestamps
    with open(text_file_name, 'w', encoding='utf-8') as text_file:
        for chunk in result["chunks"]:
            # Formatting the timestamp
            start, end = chunk['timestamp']
            timestamp = f"({start}, {end})"
            text_file.write(f"[{timestamp}]: {chunk['text']}\n")

    # Save the transcription as a JSON file
    with open(json_file_name, 'w', encoding='utf-8') as json_file:
        json.dump(result["chunks"], json_file, indent=4)

    print(f"Transcription saved to {text_file_name}")
    print(f"Transcription JSON saved to {json_file_name}")

def process_audio_folder(input_folder, output_folder):
    for file in os.listdir(input_folder):
        if file.endswith(".mp3"):
            audio_file_path = os.path.join(input_folder, file)
            transcribe_and_save(audio_file_path, output_folder)
            print(f"Processed {file}")

def diarize_audio(file_path, output_folder):
    # Check if the file name contains spaces and handle it
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    if ' ' in base_name:
        # Create a temporary file name by removing spaces
        temp_file_path = os.path.join(os.path.dirname(file_path), base_name.replace(' ', '') + os.path.splitext(file_path)[1])
        # Copy the original file to the new file with spaces removed
        shutil.copyfile(file_path, temp_file_path)
        # Use the new file path for processing
        file_path = temp_file_path
    else:
        temp_file_path = None

    # Perform diarization using the provided file path
    diarization = diarization_pipeline(file_path)
    
    # Generate the output filename based on the input file's original name
    output_file = os.path.join(output_folder, f"{base_name}.rttm")
    
    # Write the diarization output to disk using RTTM format
    with open(output_file, "w") as rttm:
        diarization.write_rttm(rttm)

    # Cleanup: if a temporary file was used, delete it
    if temp_file_path:
        os.remove(temp_file_path)

    print(f"Processed {file_path}, output to {output_file}")

def diarize_folder(input_folder, output_folder):
    # List all files in the given folder
    for file_name in os.listdir(input_folder):
        # Check if the file is an MP3 file
        if file_name.endswith(".mp3"):
            file_path = os.path.join(input_folder, file_name)
            diarize_audio(file_path, output_folder)

def read_json(json_file):
    with open(json_file, 'r', encoding='utf-8') as file:
        return json.load(file)

def read_rttm(rttm_file):
    rttm_data = []
    with open(rttm_file, 'r', encoding='utf-8') as file:
        for line in file:
            parts = line.strip().split()
            # Skip blank or truncated lines instead of raising IndexError
            if len(parts) < 8:
                continue
            rttm_data.append({
                "turn_onset": float(parts[3]),
                "duration": float(parts[4]),
                "speaker": parts[7]
            })
    return rttm_data

def seconds_to_hms(seconds):
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = seconds % 60
    return f"{h:02}:{m:02}:{s:06.3f}"

def match_speaker(json_start, json_end, rttm_data):
    best_match = None
    max_overlap = 0  # Require a positive overlap so unmatched chunks fall through to "Unknown"
    
    for entry in rttm_data:
        rttm_start = entry["turn_onset"]
        rttm_end = rttm_start + entry["duration"]

        overlap_start = max(json_start, rttm_start)
        overlap_end = min(json_end, rttm_end)
        overlap_duration = max(0, overlap_end - overlap_start)
        
        if overlap_duration > max_overlap:
            max_overlap = overlap_duration
            best_match = entry["speaker"]

    return best_match if best_match else "Unknown"

def process_rttm_file(json_file, rttm_file, output_file):
    json_data = read_json(json_file)
    rttm_data = read_rttm(rttm_file)
    
    with open(output_file, 'w', encoding='utf-8') as text_file:
        previous_end = None  # Track the end time of the previous item
        for i, chunk in enumerate(json_data):
            timestamp = chunk['timestamp']
            start = timestamp[0]
            # Reset end for every chunk so a stale value is never reused
            end = timestamp[1] if len(timestamp) > 1 else None

            if end is None:
                if i == len(json_data) - 1 and previous_end is not None:  # Last item with no end time
                    end = previous_end + 5  # Assuming a reasonable default duration
                elif i == len(json_data) - 1 and previous_end is None:  # Single item list
                    end = start + 5  # Default duration for single item lists
                else:
                    continue  # If not the last item and no end time, skip
            
            previous_end = end  # Update the previous end time
            speaker = match_speaker(start, end, rttm_data)
            formatted_timestamp = f"({seconds_to_hms(start)}, {seconds_to_hms(end)})"
            text_file.write(f"[{speaker}] : [{formatted_timestamp}] : {chunk['text']}\n")

def process_rttm_folder(folder_path):
    for file_name in os.listdir(folder_path):
        if file_name.endswith(".json"):
            base_name = os.path.splitext(file_name)[0]
            json_file = os.path.join(folder_path, file_name)
            rttm_file = os.path.join(folder_path, base_name + '.rttm')
            output_file = os.path.join(folder_path, base_name + '.transcript.txt')
            
            if os.path.exists(rttm_file):
                process_rttm_file(json_file, rttm_file, output_file)
                print(f"Processed {json_file} and {rttm_file} into {output_file}")
            else:
                print(f"Warning: No RTTM file found for {json_file}. Skipping.")

def list_unique_speakers(file_path):
    # Read the content of the file
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
    
    # Find all unique speaker identifiers
    speakers = re.findall(r"\[SPEAKER_(\d{2})\]", content)
    unique_speakers = sorted(set(speakers))
    
    # Format speakers in the form "SPEAKER_XX"
    formatted_speakers = [f"SPEAKER_{speaker}" for speaker in unique_speakers]
    return formatted_speakers

def get_context_around_speaker(file_path, speaker):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.readlines()
    
    # Find the first occurrence of the speaker
    for i, line in enumerate(lines):
        if speaker in line:
            start_index = max(0, i - 5)  # Ensure start index is within bounds
            end_index = min(len(lines), i + 5)  # Slice end is exclusive: keeps the matching line plus the 4 lines after it
            return lines[start_index:end_index]
    return []  # Return an empty list if the speaker does not appear

def collect_speaker_contexts(file_path):
    # First, extract unique speakers from the file
    unique_speakers = list_unique_speakers(file_path)
    
    # Now, use get_context_around_speaker to get context for each speaker
    speaker_contexts = {}
    for speaker in unique_speakers:
        # Using the function to fetch context around each speaker's first mention
        context = get_context_around_speaker(file_path, speaker)
        speaker_contexts[speaker] = context
    
    return speaker_contexts     

def process_llm_message(message, search_options):
    # Prepare the prompt and encode it to tokens
    prompt = chat_template.format(input=message)
    input_tokens = tokenizer.encode(prompt)

    # Initialize generator parameters and the generator
    params = og.GeneratorParams(model)
    params.set_search_options(**search_options)
    params.input_ids = input_tokens
    generator = og.Generator(model, params)

    # Initialize output message
    output_message = ''

    # Generate response
    while not generator.is_done():
        generator.compute_logits()
        generator.generate_next_token()
        new_token = generator.get_next_tokens()[0]
        next_output = tokenizer.decode(new_token)
        output_message += next_output

    # Free up resources
    del generator

    return output_message

def identify_speakers_with_llm(file_path):
    
    search_options = {
        'do_sample': True,
        'max_length': 3578,
        'top_p': 0.9,
        'top_k': 5,
        'temperature': 0.5,
        'repetition_penalty': 1.2
    }

    # First, gather the context for each speaker using the previously defined function
    speaker_contexts = collect_speaker_contexts(file_path)
    
    # Iterate through each speaker and their associated context
    responses = {}
    for speaker, context in speaker_contexts.items():
        # Join the context lines into a single string
        context_text = ''.join(context).strip()
        
        # Create the prompt with the current speaker and their context
        prompt = f"""
        The following text is a piece of a transcript. Examine the text and if the text identifies the speaker reply with ONLY the full name matching the exact spelling of the person listed as {speaker} and nothing else. 
        ----------------------------------------
        {context_text}"""

        # Call process_llm_message with the formatted prompt
        response = process_llm_message(prompt, search_options)
        
        # Store the response for this speaker
        responses[speaker] = response
    
    return responses

def update_speakers_with_real_names(file_path):
    # Assume responses are in the form of speaker names we want to replace in the original file
    response_dict = identify_speakers_with_llm(file_path)
    
    # Read the original file content
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.readlines()
    
    # Replace speaker labels with actual names based on response_dict
    updated_content = []
    for line in content:
        for speaker, real_name in response_dict.items():
            if speaker in line:
                # Replace the first occurrence of speaker in the line with the real name
                line = line.replace(speaker, real_name.strip(), 1)
        updated_content.append(line)
    
    # Create a new file path with .FINAL before the extension
    base, ext = os.path.splitext(file_path)
    new_file_path = f"{base}.FINAL{ext}"
    
    # Write the updated content to the new file
    with open(new_file_path, 'w', encoding='utf-8') as file:
        file.writelines(updated_content)

    return new_file_path

## Download a video

In [4]:
# First download a video
download_video('https://www.youtube.com/watch?v=MI9DHkyH8Yk','./video')


## Extract the audio

In [5]:
# Next, extract the audio
extract_audio('./video/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.mp4', './audio/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.mp3')

MoviePy - Writing audio in ./audio/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.mp3
MoviePy - Done.

## Use whisper-large-v3 to create a transcript as text and as JSON

The text file is for reference; the JSON file is used later, when the transcript is combined with the diarization output.

In [4]:
# Now use whisper-large to create a transcript
transcribe_and_save('./audio/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.mp3', "./transcript")
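
For reference, the sketch below shows the shape of the saved JSON using made-up chunks. Each chunk has a `timestamp` pair and a `text` field, and the end time of the last chunk can be missing, which is why `process_rttm_file` has to handle a `None` end. The sample values are hypothetical.

```python
import json

# Hypothetical chunks in the shape the pipeline returns with return_timestamps=True.
chunks = [
    {"timestamp": (0.0, 4.2), "text": " Welcome everyone."},
    {"timestamp": (4.2, None), "text": " Thanks for joining."},  # open-ended last chunk
]

# Round-trip through JSON, as transcribe_and_save and read_json do.
loaded = json.loads(json.dumps(chunks, indent=4))

for chunk in loaded:
    start, end = chunk["timestamp"]  # tuples come back as two-item lists
    print(start, end, chunk["text"].strip())
```

After the round trip, the timestamps are lists rather than tuples and a missing end time is stored as `null`.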

## Now use pyannote to diarize the audio and create an RTTM file

In [5]:
# Now use pyannote to diarize the audio
diarize_audio('./audio/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.mp3', "./transcript")

Processed ./audio\Developer’sGuidetoCustomizingMicrosoftCopilot-MicrososftBuild2024.mp3, output to ./transcript\Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.rttm
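
An RTTM file holds one speaker turn per line, with whitespace-separated fields. The sketch below parses a single made-up line using the same field positions as `read_rttm` (onset at index 3, duration at index 4, speaker label at index 7). The helper name `parse_rttm_line` and the sample line are hypothetical.

```python
# A hypothetical RTTM line; fields are separated by whitespace.
line = "SPEAKER talk 1 12.345 3.210 <NA> <NA> SPEAKER_01 <NA> <NA>"

def parse_rttm_line(line):
    """Turn one RTTM line into a dict with the speaker and the turn's start and end."""
    parts = line.split()
    onset = float(parts[3])
    duration = float(parts[4])
    return {"speaker": parts[7], "start": onset, "end": onset + duration}

turn = parse_rttm_line(line)
print(turn["speaker"], turn["start"], round(turn["end"], 3))
```

Because the fields are split on whitespace, a file name containing spaces would shift the field positions, which is presumably why `diarize_audio` works on a copy of the file with spaces removed.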


## Combine the JSON and RTTM files to create a new transcript with speakers identified as SPEAKER_00, SPEAKER_01, etc.

In [8]:
# Combine the files
# Use the same lowercase './transcript' folder as the earlier steps so the paths also resolve on case-sensitive file systems
process_rttm_file('./transcript/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.json', './transcript/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.rttm', './transcript/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.transcript.txt')
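
The combining step assigns each transcript chunk to the speaker turn it overlaps the most. The sketch below illustrates that longest-overlap rule on made-up data. It is a simplified stand-in for `match_speaker`, with hypothetical names and turns, and in this sketch a chunk that overlaps no turn is labelled `Unknown`.

```python
def overlap(a_start, a_end, b_start, b_end):
    """Length in seconds of the intersection of two intervals (0 if disjoint)."""
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

# Hypothetical diarization turns as (start, end, speaker).
turns = [(0.0, 6.0, "SPEAKER_00"), (6.0, 15.0, "SPEAKER_01")]

def best_speaker(start, end, turns):
    """Return the speaker whose turn overlaps [start, end] the most."""
    best, best_overlap = "Unknown", 0.0
    for t_start, t_end, speaker in turns:
        o = overlap(start, end, t_start, t_end)
        if o > best_overlap:
            best, best_overlap = speaker, o
    return best

print(best_speaker(4.0, 9.0, turns))    # 2 s with SPEAKER_00, 3 s with SPEAKER_01
print(best_speaker(20.0, 22.0, turns))  # overlaps no turn
```

A chunk that straddles a speaker change gets a single label, so short interjections inside a longer chunk are attributed to the dominant speaker.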

## Finally, look for names using Phi-3 and create the 'FINAL' transcript

In [4]:
# Update the speakers with real names
update_speakers_with_real_names('./transcript/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.transcript.txt')

'./transcript/Developer’s Guide to Customizing Microsoft Copilot - Micrososft Build 2024.transcript.FINAL.txt'
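
The relabelling step can be tried on a couple of in-memory lines without running Phi-3. The sketch below uses a made-up mapping in place of the `identify_speakers_with_llm` output. The names and lines are hypothetical, and keeping the original label when the reply is empty is a design choice of this sketch, not necessarily what the notebook does.

```python
# Hypothetical mapping, standing in for the identify_speakers_with_llm output.
names = {"SPEAKER_00": " Jane Doe\n", "SPEAKER_01": ""}

lines = [
    "[SPEAKER_00] : [(00:00:00.000, 00:00:04.200)] : Hi, I'm Jane Doe.\n",
    "[SPEAKER_01] : [(00:00:04.200, 00:00:09.000)] : Thanks, Jane.\n",
]

def relabel(line, names):
    """Replace the first matching speaker label in a line with its real name."""
    for label, name in names.items():
        name = name.strip()
        # Keep the original label when the model returned nothing usable.
        if name and label in line:
            line = line.replace(label, name, 1)
    return line

updated = [relabel(line, names) for line in lines]
print("".join(updated), end="")
```

Since the model's reply is inserted verbatim, it is worth reading the `FINAL` transcript to confirm that each label was replaced with a plausible name.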