# Imports

In [None]:
# AI imports
from pyannote.audio import Pipeline
import torch
import whisper
import noisereduce as nr 

# data processing imports
import pydub
import numpy as np
from scipy.io import wavfile
import os
import pathlib
import shutil 
import json

In [None]:
# import API keys
# JSON text is UTF-8, so decode it explicitly instead of relying on the platform's locale encoding
with open('api_keys.json', encoding='utf-8') as json_file:
    api_keys = json.load(json_file)

# Import Audio File and Convert to WAV

In [None]:
def get_path_name_ext(filename):
    '''
    Splits a filename into its parent directory, its stem, and its extension.

    Inputs:
    -------
    filename : string
        The full filename of the file.

    Returns:
    --------
    path : string
        Parent directory of the file, as reported by os.path.dirname.

    name : string
        The final path component without its last suffix. The parent directory is not included.

    extension : string
        The last suffix of the file with the dot removed (empty string when there is no suffix).
    '''
    file_path = pathlib.Path(filename)

    # the directory part is taken from the raw string, the other two parts from pathlib
    parent_dir = os.path.dirname(filename)
    stem = file_path.stem
    bare_suffix = file_path.suffix.replace('.', '')

    return parent_dir, stem, bare_suffix

def import_audio(filename):
    ''' 
    Given a filename of an audio file, creates an output folder and converts a copy to .wav format.

    The copy is written to output/<name>/<name>.wav, where <name> is the input file's name without
    its extension. The input file itself is left untouched.

    Inputs:
    -------
    filename : string
        The full filename of the file.
    
    Returns:
    --------
    output_filename : string
        Full path to the copy in the output folder.

    '''
    # get file name and extension (the parent directory of the input is not needed here)
    _, name, extension = get_path_name_ext(filename)

    # create an output path for the file; exist_ok avoids a separate existence check
    # that could race with another process creating the same folder
    output_path = os.path.join('output', name)
    os.makedirs(output_path, exist_ok=True)

    # final filename
    output_filename = os.path.join(output_path, f'{name}.wav')

    # decode the input using its extension as the format, then save it as .wav in the output folder
    sound = pydub.AudioSegment.from_file(filename, format=extension)
    sound.export(output_filename, format='wav')

    return output_filename

# Build the path to the sample recording in the 'input' folder and convert it;
# import_audio returns the path of the .wav copy, which is kept in output_filename.
original_filename = os.path.join('input', 'Conversation_Grace_Alex_2.m4a')
output_filename = import_audio(original_filename)

# Denoise Audio File

In [None]:
def denoise_audio(filename):
    ''' 
    Given a .wav filename, removes background noise from the audio and saves a denoised copy.

    The copy is written next to the input file as <name>_DENOISED.wav. Both single-channel and
    multi-channel files are supported.

    Inputs:
    -------
    filename : string
        The full filename of the file.

    Returns:
    --------
    output_filename : string
        Full path to the denoised copy in the output folder.
    '''
    # get file name and parent directory
    path, name, _ = get_path_name_ext(filename)

    # import the file
    rate, data = wavfile.read(filename)

    # wavfile returns multi-channel audio as (samples, channels), whereas noisereduce expects
    # (channels, samples); transpose on the way in and back again on the way out
    is_multichannel = data.ndim > 1
    signal = data.T if is_multichannel else data

    # reduce the noise (non-stationary mode, removing 65% of the estimated noise)
    reduced_noise = nr.reduce_noise(y=signal, sr=rate, stationary=False, prop_decrease=0.65)
    if is_multichannel:
        reduced_noise = reduced_noise.T

    # output the data
    output_name = f'{name}_DENOISED.wav'
    output_filename = os.path.join(path, output_name)
    wavfile.write(output_filename, rate, reduced_noise)

    return output_filename

# Rebind output_filename to the denoised copy so the following steps work on the cleaned audio.
output_filename = denoise_audio(output_filename)

# Perform Speaker Diarization

