In [1]:
import time
# Wall-clock reference taken in the first cell; the last cell subtracts it
# from time.time() to report the total notebook run time.
start_time = time.time()

In [None]:
import ast
import csv
import os
import re
import subprocess
import time
from typing import Tuple

from tqdm import tqdm

import torch
import whisper
import pandas as pd
import soundfile as sf
import numpy as np

from pydub import AudioSegment
from whisper.audio import pad_or_trim, log_mel_spectrogram, N_FRAMES
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from pyannote.audio import Pipeline
from openai import AzureOpenAI
from datasets import load_dataset
from pyannote.core import Annotation, Segment
from pyannote.metrics.diarization import DiarizationPurity, DiarizationCoverage


# from config.config import load_config
# from dotenv import load_dotenv

# Load environment variables from .env file
# load_dotenv()
# config = load_config()

Pyannote class

In [None]:
class PyannoteProcessor:
    """
    Speaker diarization wrapper around the pretrained
    ``pyannote/speaker-diarization-3.1`` pipeline.
    """

    def __init__(self, auth_token=None):
        """
        Load the pretrained diarization pipeline.

        Args:
            auth_token (str, optional): Hugging Face access token. When
                omitted, the ``HF_TOKEN`` environment variable is used so
                that no credential has to live in the notebook.

        SECURITY: an access token used to be hardcoded here. Anything that
        was committed that way should be treated as leaked and revoked.
        """
        if auth_token is None:
            auth_token = os.environ.get("HF_TOKEN")

        self.pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=auth_token,
        )

    def perform_diarization(self, audio_file_path, num_speakers=2, rttm_path="sample.rttm"):
        """
        Diarize one audio file and write the result as an RTTM file.

        Args:
            audio_file_path (str): Path of the audio file to diarize.
            num_speakers (int): Number of speakers passed to the pipeline
                (defaults to 2, the value that used to be hardcoded).
            rttm_path (str): Where the RTTM output is written.
        """
        # Use the GPU when one is available instead of failing on CPU-only hosts.
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.pipeline.to(device)

        diarization = self.pipeline(audio_file_path, num_speakers=num_speakers)

        with open(rttm_path, "w") as rttm:
            diarization.write_rttm(rttm)

    def rttm_to_dataframe(self, rttm_file_path):
        """
        Parse an RTTM file into a DataFrame.

        Args:
            rttm_file_path (str): Path of the RTTM file to read.

        Returns:
            pandas.DataFrame: One row per RTTM line with the string columns
            "Type", "File ID", "Channel", "Start Time", "Duration" and
            "Speaker". The remaining RTTM fields are dropped.
        """
        # The ten whitespace-separated RTTM fields, in file order.
        columns = [
            "Type",
            "File ID",
            "Channel",
            "Start Time",
            "Duration",
            "Orthography",
            "Speaker Type",
            "Speaker",
            "Confidence",
            "Lookahead",
        ]

        with open(rttm_file_path, "r") as rttm_file:
            # Skip blank lines: they would otherwise become all-None rows.
            data = [line.split() for line in rttm_file if line.strip()]

        df = pd.DataFrame(data, columns=columns)
        return df.drop(
            ["Orthography", "Speaker Type", "Confidence", "Lookahead"], axis=1
        )


Whisper class

