In [1]:
import ffmpeg
import numpy as np
import wget
import os
from pathlib import Path
from pydub import AudioSegment
import time
import pandas as pd
import soundfile as sf
import whisper
import json
from omegaconf import OmegaConf
from nemo.collections.asr.models import ClusteringDiarizer, NeuralDiarizer
import os
os.environ['PYTHONIOENCODING'] = 'utf-8'

[NeMo W 2024-11-12 15:31:47 transformer_bpe_models:59] Could not import NeMo NLP collection which is required for speech translation model.


FFmpeg is used to extract the audio from the video files

In [2]:
def extract_audio(input_video, output_audio, start_time='00:00:00', duration='00:15:00'):
    """Extract a single-channel, 16 kHz audio track from a video with FFmpeg.

    Args:
        input_video: Path of the source video (``str`` or path-like).
        output_audio: Path of the audio file to write; an existing file is replaced.
        start_time: Position in the video where extraction starts (passed as ``ss``).
        duration: Length of audio to extract (passed as ``t``).

    Raises:
        ffmpeg.Error: If the FFmpeg process fails. FFmpeg's captured stderr is
            printed before the exception is re-raised.
    """
    # Check if output file exists and remove it
    if os.path.exists(output_audio):
        os.remove(output_audio)

    # Paths are normalised to str so that pathlib.Path arguments are accepted too.
    stream = ffmpeg.input(str(input_video), ss=start_time, t=duration)
    stream = stream.output(str(output_audio), qscale=0, ar=16000, ac=1)

    try:
        # stderr is captured so that FFmpeg's diagnostics are available on failure
        # instead of being lost to the kernel's terminal.
        stream.run(overwrite_output=True, capture_stdout=True, capture_stderr=True)
    except ffmpeg.Error as e:
        if e.stderr:
            print(e.stderr.decode('utf-8', errors='replace'))
        raise

This function counts how many words from a given list appear in a transcription line. It is used to find the phrase "8 minute conversation", which marks the end of the introductory part of the recording, so that the beginning of the transcription that doesn't involve the participants can be removed.

In [3]:
def count_matches(row, words):
    """Return how many entries of ``words`` are contained in ``row``.

    Each word is counted at most once, using Python's ``in`` operator
    (substring containment when ``row`` is a string).
    """
    hits = 0
    for candidate in words:
        if candidate in row:
            hits += 1
    return hits

OpenAI's Whisper model is used to transcribe the audio into text. The transcribe_section function takes a segment of audio and transcribes that segment into text.

In [4]:
import subprocess

# Load the English-only "base.en" Whisper checkpoint once, at module level,
# so that every call to transcribe_section reuses the same model object.
model = whisper.load_model(name="base.en")

# Function to extract and transcribe a section of audio
def transcribe_section(audio, start_time, end_time, threshold):
    """Transcribe one time range of ``audio`` with the module-level Whisper ``model``.

    The section is exported to a temporary WAV file, transcribed, and the
    temporary file is always removed afterwards.

    Args:
        audio: Audio object that is sliced by milliseconds and whose slices
            provide ``export(path, format=...)`` (a pydub ``AudioSegment`` in
            this notebook).
        start_time: Start of the section, in seconds.
        end_time: End of the section, in seconds.
        threshold: Segments whose ``no_speech_prob`` is greater than or equal
            to this value are left out of the result.

    Returns:
        The concatenated text of the kept segments, or ``''`` when the
        transcription step raised an exception.
    """
    # Convert times from seconds to milliseconds for slicing
    section = audio[start_time*1000:end_time*1000]

    temp_file = "temp_section.wav"  # Temporary file name
    try:
        # Save the section as a .wav file for transcription
        section.export(temp_file, format="wav")

        try:
            result = model.transcribe(temp_file)
            # Keep only segments that are likely to contain speech
            return "".join(
                segment["text"]
                for segment in result["segments"]
                if segment["no_speech_prob"] < threshold
            )
        except subprocess.CalledProcessError as e:
            # Previously swallowed silently; report it like the other failures
            print(f"CalledProcessError: Error processing section from {start_time} to {end_time}: {e}")
            return ''
        except RuntimeError as e:
            print(f"RuntimeError: Error processing section from {start_time} to {end_time}: {e}")
            return ''
        except Exception as e:
            print(f"Unexpected error: Error processing section from {start_time} to {end_time}: {e}")
            return ''
    finally:
        # Do not leave the temporary section file behind, whatever happened above
        if os.path.exists(temp_file):
            os.remove(temp_file)


      checkpoint = torch.load(fp, map_location=device)
    


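We only want to process mp4 files, so this function checks that a file has an mp4 extension and skips files whose names start with "._", which are metadata files generated on macOS that we don't want to process. Files that already have output in the transcript folders are skipped as well.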

The single parameter is used to select a single file for testing purposes.

In [5]:
# Check if a file should be processed
def should_process_file(file_path, existing_files, single):
    """Decide whether a file should go through the pipeline.

    A file is rejected when its stem is already listed in ``existing_files``,
    when its suffix is not ``.mp4``/``.MP4``, when its name starts with ``._``,
    or when ``single`` is given and does not contain the file's stem.

    Args:
        file_path: ``pathlib.Path`` of the candidate file.
        existing_files: Collection of stems that were already processed.
        single: ``None`` to accept every file, otherwise a container of the
            stems that may be processed.

    Returns:
        ``True`` if the file should be processed, ``False`` otherwise.
    """
    stem = file_path.stem

    if stem in existing_files:
        return False
    if file_path.suffix not in ('.mp4', '.MP4'):
        return False
    if stem.startswith('._'):
        return False
    if single is not None and stem not in single:
        return False
    return True

This section uses NVIDIA's NeMo diarization model to diarize the extracted audio. Diarization is the process of identifying different speakers in audio, which is necessary to produce accurate transcriptions. The hyperparameters were chosen by following the guide found here: https://github.com/NVIDIA/NeMo/blob/main/tutorials/speaker_tasks/Speaker_Diarization_Inference.ipynb

In [6]:
# Diarize the audio to separate speakers
def diarize_audio(output_audio, model_config_url="https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_telephonic.yaml"):
    """Run NeMo's ``NeuralDiarizer`` on one audio file.

    A one-line manifest describing the audio is written to
    ``input_manifest.json``, the diarizer configuration is loaded from
    ``model_data`` (and downloaded from ``model_config_url`` when the file is
    missing), and the diarizer is run with ``oracle_vad`` as its output
    directory.

    Args:
        output_audio: Path of the audio file to diarize.
        model_config_url: URL of the diarizer YAML configuration to download
            when no local copy exists.
    """
    manifest_path = "input_manifest.json"

    # One manifest entry per line; here there is exactly one audio file
    manifest_entry = {
        'audio_filepath': output_audio,
        'offset': 0,
        'duration': None,
        'label': 'infer',
        'text': '-',
        'num_speakers': None,
        'rttm_filepath': None,
        'uem_filepath': None,
    }
    with open(manifest_path, 'w', encoding='utf-8') as manifest_file:
        json.dump(manifest_entry, manifest_file)
        manifest_file.write('\n')

    # Reuse the local configuration file if present, otherwise fetch it
    model_config_path = "model_data/diar_infer_telephonic.yaml"
    config_is_cached = os.path.exists(model_config_path)
    if not config_is_cached:
        os.makedirs("model_data", exist_ok=True)
        model_config_path = wget.download(model_config_url, "model_data")

    # Point the configuration at the manifest and the output folder
    config = OmegaConf.load(model_config_path)
    config.diarizer.manifest_filepath = manifest_path
    config.diarizer.out_dir = "oracle_vad"
    config.device = "cuda"
    config.num_workers = 0

    # Build the diarizer and run it
    NeuralDiarizer(cfg=config).diarize()

The output of the diarization is a rttm file which we use to extract the audio segments attributed to each speaker.

The threshold parameter is used to ignore sections that are not longer than the threshold. The default value is 0.5, which ignores all audio sections that last 0.5 seconds or less.

In [7]:
# Extract and aggregate speaker segments
def get_speaker_segments(rttm_path, threshold=0.5):
    """Read an RTTM file and merge consecutive turns of the same speaker.

    Rows whose duration is not greater than ``threshold`` are ignored. A kept
    row extends the current section when it has the same speaker and starts
    less than 1 second after the current section ends; otherwise it starts a
    new section.

    Args:
        rttm_path: Path of the whitespace-separated RTTM file.
        threshold: Minimum duration (exclusive), in the RTTM's time unit, for a
            row to be considered.

    Returns:
        A list of ``[begin_time, end_time, speaker_name]`` lists in file order.
    """
    columns = ['type', 'file_id', 'channel_id', 'begin_time', 'duration', 'ortho', 'speaker_type', 'speaker_name', 'confidence_score', 'signal_lookahead']
    # sep=r"\s+" is the documented replacement for the deprecated delim_whitespace=True
    rttm_df = pd.read_csv(rttm_path, sep=r"\s+", names=columns, comment='#')

    audio_sections = []
    current_section = None
    for row in rttm_df.itertuples(index=False):
        # Written as "not >" so that a missing (NaN) duration is skipped as well
        if not row.duration > threshold:
            continue

        begin_time = row.begin_time
        end_time = begin_time + row.duration
        speaker = row.speaker_name

        continues_current = (
            current_section is not None
            and current_section[2] == speaker
            and begin_time - current_section[1] < 1
        )
        if continues_current:
            current_section[1] = end_time
        else:
            if current_section is not None:
                audio_sections.append(current_section)
            current_section = [begin_time, end_time, speaker]

    if current_section is not None:
        audio_sections.append(current_section)

    return audio_sections

With the extracted segments, we can then transcribe each section using Whisper.

Whisper outputs a value that represents the probability that a segment isn't speech. The threshold parameter ignores segments where this probability is at or above the threshold. By default it is set to 0.9, which ignores all segments with a 90% or higher chance of not being speech.

In [8]:
# Transcribe each speaker section
def transcribe_sections(audio, audio_sections, threshold=0.9):
    """Transcribe every speaker section and collect the results in a DataFrame.

    Start and end times are shifted so that the first section starts at 0.
    Sections for which ``transcribe_section`` returns an empty text are left out.

    Args:
        audio: Audio object passed through to ``transcribe_section``.
        audio_sections: Iterable of ``[start_time, end_time, speaker]`` entries.
        threshold: No-speech threshold passed through to ``transcribe_section``.

    Returns:
        A DataFrame with the columns "Start Time", "End Time", "Speaker" and
        "Transcription".
    """
    # Times are reported relative to the start of the first section
    offset = audio_sections[0][0] if audio_sections else 0

    rows = []
    for begin, end, who in audio_sections:
        spoken = transcribe_section(audio, begin, end, threshold)
        if not spoken:
            continue
        rows.append([begin - offset, end - offset, who, spoken])

    header = ["Start Time", "End Time", "Speaker", "Transcription"]
    return pd.DataFrame(rows, columns=header)


Once the transcriptions are complete, we remove the noise at the beginning of the transcription and identify the 2 most prominent speakers.

In [9]:
# Filter and label speakers in transcriptions
def filter_and_label_speakers(df):
    """Drop the introductory part of a transcription and label the top speakers.

    The row matching the most intro keywords ("8", "eight", "minute",
    "conversation", "chat") is treated as the end of the introduction when it
    matches more than one keyword and starts in the first half of the
    recording; every row up to and including it is then removed. The two
    speakers with the most remaining rows are renamed "Speaker 1" and
    "Speaker 2".

    The input frame is not modified, and the result always has exactly the
    columns "Start Time", "End Time", "Speaker" and "Transcription".

    Args:
        df: DataFrame with the four columns listed above.

    Returns:
        A new DataFrame with the filtered rows and relabelled speakers. An
        empty input yields an empty frame with the four columns.
    """
    columns = ["Start Time", "End Time", "Speaker", "Transcription"]
    if df.empty:
        # Nothing to filter or label (idxmax would raise on an empty column)
        return df.reindex(columns=columns)

    keywords = ["8", "eight", "minute", "conversation", "chat"]
    # Kept as a separate Series so the caller's frame is not given an extra column
    match_count = df['Transcription'].apply(lambda row: count_matches(row, keywords))
    index_RA = df.index.get_loc(match_count.idxmax())

    intro_start = df["Start Time"].iloc[index_RA]
    last_start = df["Start Time"].iloc[-1]
    # Guard the ratio against a last start time of 0 (e.g. a single-row frame)
    intro_in_first_half = last_start != 0 and intro_start / last_start < 0.5

    if intro_in_first_half and match_count.max() > 1:
        filtered_df = df.iloc[index_RA + 1:][columns]
    else:
        filtered_df = df[columns]

    top_2_speakers = filtered_df['Speaker'].value_counts().nlargest(2).index.tolist()
    if not top_2_speakers:
        return filtered_df.copy()

    # zip stops at the shorter sequence, so one speaker only yields "Speaker 1"
    replace_map = dict(zip(top_2_speakers, ["Speaker 1", "Speaker 2"]))
    return filtered_df.replace({"Speaker": replace_map})

We then save the transcriptions after splitting out the 2 most prominent speakers, which allows us to produce both the "dyad" files, which include both speakers, and the "single" files, which include only 1 speaker. We also write a "full" file, which contains the transcriptions of all speakers; this allows us to regenerate the "dyad" and "single" files without having to rerun the entire pipeline.

The method parameter selects whether the pipeline should treat the input file as having 2 speakers (method == 0) or only 1 speaker (method != 0).

In [10]:
# Save transcription results based on method
def save_transcriptions(df, output_directory, file_name, method):
    """Write the transcription tables for one recording as "|"-separated files.

    With ``method == 0`` the rows of "Speaker 1" and "Speaker 2" are written
    to ``dyad/<file_name>_dyad.txt`` and each speaker's rows to
    ``single/<file_name>_single_X.txt`` / ``single/<file_name>_single_Y.txt``.
    With any other ``method`` only the rows of "Speaker 1" are written, to
    ``single/<file_name>_single.txt``. In both cases the whole frame is
    written to ``full/<file_name>_full.txt``.

    Missing output folders are created, so a long pipeline run does not fail
    at the very last step because a folder is absent.

    Args:
        df: DataFrame with a "Speaker" column.
        output_directory: Root output folder (``str`` or path-like).
        file_name: Stem used for the output file names.
        method: ``0`` for two-speaker output, anything else for one speaker.
    """
    output_root = Path(output_directory)

    def _write(frame, sub_folder, suffix):
        # Create the target folder on demand, then write one table
        target_directory = output_root / sub_folder
        target_directory.mkdir(parents=True, exist_ok=True)
        frame.to_csv(target_directory / (file_name + suffix), sep="|", index=False)

    is_speaker_1 = df["Speaker"] == "Speaker 1"
    if method == 0:
        is_speaker_2 = df["Speaker"] == "Speaker 2"
        _write(df[is_speaker_1 | is_speaker_2], "dyad", '_dyad.txt')
        _write(df[is_speaker_1], "single", '_single_X.txt')
        _write(df[is_speaker_2], "single", '_single_Y.txt')
    else:
        _write(df[is_speaker_1], "single", '_single.txt')
    _write(df, "full", '_full.txt')


This function runs the full pipeline on all files in a folder.

In [11]:
# Main function to process videos in a folder
def run_in_folder(input_directory, output_directory, single=None, threshold=0.9, method=None):
    """Run the full pipeline on every video of a folder that has no output yet.

    For each accepted file the steps are: extract audio, diarize, read the
    speaker segments, transcribe them, filter/label the speakers and save the
    transcriptions. The temporary audio file is removed after each file, also
    when one of the steps raises.

    Args:
        input_directory: Folder containing the videos.
        output_directory: Folder containing the "dyad", "full" and "single"
            output sub-folders.
        single: Passed to ``should_process_file`` to restrict which files run.
        threshold: No-speech threshold passed to ``transcribe_sections``.
        method: Passed to ``save_transcriptions``.
    """
    directory_path = Path(input_directory)
    output_directory_path = Path(output_directory)

    # Output files are named "<video stem><suffix>.txt". Strip the known suffix
    # to recover the video stem; splitting on the first "_" would truncate any
    # video name that itself contains an underscore.
    output_suffixes = ("_dyad", "_single_X", "_single_Y", "_single", "_full")
    existing_files = []
    for sub_folder in ["dyad", "full", "single"]:
        for file in (output_directory_path / sub_folder).glob("*"):
            stem = file.stem
            for suffix in output_suffixes:
                if stem.endswith(suffix):
                    stem = stem[:-len(suffix)]
                    break
            existing_files.append(stem)

    output_audio = 'temp_extracted_audio.wav'

    # Every entry is offered to should_process_file, which does the extension
    # check; a "*.mp4" glob would miss ".MP4" files on case-sensitive systems.
    for file_path in directory_path.glob("*"):
        if not should_process_file(file_path, existing_files, single):
            continue

        print(f"Processing file: {file_path.stem}")
        workflow_start_time = time.time()

        try:
            # Step 1: Extract Audio
            extract_audio(file_path, output_audio)

            # Step 2: Diarize
            diarize_audio(output_audio)

            # Step 3: Get Speaker Segments
            audio_sections = get_speaker_segments('oracle_vad/pred_rttms/temp_extracted_audio.rttm')

            # Step 4: Transcribe Sections
            audio = AudioSegment.from_wav(output_audio)
            df = transcribe_sections(audio, audio_sections, threshold)

            # Step 5: Filter and Label Speakers
            filtered_df = filter_and_label_speakers(df)

            # Step 6: Save Transcriptions
            save_transcriptions(filtered_df, output_directory, file_path.stem, method)
        finally:
            # Clean up temporary audio file, even if a step failed
            if os.path.exists(output_audio):
                os.remove(output_audio)

        print(f"Elapsed time: {round(time.time() - workflow_start_time, 2)} seconds")


This function loops through a list of subfolders to execute the main pipeline.

In [12]:
# Process multiple folders
def run_all_folders(input_directory, output_directory, folders=["VTV", "FTF", "VGC"], methods=[None, None, None], single=None, threshold=0.9):
    """Run ``run_in_folder`` once per sub-folder.

    ``folders`` and ``methods`` are paired position by position. For each pair
    the sub-folder name is appended to both the input and the output
    directory, and the method is forwarded to ``run_in_folder`` together with
    ``single`` and ``threshold``.

    Args:
        input_directory: Root folder of the videos (string).
        output_directory: Root folder of the transcripts (string).
        folders: Names of the sub-folders to process.
        methods: One ``method`` value per entry of ``folders``.
        single: Forwarded to ``run_in_folder``.
        threshold: Forwarded to ``run_in_folder``.
    """
    for sub_folder, sub_method in zip(folders, methods):
        # The trailing empty element yields the closing "/" of each path
        source_dir = "/".join((input_directory, sub_folder, ""))
        target_dir = "/".join((output_directory, sub_folder, ""))
        run_in_folder(source_dir, target_dir, single=single, threshold=threshold, method=sub_method)

Here is an example execution of the pipeline.

In [None]:
# Root folder of the recordings; each entry of `folders` below is a sub-folder of it.
input_directory = "K:/Study 2 (Fall2023)"

# Root folder of the transcripts; the same sub-folder names are appended to it.
output_directory = "Transcripts/Study 2 (Fall2023)"

# `methods` is paired with `folders` by position.
run_all_folders(
    input_directory,
    output_directory,
    folders=["VTV", "FTF", "VGC"],
    methods=[1, 0, 0],
    single=None,
    threshold=0.9,
)