In [None]:
# create function to import diarization model and perform speaker diarization
def diarize_audio(filename, api_key, n_speakers=None):
    ''' 
    Given a .wav filename, performs speaker diarization.

    The pipeline runs on the GPU when CUDA is available and falls back to the CPU otherwise.

    Inputs:
    -------
    filename : string
        The full filename of the file.

    api_key : string
        Your HuggingFace API key.

    n_speakers : int
        Number of speakers in the audio clip. If None, the algorithm will attempt to guess the number of speakers.

    Returns:
    --------
    turn_list : list[list[string, float, float]]
        A list of turns taken by speakers in the audio. Each turn is a list that contains the speaker label, beginning in seconds, 
        and end in seconds.
    '''
    # import the speaker diarization pipeline
    diarization_model = Pipeline.from_pretrained(
        'pyannote/speaker-diarization-3.1',
        use_auth_token=api_key
    )

    # prefer the GPU, but do not crash on machines without CUDA
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    diarization_model.to(device)

    # implement speaker diarization model
    diarization_results = diarization_model(filename, num_speakers=n_speakers)

    # create turn list: one [speaker, start, end] entry per track, in the order the pipeline yields them
    turn_list = [
        [speaker, turn.start, turn.end]
        for turn, _, speaker in diarization_results.itertracks(yield_label=True)
    ]

    return turn_list

# Diarize the current output file using the key stored under 'pyannote' in api_keys,
# with the number of speakers fixed at two.
turn_list = diarize_audio(output_filename, api_keys['pyannote'], n_speakers=2)

# Post-Process the Speaker Turns

In [None]:
# function to post-process diarization results (remove short clips, limit overlaps, join consecutive turns)
def consolidate_turns(turn_list, removal_threshold=0.10, max_overlap=1.0):
    ''' 
    Given a turn list, filters out short turns, joins consecutive turns, and minimizes turn overlap between speakers.

    The three steps are repeated until a full pass leaves the turn list unchanged, so a turn that only
    becomes too short after its overlap was truncated is still removed. The input list and the turns
    inside it are never modified. An empty turn list (or one where every turn is filtered out) yields
    an empty list.

    Inputs:
    -------
    turn_list : list[list[string, float, float]]
        A list of turns taken by speakers in the audio. Each turn is a list that contains the speaker label, beginning in seconds, 
        and end in seconds.

    removal_threshold : float
        Minimum threshold for turn length in seconds. If a turn is less than this length, it is removed.

    max_overlap : float
        Maximum overlap of turns in seconds. If two turns overlap more than this, the beginning of the latter turn is truncated
        to the maximum overlap value specified.

    Returns:
    --------
    consolidated_turn_list : list[list]
        A list of turns taken by speakers in the audio, post-consolidation. Each turn is a list that contains the speaker label, 
        beginning in seconds, and end in seconds.
    '''
    # work on copies so that the caller's turns are left untouched
    turn_list = [[t[0], t[1], t[2]] for t in turn_list]

    # iterate through the filtering, consolidating, and overlap removal steps until convergence
    while True:
        # remove any turns that are less than the threshold
        filtered_turn_list = [t for t in turn_list if (t[2] - t[1]) >= removal_threshold]

        # join consecutive turns of the same speaker
        consolidated_turn_list = []
        for t in filtered_turn_list:
            if consolidated_turn_list and consolidated_turn_list[-1][0] == t[0]:
                # extend the running turn; max() keeps the end from moving backwards
                # if the later turn finishes before the earlier one
                last_turn = consolidated_turn_list[-1]
                last_turn[2] = max(last_turn[2], t[2])
            else:
                consolidated_turn_list.append([t[0], t[1], t[2]])

        # limiting overlap threshold
        prev_end_time = 0
        for t in consolidated_turn_list:
            # the overlap with the previous turn ends at whichever turn finishes first; using
            # that point keeps the new start from ever being pushed past the turn's own end
            overlap_end = min(prev_end_time, t[2])
            t[1] = max(t[1], overlap_end - max_overlap)
            prev_end_time = t[2]

        # converged once a full pass changes nothing (comparing lengths alone would miss
        # turns that were shortened below the threshold by the overlap step)
        if consolidated_turn_list == turn_list:
            return consolidated_turn_list

        # resetting the turn list with the consolidated turn list
        turn_list = consolidated_turn_list

# Post-process the raw diarization turns with the default thresholds and print the result for inspection.
consolidated_turns = consolidate_turns(turn_list)
print(consolidated_turns)