In [None]:
class WhisperProcessor:
    """
    Thin wrapper around the OpenAI Whisper ``large-v2`` model used to
    transcribe whole recordings and individual speaker turns.
    """

    def __init__(self):
        """Load Whisper ``large-v2`` on the GPU when available, else on the CPU."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(self.device)
        self.model = whisper.load_model("large-v2").to(self.device)

    def transcribe_audio_with_whisper(self, audio_file, detected_language):
        """
        Transcribes an audio file using the Whisper ASR model.

        Args:
            audio_file (str): Path to the audio file.
            detected_language (str): Language passed to Whisper.

        Returns:
            dict: Whisper transcription result; the text is under "text".
        """
        return self.model.transcribe(
            audio_file, language=detected_language, fp16=False, temperature=(0.8, 1.0)
        )

    def detect_audio_language(self, audio) -> Tuple[str, float]:
        """
        Detects the language of an audio input using the Whisper ASR model.

        Args:
            audio: Input forwarded to ``whisper.audio.log_mel_spectrogram``.
                NOTE(review): presumably a file path or a waveform
                array/tensor rather than a pydub AudioSegment; confirm.

        Returns:
            Tuple[str, float]: Most probable language and its probability.
        """
        mel_segment = pad_or_trim(log_mel_spectrogram(audio), N_FRAMES).to(
            self.model.device
        )
        _, probs = self.model.detect_language(mel_segment)
        detected_language = max(probs, key=probs.get)
        confidence = probs[detected_language]

        return detected_language, confidence

    def process_audio_segment(
        self, audio_file, start_time, end_time, detected_language
    ):
        """
        Transcribes the part of an audio file between two timestamps.

        Args:
            audio_file (str): Path to the audio file.
            start_time (float): Start of the segment in seconds.
            end_time (float): End of the segment in seconds.
            detected_language (str): Language passed to Whisper.

        Returns:
            str: Collapsed transcript for the processed audio segment.
        """
        # pydub slices by milliseconds, the callers provide seconds.
        start_ms = float(start_time * 1000)
        end_ms = float(end_time * 1000)

        audio = AudioSegment.from_file(audio_file)
        audio_segment = audio[start_ms:end_ms]

        audio_segment_path = f"audio_segment_{start_ms}.wav"
        try:
            audio_segment.export(audio_segment_path, format="wav")

            transcription_result = self.transcribe_audio_with_whisper(
                audio_segment_path, detected_language
            )
        finally:
            # Remove the temporary file even when export/transcription fails.
            if os.path.exists(audio_segment_path):
                os.remove(audio_segment_path)

        whisper_transcript = transcription_result["text"]

        # Split the transcript into lines and normalise their whitespace.
        return self.collapse_segments(whisper_transcript.split("\n"))

    def collapse_segments(self, transcript_segments):
        """
        Joins transcript segments into a single whitespace-normalised string.

        Segments starting with "Segment" are kept verbatim. Every other
        segment is split into words and each word is followed by exactly one
        space, so the result ends with a trailing space.

        Args:
            transcript_segments (list): List of transcript segments.

        Returns:
            str: Collapsed transcript.
        """
        pieces = []
        for segment in transcript_segments:
            if segment.startswith("Segment"):
                pieces.append(segment)
            else:
                pieces.extend(word + " " for word in segment.split())

        return "".join(pieces)

Azure OpenAI (ChatGPT) API clients

In [None]:
# Azure OpenAI clients, one per deployment. Each API key is read from its own
# environment variable so that no secret has to be pasted into the notebook;
# the placeholder is kept as the fallback value when the variable is not set.
client_GPT4 = AzureOpenAI(
    azure_endpoint="https://dlcru-east-us2.openai.azure.com/",
    api_version="2023-03-15-preview",
    api_key=os.environ.get("AZURE_OPENAI_API_KEY_GPT4", "INSERT_YOUR_KEY"),
)

# Deployment name used as the `model` argument for GPT-4 requests.
model_GPT4 = "dlcru-gpt4"


client_GPT35_turbo = AzureOpenAI(
    azure_endpoint="https://gpt3test-dlcru.openai.azure.com/",
    api_version="2023-03-15-preview",
    api_key=os.environ.get("AZURE_OPENAI_API_KEY_GPT35", "INSERT_YOUR_KEY"),
)

# Deployment name used as the `model` argument for GPT-3.5 Turbo requests.
model_GPT35_turbo = "dlcru-gpt-35-turbo"

LLM class

In [None]:
class LLM:
    """
    Sends a diarized transcript to a chat-completion model and asks it to
    move misplaced words to the right speaker.
    """

    def __init__(self, client, model):
        """
        Args:
            client: Client object exposing ``chat.completions.create``.
            model (str): Model / deployment name sent with every request.
        """
        self.client = client
        self.model = model

    def create_prompt(self, text, shot_type):
        """
        Build the correction prompt for a transcript.

        Args:
            text (str): Diarized transcript to correct.
            shot_type (str): 'zero_shot' or 'one_shot'.

        Returns:
            str: The prompt text.

        Raises:
            ValueError: If ``shot_type`` is not one of the supported values
                (previously this returned None and the request was sent
                without a prompt).
        """
        prompt_zero_shot = f"""In the speaker diarization transcript below, some words are potentially misplaced.
            Please correct those words and move them to the right speaker.
            Directly show the corrected transcript without explaining what changes were made or why you
            made those changes
            
            "{text}"
            """
        prompt_one_shot = f"""In the speaker diarization transcript below, some words are potentially misplaced. There are only 2 speakers. 
            Please correct those words and move them to the right speaker. For example, given this input transcript,
            <spk:1> How are you doing today? I <spk:2> am doing very well. How was everything at the
            <spk:1> party? Oh, the party? It was awesome. We had lots of fun. Good <spk:2> to hear!
            The correct output transcript should be:
            <spk:1> How are you doing today? <spk:2> I am doing very well. How was everything at the
            party? <spk:1> Oh, the party? It was awesome. We had lots of fun. <spk:2> Good to hear!
            Now, please correct the transcript below.\n
            
             "{text}"
            """
        prompts = {
            'zero_shot': prompt_zero_shot,
            'one_shot': prompt_one_shot,
        }
        if shot_type not in prompts:
            raise ValueError(
                f"Unknown shot_type {shot_type!r}; expected one of {sorted(prompts)}"
            )
        return prompts[shot_type]

    def get_completion(self, text, shot_type, temperature=0, max_retries=None, retry_after=9):
        """
        Ask the model to correct a transcript, retrying on rate limiting.

        Args:
            text (str): Diarized transcript to correct.
            shot_type (str): 'zero_shot' or 'one_shot'.
            temperature (float): Sampling temperature sent with the request.
            max_retries (int, optional): Maximum number of retries after a
                rate-limit error. None (the default) retries indefinitely,
                which is the original behaviour.
            retry_after (float): Seconds to wait before each retry.

        Returns:
            str: Content of the first completion choice.

        Raises:
            ValueError: If ``shot_type`` is unknown.
            Exception: Any non rate-limit error from the client, or the
                rate-limit error itself once ``max_retries`` is exhausted.
        """
        prompt = self.create_prompt(text, shot_type)

        message_objects = [
            {
                "role": "system",
                "content": "You are a helpful assistant. Answer shortly and only what you are asked.",
            },
            {"role": "user", "content": prompt},
        ]

        retries_done = 0
        while True:
            try:
                completion = self.client.chat.completions.create(
                    model=self.model,
                    messages=message_objects,
                    temperature=temperature
                )
                return completion.choices[0].message.content
            except Exception as e:
                # Prefer the HTTP status exposed by the client error; fall
                # back to the message text when no status is available.
                rate_limited = getattr(e, "status_code", None) == 429 or '429' in str(e)
                if not rate_limited:
                    raise  # Re-raise other exceptions
                if max_retries is not None and retries_done >= max_retries:
                    raise
                retries_done += 1
                print(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
                time.sleep(retry_after)

The "brain": end-to-end processing pipeline

In [None]:
class AudioProcessor:
    """
    Orchestrates the experiment: diarize each recording with Pyannote,
    transcribe every speaker turn with Whisper, ask the LLMs to repair the
    speaker attribution and append one CSV row per recording.
    """

    # Speaker tags recognised in transcripts: the "SPEAKER_00" style written
    # by the diarization step and the "<spk:1>" style used in the one-shot
    # prompt example.
    _SPEAKER_TAG_PATTERN = r"SPEAKER_\d+|<spk:\d+>"

    # Header of the output CSV; rows are written in exactly this order.
    _COLUMN_NAMES = [
        "audio_number", "text_GT", "diarization_without_LM",
        "diarization_35_turbo_zero_shot", "diarization_35_turbo_one_shot",
        "diarization_4_zero_shot", "diarization_4_one_shot",
        "speakers_GT", "speakers_without_LM",
        "speakers_35_turbo_zero_shot", "speakers_35_turbo_one_shot",
        "speakers_4_zero_shot", "speakers_4_one_shot",
    ]

    def __init__(self, pyannote_processor, whisper_processor):
        """
        Args:
            pyannote_processor: Object providing ``perform_diarization`` and
                ``rttm_to_dataframe``.
            whisper_processor: Object providing
                ``transcribe_audio_with_whisper`` and ``process_audio_segment``.
        """
        self.pyannote = pyannote_processor
        self.whisper = whisper_processor

    def extract_speakers(self, text):
        """
        Extract the sequence of speaker turns from a tagged transcript.

        Each distinct speaker tag is mapped to an integer in order of first
        appearance (1, 2, ...), and consecutive turns of the same speaker are
        collapsed, mirroring how the ground-truth speaker list is filtered.

        The previous implementation split the text on ':' and emitted
        alternating 1/2 labels per chunk, which produced one label too many
        and broke on any colon inside the spoken text.

        Args:
            text (str): Transcript containing "SPEAKER_<n>" or "<spk:n>" tags.

        Returns:
            list[int]: Speaker label per turn; empty when no tag is found.
        """
        label_by_tag = {}
        labels = []

        for tag in re.findall(self._SPEAKER_TAG_PATTERN, text or ""):
            label = label_by_tag.setdefault(tag, len(label_by_tag) + 1)
            if not labels or labels[-1] != label:
                labels.append(label)

        return labels

    @staticmethod
    def _drop_consecutive_repeats(labels):
        """Return `labels` with runs of equal neighbours reduced to one item."""
        deduped = []
        for label in labels:
            if not deduped or deduped[-1] != label:
                deduped.append(label)
        return deduped

    @staticmethod
    def _merge_consecutive_turns(segments):
        """
        Merge neighbouring (start, end, speaker) segments of the same speaker.

        Unlike the previous inline loop, the final turn is kept, and a merged
        turn never shrinks when a later segment ends earlier.
        """
        merged = []
        for start, end, speaker in segments:
            if merged and merged[-1][2] == speaker:
                prev_start, prev_end, _ = merged[-1]
                merged[-1] = (prev_start, max(prev_end, end), speaker)
            else:
                merged.append((start, end, speaker))
        return merged

    def _diarize_and_transcribe(self, wav_path):
        """Diarize `wav_path`, transcribe each speaker turn, return "SPEAKER: text ..." string."""
        print("Starting Pyannote...")
        self.pyannote.perform_diarization(wav_path)
        print("Finished Pyannote")

        # perform_diarization writes its result to this RTTM file.
        rttm = self.pyannote.rttm_to_dataframe("sample.rttm")
        rttm = rttm.astype({"Start Time": "float", "Duration": "float"})
        end_times = rttm["Start Time"] + rttm["Duration"]

        turns = self._merge_consecutive_turns(
            zip(rttm["Start Time"], end_times, rttm["Speaker"])
        )
        if not turns:
            raise ValueError("diarization produced no speech segments")

        print("Starting Whisper...")
        parts = []
        for start, end, speaker in turns:
            transcript = self.whisper.process_audio_segment(
                wav_path, start, end, "english"  # Replace "english" with detected language
            )
            parts.append(f"{speaker}: {transcript}")
        print("Finished Whisper")

        return " ".join(parts)

    @staticmethod
    def _correct_with_llms(transcript, llm_gpt35_turbo, llm_gpt4):
        """Return the four LLM corrections in CSV column order (3.5 zero/one shot, 4 zero/one shot)."""
        print("Starting 3.5 turbo...")
        print("zero shot")
        turbo_zero_shot = llm_gpt35_turbo.get_completion(text=transcript, shot_type='zero_shot')
        print("one shot")
        turbo_one_shot = llm_gpt35_turbo.get_completion(text=transcript, shot_type='one_shot')

        print("Starting 4...")
        print("zero shot")
        gpt4_zero_shot = llm_gpt4.get_completion(text=transcript, shot_type='zero_shot')
        print("one shot")
        gpt4_one_shot = llm_gpt4.get_completion(text=transcript, shot_type='one_shot')

        return [turbo_zero_shot, turbo_one_shot, gpt4_zero_shot, gpt4_one_shot]

    def process_and_append_to_csv(self, dataset, output_csv_path, max_items=1):
        """
        Process recordings from `dataset` and append one CSV row for each.

        Args:
            dataset: Iterable of rows with an 'audio' entry (holding 'array'
                and 'sampling_rate') and a 'speakers' entry.
            output_csv_path (str): CSV file to append to; the header is
                written when the file is empty.
            max_items (int, optional): Stop after this many rows have been
                written. The default of 1 keeps the former "demo" behaviour
                (stop after the first successful recording); pass None to
                process the whole dataset.

        Notes:
            Speaker lists are written with the csv module, i.e. as their
            Python repr; parse them back (e.g. ``ast.literal_eval``) when
            reading the CSV. A failure while diarizing or correcting one
            recording is reported and the recording is skipped.
        """
        # Temporary WAV placed next to the CSV and reused for every recording.
        output_wav_file = os.path.splitext(output_csv_path)[0] + '_audio.wav'

        llm_gpt35_turbo = LLM(client_GPT35_turbo, model_GPT35_turbo)
        llm_gpt4 = LLM(client_GPT4, model_GPT4)

        rows_written = 0

        with open(output_csv_path, mode="a", newline="", encoding="utf-8") as csv_file:
            csv_writer = csv.writer(
                csv_file, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
            )

            if os.path.getsize(output_csv_path) == 0:
                csv_writer.writerow(self._COLUMN_NAMES)

            for audio_number, row in enumerate(tqdm(dataset, desc="Processing dataset")):
                audio_data = row['audio']
                filtered_GT = self._drop_consecutive_repeats(row['speakers'])

                try:
                    # soundfile converts the samples to 16-bit PCM on write.
                    sf.write(
                        output_wav_file,
                        audio_data['array'],
                        audio_data['sampling_rate'],
                        subtype='PCM_16',
                    )

                    # Ground-truth words: Whisper on the full recording, language hardcoded.
                    whisper_GT = self.whisper.transcribe_audio_with_whisper(
                        output_wav_file, "english"
                    )
                    text_GT = whisper_GT["text"]

                    try:
                        diarization_without_LM = self._diarize_and_transcribe(output_wav_file)
                        corrections = self._correct_with_llms(
                            diarization_without_LM, llm_gpt35_turbo, llm_gpt4
                        )

                        # Speaker sequences for DiarizationPurity / DiarizationCoverage.
                        speakers_without_LM = self.extract_speakers(diarization_without_LM)
                        corrected_speakers = [self.extract_speakers(c) for c in corrections]

                        csv_writer.writerow(
                            [audio_number, text_GT, diarization_without_LM]
                            + corrections
                            + [filtered_GT, speakers_without_LM]
                            + corrected_speakers
                        )
                        rows_written += 1
                    except Exception as e:
                        print(f"An error occurred for audio {audio_number}: {e}")
                finally:
                    # Always remove the temporary WAV, also after a failure.
                    if os.path.exists(output_wav_file):
                        os.remove(output_wav_file)

                # Flush the buffer to the file
                csv_file.flush()

                if max_items is not None and rows_written >= max_items:
                    break

    def process_dataset(self, output_csv_path, max_items=1):
        """
        Load the English CallHome dataset and process it into `output_csv_path`.

        Args:
            output_csv_path (str): CSV file to append results to.
            max_items (int, optional): Forwarded to
                `process_and_append_to_csv`; 1 (demo) by default, None for
                the full dataset.
        """
        ds = load_dataset("talkbank/callhome", "eng")
        dataset = ds['data']

        self.process_and_append_to_csv(dataset, output_csv_path, max_items=max_items)

Main

In [None]:
# Release GPU memory cached by PyTorch before the models are loaded.
torch.cuda.empty_cache()

# Jupyter shell escape; presumably prints the current GPU usage
# (requires the gpustat command-line tool to be installed).
!gpustat

# Build the two model wrappers and hand them to the orchestrator.
diarization = PyannoteProcessor()
stt = WhisperProcessor()
audio_processor = AudioProcessor(diarization, stt)

# Run the pipeline on the CallHome English dataset and append the results to this CSV.
# NOTE(review): the absolute path is machine-specific.
audio_processor.process_dataset("/root/diarizare/transcripts.csv")

Metrics

Word metrics

In [None]:
class WordMetrics:
    """
    Word-level metrics computed between a reference transcript and a
    hypothesis transcript tagged with "SPEAKER_<n>:" labels.
    """

    @staticmethod
    def preprocess_transcript(transcript):
        """
        Split a transcript into speaker segments and speaker labels.

        Args:
            transcript (str): Text in which turns start with "SPEAKER_<n>:".
                Non-string values (e.g. NaN from an empty CSV cell) are
                treated as an empty transcript.

        Returns:
            tuple[list[str], list[str]]: The non-empty, stripped text pieces
            between labels, and the labels in order of appearance.

        NOTE(review): empty pieces are dropped while their labels are kept,
        and text before the first label becomes a segment without a label,
        so the two lists are not guaranteed to line up index by index.
        """
        if not isinstance(transcript, str):
            transcript = ""

        # Define regex pattern to match speaker labels
        pattern = r'SPEAKER_\d+:'

        segments = [seg.strip() for seg in re.split(pattern, transcript) if seg.strip()]
        speaker_labels = re.findall(pattern, transcript)

        return segments, speaker_labels

    @staticmethod
    def _wder(segments_GT, segments_hypothesis, speakers_GT, speakers_hypothesis):
        """
        Position-wise word diarization error rate.

        Segments are paired by index and words by position inside each pair;
        the score is the share of compared words whose segment carries a
        different speaker label in the hypothesis. Returns 0.0 when no word
        could be compared (e.g. a reference without speaker labels).
        """
        wrong_speaker = 0   # compared words whose speaker labels differ
        compared = 0        # all compared words (substituted + correct)

        paired = zip(segments_GT, segments_hypothesis, speakers_GT, speakers_hypothesis)
        for seg_g, seg_h, speaker_g, speaker_h in paired:
            n_words = min(len(seg_h.split()), len(seg_g.split()))
            compared += n_words
            if speaker_h != speaker_g:
                wrong_speaker += n_words

        return wrong_speaker / compared if compared > 0 else 0.0

    @staticmethod
    def _wer(ref, hyp):
        """
        Word error rate: word-level edit distance divided by reference length.

        An empty reference no longer raises ZeroDivisionError: the distance
        is divided by max(len(reference), 1), so two empty texts score 0.0.
        """
        ref_words = ref.split()
        hyp_words = hyp.split()

        # Rolling two-row edit-distance table (previous row / current row).
        previous = list(range(len(hyp_words) + 1))
        for i, ref_word in enumerate(ref_words, start=1):
            current = [i]
            for j, hyp_word in enumerate(hyp_words, start=1):
                if ref_word == hyp_word:
                    current.append(previous[j - 1])
                else:
                    substitute = previous[j - 1] + 1
                    insert = current[j - 1] + 1
                    delete = previous[j] + 1
                    current.append(min(substitute, insert, delete))
            previous = current

        return previous[-1] / max(len(ref_words), 1)

    @staticmethod
    def compute_metrics(transcript_GT, transcript_hypothesis):
        """
        Compute (WDER, cpWER) for one reference/hypothesis pair.

        Args:
            transcript_GT (str): Reference transcript.
            transcript_hypothesis (str): Hypothesis transcript.

        Returns:
            tuple[float, float]: ``(WDER, cpWER)``.

        NOTE(review): the value reported as cpWER is the plain WER between
        the concatenated segments of both transcripts; speaker permutations
        are not evaluated here.
        """
        segments_GT, speakers_GT = WordMetrics.preprocess_transcript(transcript_GT)
        segments_hypothesis, speakers_hypothesis = WordMetrics.preprocess_transcript(transcript_hypothesis)

        WDER = WordMetrics._wder(segments_GT, segments_hypothesis, speakers_GT, speakers_hypothesis)
        cpWER = WordMetrics._wer(' '.join(segments_GT), ' '.join(segments_hypothesis))

        return WDER, cpWER

Speaker metrics

In [None]:
class SpeakersMetrics:
    """
    Speaker-level metrics (diarization purity and coverage) computed from
    sequences of per-turn speaker labels.
    """

    @staticmethod
    def _as_label_list(speaker_labels):
        """
        Normalise the input to a list of labels.

        Label lists that went through the CSV come back as strings such as
        "['A', 'B']"; iterating over such a string would treat every
        character as a label, so it is parsed back into a list first.
        None / NaN (empty CSV cell) becomes an empty list. A string that is
        not a list literal is still treated as a sequence of characters.
        """
        if speaker_labels is None:
            return []
        if isinstance(speaker_labels, float) and speaker_labels != speaker_labels:
            return []
        if isinstance(speaker_labels, str):
            try:
                parsed = ast.literal_eval(speaker_labels)
            except (ValueError, SyntaxError):
                return list(speaker_labels)
            if isinstance(parsed, (list, tuple)):
                return list(parsed)
            return list(speaker_labels)
        return list(speaker_labels)

    @staticmethod
    def convert_to_annotation(speaker_labels):
        """
        Turn a label sequence into a pyannote ``Annotation``.

        The position in the sequence is used as the time axis: a run of
        equal labels covering positions [i, j) becomes ``Segment(i, j)``
        labelled with that speaker.

        Args:
            speaker_labels: List of labels, or its string repr as read from
                the results CSV.

        Returns:
            Annotation: One segment per run of identical labels.
        """
        labels = SpeakersMetrics._as_label_list(speaker_labels)
        annotation = Annotation()

        run_start = 0
        for i in range(1, len(labels) + 1):
            run_ended = i == len(labels) or labels[i] != labels[run_start]
            if run_ended:
                if labels[run_start] is not None:
                    annotation[Segment(run_start, i)] = labels[run_start]
                run_start = i

        return annotation

    @staticmethod
    def compute_metrics(reference_labels, hypothesis_labels):
        """
        Compute diarization purity and coverage between two label sequences.

        Args:
            reference_labels: Ground-truth speaker labels (list or CSV string).
            hypothesis_labels: Predicted speaker labels (list or CSV string).

        Returns:
            tuple: ``(purity_score, coverage_score)`` as returned by
            ``DiarizationPurity`` and ``DiarizationCoverage``.
        """
        reference_annotation = SpeakersMetrics.convert_to_annotation(reference_labels)
        hypothesis_annotation = SpeakersMetrics.convert_to_annotation(hypothesis_labels)

        purity = DiarizationPurity()
        coverage = DiarizationCoverage()

        purity_score = purity(reference_annotation, hypothesis_annotation)
        coverage_score = coverage(reference_annotation, hypothesis_annotation)

        return purity_score, coverage_score

Compute all metrics

In [None]:
import re

# Function to compute all metrics and update DataFrame
def compute_all_metrics(df):
    """
    Add word and speaker metric columns for every correction method.

    For each method suffix the columns ``WDER_<suffix>``, ``cpWER_<suffix>``,
    ``DiarizationPurity_<suffix>`` and ``DiarizationCoverage_<suffix>`` are
    added to `df` in place.

    Args:
        df (pandas.DataFrame): Frame with the columns 'text_GT',
            'speakers_GT' and, per suffix, 'diarization_<suffix>' and
            'speakers_<suffix>'.

    Returns:
        pandas.DataFrame: The same frame, with the metric columns added.
    """
    def parse_labels(value):
        # Speaker lists are stored in the CSV as their Python repr
        # ("['A', 'B']"); turn them back into real lists so that the metric
        # does not iterate over the characters of the string.
        if isinstance(value, str):
            try:
                return list(ast.literal_eval(value))
            except (ValueError, SyntaxError, TypeError):
                return value
        return value

    reference_labels = [parse_labels(value) for value in df['speakers_GT']]

    for suffix in ["without_LM", "35_turbo_zero_shot", "35_turbo_one_shot", "4_zero_shot", "4_one_shot"]:
        # Compute WDER and cpWER using WordMetrics
        word_scores = [
            WordMetrics.compute_metrics(text_gt, hypothesis)
            for text_gt, hypothesis in zip(df['text_GT'], df[f'diarization_{suffix}'])
        ]
        df[f"WDER_{suffix}"] = [wder for wder, _ in word_scores]
        df[f"cpWER_{suffix}"] = [cpwer for _, cpwer in word_scores]

        # Compute DiarizationPurity and DiarizationCoverage using SpeakersMetrics
        speaker_scores = [
            SpeakersMetrics.compute_metrics(reference, parse_labels(hypothesis))
            for reference, hypothesis in zip(reference_labels, df[f'speakers_{suffix}'])
        ]
        df[f"DiarizationPurity_{suffix}"] = [purity for purity, _ in speaker_scores]
        df[f"DiarizationCoverage_{suffix}"] = [coverage for _, coverage in speaker_scores]

    return df

# Load transcripts.csv into a pandas DataFrame.
# This is the same path that the "Main" cell passes to process_dataset.
# NOTE(review): the absolute path is machine-specific.
df = pd.read_csv('/root/diarizare/transcripts.csv')

# Compute all metrics and update DataFrame (compute_all_metrics adds the
# metric columns to the frame it is given and returns that frame).
df = compute_all_metrics(df)

# Save the updated DataFrame with metrics columns (row index not written)
df.to_csv('/root/diarizare/transcripts_with_metrics.csv', index=False)


Compute all results

In [None]:
import pandas as pd

# Per-recording metrics written by the "Compute all metrics" step.
df = pd.read_csv('/root/diarizare/transcripts_with_metrics.csv')

# Metric prefixes and method suffixes; together they name the columns
# "<metric>_<method>" of the metrics file.
metrics = ["WDER", "cpWER", "DiarizationPurity", "DiarizationCoverage"]
methods = ["without_LM", "35_turbo_zero_shot", "35_turbo_one_shot", "4_zero_shot", "4_one_shot"]

# summary_data[metric] lists, in `methods` order, the mean of that metric
# over all rows of the metrics file.
summary_data = {}
for metric in metrics:
    method_means = []
    for method in methods:
        method_means.append(df[f"{metric}_{method}"].mean())
    summary_data[metric] = method_means

# Rows are methods, columns are metrics.
summary_df = pd.DataFrame(summary_data, index=methods)

print(summary_df)

# Keep a copy of the summary table on disk (relative to the working directory).
summary_df.to_csv('summary_metrics.csv')

Extract information about cpWER

In [None]:
import pandas as pd

# Metrics file, resolved relative to the current working directory.
# NOTE(review): the earlier cells read and write this file under
# /root/diarizare/; confirm the notebook runs from that directory.
file_path = 'transcripts_with_metrics.csv'
df = pd.read_csv(file_path)

# cpWER columns of the four LLM-corrected variants.
cpWER_columns = ["cpWER_35_turbo_zero_shot", "cpWER_35_turbo_one_shot", "cpWER_4_zero_shot", "cpWER_4_one_shot"]

# Per-column extremes first, then the overall extreme and the column holding it.
cpWER_column_minima = df[cpWER_columns].min()
cpWER_column_maxima = df[cpWER_columns].max()

min_cpWER = cpWER_column_minima.min()
max_cpWER = cpWER_column_maxima.max()
min_cpWER_column = cpWER_column_minima.idxmin()
max_cpWER_column = cpWER_column_maxima.idxmax()

# Rows in which the selected column reaches the overall extreme.
min_cpWER_row = df[df[min_cpWER_column] == min_cpWER]
max_cpWER_row = df[df[max_cpWER_column] == max_cpWER]

# Columns shown for the uncorrected baseline, and the transcript columns
# that belong to the best / worst cpWER columns.
cpWER_baseline_view = ['audio_number', 'diarization_without_LM', 'cpWER_without_LM']
min_cpWER_transcript_column = min_cpWER_column.replace('cpWER', 'diarization')
max_cpWER_transcript_column = max_cpWER_column.replace('cpWER', 'diarization')

# Lowest cpWER: baseline next to the best-scoring corrected transcript.
print("Details for diarization without LM and the diarization with the lowest cpWER:")
print("Diarization without LM:")
print(min_cpWER_row[cpWER_baseline_view])

print(f"Diarization with the lowest cpWER ({min_cpWER_column}):")
print(min_cpWER_row[['audio_number', min_cpWER_transcript_column, min_cpWER_column]])

# Highest cpWER: baseline next to the worst-scoring corrected transcript.
print("\nDetails for diarization without LM and the diarization with the highest cpWER:")
print("Diarization without LM:")
print(max_cpWER_row[cpWER_baseline_view])

print(f"Diarization with the highest cpWER ({max_cpWER_column}):")
print(max_cpWER_row[['audio_number', max_cpWER_transcript_column, max_cpWER_column]])


Extract information about Diarization Purity

In [None]:
# DiarizationPurity columns of the four LLM-corrected variants.
# This cell reuses the `df` loaded by an earlier cell.
DiarizationPurity_columns = ["DiarizationPurity_35_turbo_zero_shot", "DiarizationPurity_35_turbo_one_shot", "DiarizationPurity_4_zero_shot", "DiarizationPurity_4_one_shot"]

# Per-column extremes first, then the overall extreme and the column holding it.
DiarizationPurity_column_minima = df[DiarizationPurity_columns].min()
DiarizationPurity_column_maxima = df[DiarizationPurity_columns].max()

min_DiarizationPurity = DiarizationPurity_column_minima.min()
max_DiarizationPurity = DiarizationPurity_column_maxima.max()
min_DiarizationPurity_column = DiarizationPurity_column_minima.idxmin()
max_DiarizationPurity_column = DiarizationPurity_column_maxima.idxmax()

# Rows in which the selected column reaches the overall extreme.
min_DiarizationPurity_row = df[df[min_DiarizationPurity_column] == min_DiarizationPurity]
max_DiarizationPurity_row = df[df[max_DiarizationPurity_column] == max_DiarizationPurity]

# Columns shown for the uncorrected baseline, and the transcript columns
# that belong to the lowest / highest purity columns.
DiarizationPurity_baseline_view = ['audio_number', 'diarization_without_LM', 'DiarizationPurity_without_LM']
min_DiarizationPurity_transcript_column = min_DiarizationPurity_column.replace('DiarizationPurity', 'diarization')
max_DiarizationPurity_transcript_column = max_DiarizationPurity_column.replace('DiarizationPurity', 'diarization')

# Lowest purity: baseline next to the corresponding corrected transcript.
print("Details for diarization without LM and the diarization with the lowest DiarizationPurity:")
print("Diarization without LM:")
print(min_DiarizationPurity_row[DiarizationPurity_baseline_view])

print(f"Diarization with the lowest DiarizationPurity ({min_DiarizationPurity_column}):")
print(min_DiarizationPurity_row[['audio_number', min_DiarizationPurity_transcript_column, min_DiarizationPurity_column]])

# Highest purity: baseline next to the corresponding corrected transcript.
print("\nDetails for diarization without LM and the diarization with the highest DiarizationPurity:")
print("Diarization without LM:")
print(max_DiarizationPurity_row[DiarizationPurity_baseline_view])

print(f"Diarization with the highest DiarizationPurity ({max_DiarizationPurity_column}):")
print(max_DiarizationPurity_row[['audio_number', max_DiarizationPurity_transcript_column, max_DiarizationPurity_column]])


Extract information about Diarization Coverage

In [None]:
import pandas as pd

# Metrics file, resolved relative to the current working directory.
# NOTE(review): the earlier cells read and write this file under
# /root/diarizare/; confirm the notebook runs from that directory.
file_path = 'transcripts_with_metrics.csv'
df = pd.read_csv(file_path)

# DiarizationCoverage columns of the four LLM-corrected variants.
DiarizationCoverage_columns = ["DiarizationCoverage_35_turbo_zero_shot", "DiarizationCoverage_35_turbo_one_shot", "DiarizationCoverage_4_zero_shot", "DiarizationCoverage_4_one_shot"]

# Per-column extremes first, then the overall extreme and the column holding it.
DiarizationCoverage_column_minima = df[DiarizationCoverage_columns].min()
DiarizationCoverage_column_maxima = df[DiarizationCoverage_columns].max()

min_DiarizationCoverage = DiarizationCoverage_column_minima.min()
max_DiarizationCoverage = DiarizationCoverage_column_maxima.max()
min_DiarizationCoverage_column = DiarizationCoverage_column_minima.idxmin()
max_DiarizationCoverage_column = DiarizationCoverage_column_maxima.idxmax()

# Rows in which the selected column reaches the overall extreme.
min_DiarizationCoverage_row = df[df[min_DiarizationCoverage_column] == min_DiarizationCoverage]
max_DiarizationCoverage_row = df[df[max_DiarizationCoverage_column] == max_DiarizationCoverage]

# Columns shown for the uncorrected baseline, and the transcript columns
# that belong to the lowest / highest coverage columns.
DiarizationCoverage_baseline_view = ['audio_number', 'diarization_without_LM', 'DiarizationCoverage_without_LM']
min_DiarizationCoverage_transcript_column = min_DiarizationCoverage_column.replace('DiarizationCoverage', 'diarization')
max_DiarizationCoverage_transcript_column = max_DiarizationCoverage_column.replace('DiarizationCoverage', 'diarization')

# Lowest coverage: baseline next to the corresponding corrected transcript.
print("Details for diarization without LM and the diarization with the lowest DiarizationCoverage:")
print("Diarization without LM:")
print(min_DiarizationCoverage_row[DiarizationCoverage_baseline_view])

print(f"Diarization with the lowest DiarizationCoverage ({min_DiarizationCoverage_column}):")
print(min_DiarizationCoverage_row[['audio_number', min_DiarizationCoverage_transcript_column, min_DiarizationCoverage_column]])

# Highest coverage: baseline next to the corresponding corrected transcript.
print("\nDetails for diarization without LM and the diarization with the highest DiarizationCoverage:")
print("Diarization without LM:")
print(max_DiarizationCoverage_row[DiarizationCoverage_baseline_view])

print(f"Diarization with the highest DiarizationCoverage ({max_DiarizationCoverage_column}):")
print(max_DiarizationCoverage_row[['audio_number', max_DiarizationCoverage_transcript_column, max_DiarizationCoverage_column]])


In [None]:
# Total run time since the first cell stored `start_time`, reported in minutes.
elapsed_seconds = time.time() - start_time
elapsed_time = elapsed_seconds / 60
print("%s minutes" % elapsed_time)