# Clip Turns from the Main Audio

In [None]:
def clip_audio_segments(filename, turn_list):
    ''' 
    Cuts the full conversation into one .wav clip per speaking turn.

    The clips are written to a 'diarization_segments' sub-directory next to the input file. Any existing
    sub-directory of that name is deleted first. Clips are named segment_<index>.wav, where the index is
    the position of the turn in the turn list, zero-padded to six digits.

    Inputs:
    -------
    filename : string
        Full filename of the .wav audio file to be clipped.

    turn_list : list[list[string, float, float]]
        A list of turns taken by speakers in the audio. Each turn is a list that contains the speaker label, beginning in seconds, 
        and end in seconds.

    Returns:
    --------
    segment_output_path : string
        The directory of the sub-directory containing all of the clipped audio segments.
    '''
    # only the parent directory of the input file is needed
    parent_dir = get_path_name_ext(filename)[0]

    # start from an empty segment directory
    segment_output_path = os.path.join(parent_dir, 'diarization_segments')
    if os.path.exists(segment_output_path):
        shutil.rmtree(segment_output_path)
    os.makedirs(segment_output_path)

    # load the full recording once
    full_audio = pydub.AudioSegment.from_file(filename)

    for index, turn in enumerate(turn_list):
        # turn boundaries are in seconds, the audio is sliced in milliseconds
        begin_ms = turn[1] * 1000
        finish_ms = turn[2] * 1000
        clip = full_audio[begin_ms: finish_ms]

        # save the clip under its zero-padded index
        clip_name = 'segment_' + str(index).zfill(6) + '.wav'
        clip.export(os.path.join(segment_output_path, clip_name), format='wav')

    return segment_output_path

# Cut the current output file into one clip per consolidated turn and keep the clip directory.
segment_output_path = clip_audio_segments(output_filename, consolidated_turns)

# Create Transcript of Conversation

In [None]:
# function to import transcription model and transcribe the diarized segments
def transcribe_segments(segment_output_path, turn_list):
    ''' 
    Given the directory containing the audio clips of individual turns and a corresponding turn list, the individual
    audio clips are transcribed with the Whisper model and a transcript of the full conversation is saved to the output folder.

    Clips are processed in sorted filename order and paired with the turns in turn_list in that order, so the
    clip filenames must sort in turn order. The transcript is written as UTF-8 to transcript.txt in the parent
    directory of segment_output_path. The model runs on the GPU when CUDA is available and on the CPU otherwise.

    Inputs:
    -------
    segment_output_path : string
        Directory that contains the individual audio clips in .wav format.

    turn_list : list[list[string, float, float]]
        A list of turns taken by speakers in the audio. Each turn is a list that contains the speaker label, beginning in seconds, 
        and end in seconds.

    Returns:
    --------
    transcript : string
        The full transcript of the conversation.
    '''
    # import whisper model, preferring the GPU but not requiring one
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    whisper_model = whisper.load_model('base', device=device)

    # get parent directory
    output_path = os.path.dirname(segment_output_path)

    # get list of segment filenames; os.listdir returns them in arbitrary order, so sort them
    # to keep each clip aligned with its turn when the two lists are zipped below
    segment_list = sorted(os.listdir(segment_output_path))

    # transcribe each segment
    segment_text_list = []
    for segment in segment_list:
        segment_filename = os.path.join(segment_output_path, segment)
        result = whisper_model.transcribe(segment_filename)
        segment_text_list.append(result["text"])
        print(segment)

    # create transcript with speaker labels, one line per turn
    transcript = ''.join(
        f'{turn[0]} : {segment_text}\n'
        for turn, segment_text in zip(turn_list, segment_text_list)
    )

    # write transcript to file; UTF-8 so that non-ASCII text cannot fail on the locale encoding
    transcript_filename = os.path.join(output_path, 'transcript.txt')
    with open(transcript_filename, 'w', encoding='utf-8') as text_file:
        print(transcript, file=text_file)

    return transcript

# Transcribe every clip and pair the texts with the consolidated turns to build the transcript.
transcript = transcribe_segments(segment_output_path, consolidated_turns)