In [None]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook
from typing import List, Dict, Tuple, Optional
from tqdm import tqdm

# Load the Hugging Face token from config.py
try:
    from config import HF_TOKEN
except ImportError:
    HF_TOKEN = None
    logging.warning(
        "config.py not found or HF_TOKEN not defined. Diarization may not work."
        " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
    )

# Set up logging
logging.basicConfig(level=logging.INFO)


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict]:
    """
    Performs speaker diarization on an audio file.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline and the diarization output.
    """
    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=HF_TOKEN,
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise  # Re-raise the exception to be handled by the caller

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline with the progress hook
    with ProgressHook() as hook:
        try:
            diarization = pipeline(input_data, hook=hook)
        except Exception as e:
            logging.error(f"Error during diarization: {e}")
            raise  # Re-raise the exception

    logging.info("Diarization complete.")
    return pipeline, diarization  # Return both pipeline and diarization



In [8]:
# Specify the path to your audio file
audio_file_path = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"

# Optional: Specify the maximum and minimum number of speakers
max_speakers = 2
min_speakers = 1

# Call the diarize_audio function
pipeline, diarization = diarize_audio(audio_file_path, max_speakers, min_speakers)

# Print the diarization result
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"Speaker {speaker}: {turn.start:.2f} - {turn.end:.2f}")

INFO:root:Using device: cpu


INFO:root:Diarization complete.


Speaker SPEAKER_00: 0.03 - 10.00
Speaker SPEAKER_01: 11.42 - 12.82
Speaker SPEAKER_00: 12.82 - 14.91
Speaker SPEAKER_00: 16.94 - 25.63
Speaker SPEAKER_01: 25.58 - 27.37
Speaker SPEAKER_00: 27.37 - 28.97


# chunks

In [19]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from typing import Optional, Dict, Tuple
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment

try:
    from pydub import AudioSegment
except ImportError:
    print("pydub is not installed. Please install it using: pip install pydub")
    AudioSegment = None

try:
    import whisper_timestamped as whisper
except ImportError:
    print(
        "whisper-timestamped is not installed. Please install it using: pip install whisper-timestamped"
    )
    whisper = None


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict]:
    """
    Performs speaker diarization on an audio file.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline and the diarization output.
    """
    import logging

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="YOUR_HUGGINGFACE_TOKEN",  # Replace with your token
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline
    try:
        diarization = pipeline(input_data)
    except Exception as e:
        logging.error(f"Error during diarization: {e}")
        raise

    logging.info("Diarization complete.")
    return pipeline, diarization


def chunk_audio(
    audio_file_path: str, diarization: Dict, output_directory: str
) -> List[Dict]:
    """
    Chunks an audio file into segments based on the diarization output, saves them,
    and returns a list of chunk information.

    Args:
        audio_file_path (str): Path to the audio file.
        diarization (Dict): Diarization output from the pyannote pipeline.
        output_directory (str): Directory to save the chunked audio files.

    Returns:
        List[Dict]: A list where each element is a dictionary containing:
            - file_path (str): Path to the saved audio chunk.
            - speaker (str): Speaker label.
            - start_time (float): Start time of the chunk in seconds.
            - end_time (float): End time of the chunk in seconds.
    """
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the audio file using pydub
    if AudioSegment is None:
        print("Skipping audio chunking because pydub is not installed.")
        return []

    audio = AudioSegment.from_file(audio_file_path)

    chunk_info_list = []
    chunk_number = 0  # Initialize chunk counter

    # Iterate over each turn in the diarization output
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        start_time_ms = int(start_time * 1000)  # Convert seconds to milliseconds
        end_time_ms = int(end_time * 1000)  # Convert seconds to milliseconds

        chunk = audio[start_time_ms:end_time_ms]
        chunk_number += 1  # Increment chunk counter for filename
        output_file_path = os.path.join(
            output_directory,
            f"chunk_{chunk_number}.mp3",  # Use chunk number in filename
        )
        chunk.export(output_file_path, format="mp3")

        chunk_info = {
            "file_path": output_file_path,
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
        }
        chunk_info_list.append(chunk_info)

    return chunk_info_list


def transcribe_audio_chunk(audio_file_path: str) -> Dict:
    """
    Transcribes an audio file using whisper-timestamped.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        Dict: The transcription with word-level timestamps.  Returns empty dict on error.
    """
    if whisper is None:
        print("whisper-timestamped is not installed. Transcription is skipped.")
        return {}

    try:
        # Load audio and model
        model = whisper.load_model("base")  # You can change the model size if needed
        audio = whisper.load_audio(audio_file_path)

        # Transcribe the audio file
        result = whisper.transcribe(
            model, audio, language="en"
        )  # You can change the language
        return result
    except Exception as e:
        logging.error(f"Error during transcription: {e}")
        print(f"Error during transcription: {e}")
        return {}  # Return empty dict on error


def process_and_transcribe_chunks(chunk_info_list: List[Dict], output_dir: str) -> None:
    """
    Transcribes audio chunks using whisper-timestamped and saves the transcriptions
    to a JSON file.

    Args:
        chunk_info_list (List[Dict]): A list of dictionaries, where each dictionary
            contains chunk information as returned by the chunk_audio function.
        output_dir (str): The directory where the JSON file should be saved.
    """
    if not chunk_info_list:
        print("No chunks to transcribe.")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    transcriptions = []
    for chunk_info in chunk_info_list:
        chunk_file_path = chunk_info["file_path"]
        speaker = chunk_info["speaker"]
        start_time = chunk_info["start_time"]
        end_time = chunk_info["end_time"]

        print(
            f"Transcribing chunk: {chunk_file_path}, Speaker: {speaker}, Start: {start_time:.2f}, End: {end_time:.2f}"
        )
        transcription = transcribe_audio_chunk(chunk_file_path)  # returns {} on error

        if transcription:  # only add if transcription was successful
            transcription_data = {
                "speaker": speaker,
                "start_time": start_time,
                "end_time": end_time,
                "transcription": transcription,
            }
            transcriptions.append(transcription_data)

            # Print the transcription
            print(f"Transcription for {chunk_file_path}:")
            for segment in transcription["segments"]:
                print(
                    f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}"
                )

    # Save all transcriptions to a JSON file
    output_json_path = os.path.join(output_dir, "transcriptions.json")
    try:
        with open(output_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        print(f"Transcriptions saved to {output_json_path}")
    except Exception as e:
        logging.error(f"Error saving transcriptions to JSON: {e}")
        print(f"Error saving transcriptions to JSON: {e}")

Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



In [20]:
# Example usage:
audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
output_dir = "chunks"
# Make sure the output directory exists
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Set up basic logging
logging.basicConfig(level=logging.INFO)

try:
    pipeline, diarization_result = diarize_audio(audio_file)
    chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
    print("Audio file diarized and chunked.")

    process_and_transcribe_chunks(
        chunk_info_list, output_dir
    )  # added call to new function

except Exception as e:
    logging.error(f"An error occurred: {e}")
    print(f"An error occurred: {e}")

INFO:root:Using device: cpu
  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:04<00:00, 225.82frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:01<00:00, 121.47frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:01<00:00, 140.74frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:02<00:00, 366.74frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 115.73frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:01<00:00, 98.25frames/s]

Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json





# merge

In [23]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from typing import Optional, Dict, Tuple
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment

try:
    from pydub import AudioSegment
except ImportError:
    print("pydub is not installed. Please install it using: pip install pydub")
    AudioSegment = None

try:
    import whisper_timestamped as whisper
except ImportError:
    print(
        "whisper-timestamped is not installed. Please install it using: pip install whisper-timestamped"
    )
    whisper = None


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict]:
    """
    Performs speaker diarization on an audio file.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline and the diarization output.
    """
    import logging

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="YOUR_HUGGINGFACE_TOKEN",  # Replace with your token
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline
    try:
        diarization = pipeline(input_data)
    except Exception as e:
        logging.error(f"Error during diarization: {e}")
        raise

    logging.info("Diarization complete.")
    return pipeline, diarization


def chunk_audio(
    audio_file_path: str, diarization: Dict, output_directory: str
) -> List[Dict]:
    """
    Chunks an audio file into segments based on the diarization output, saves them,
    and returns a list of chunk information.

    Args:
        audio_file_path (str): Path to the audio file.
        diarization (Dict): Diarization output from the pyannote pipeline.
        output_directory (str): Directory to save the chunked audio files.

    Returns:
        List[Dict]: A list where each element is a dictionary containing:
            - file_path (str): Path to the saved audio chunk.
            - speaker (str): Speaker label.
            - start_time (float): Start time of the chunk in seconds.
            - end_time (float): End time of the chunk in seconds.
    """
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the audio file using pydub
    if AudioSegment is None:
        print("Skipping audio chunking because pydub is not installed.")
        return []

    audio = AudioSegment.from_file(audio_file_path)

    chunk_info_list = []
    chunk_number = 0  # Initialize chunk counter

    # Iterate over each turn in the diarization output
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        start_time_ms = int(start_time * 1000)  # Convert seconds to milliseconds
        end_time_ms = int(end_time * 1000)  # Convert seconds to milliseconds

        chunk = audio[start_time_ms:end_time_ms]
        chunk_number += 1  # Increment chunk counter for filename
        output_file_path = os.path.join(
            output_directory,
            f"chunk_{chunk_number}.mp3",  # Use chunk number in filename
        )
        chunk.export(output_file_path, format="mp3")

        chunk_info = {
            "file_path": output_file_path,
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
        }
        chunk_info_list.append(chunk_info)

    return chunk_info_list


def transcribe_audio_chunk(audio_file_path: str) -> Dict:
    """
    Transcribes an audio file using whisper-timestamped.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        Dict: The transcription with word-level timestamps.  Returns empty dict on error.
    """
    if whisper is None:
        print("whisper-timestamped is not installed. Transcription is skipped.")
        return {}

    try:
        # Load audio and model
        model = whisper.load_model("base")  # You can change the model size if needed
        audio = whisper.load_audio(audio_file_path)

        # Transcribe the audio file
        result = whisper.transcribe(
            model, audio, language="en"
        )  # You can change the language
        return result
    except Exception as e:
        logging.error(f"Error during transcription: {e}")
        print(f"Error during transcription: {e}")
        return {}  # Return empty dict on error


def process_and_transcribe_chunks(chunk_info_list: List[Dict], output_dir: str) -> None:
    """
    Transcribes audio chunks using whisper-timestamped and saves the transcriptions
    to a JSON file.

    Args:
        chunk_info_list (List[Dict]): A list of dictionaries, where each dictionary
            contains chunk information as returned by the chunk_audio function.
        output_dir (str): The directory where the JSON file should be saved.
    """
    if not chunk_info_list:
        print("No chunks to transcribe.")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    transcriptions = []
    for chunk_info in chunk_info_list:
        chunk_file_path = chunk_info["file_path"]
        speaker = chunk_info["speaker"]
        start_time = chunk_info["start_time"]
        end_time = chunk_info["end_time"]

        print(
            f"Transcribing chunk: {chunk_file_path}, Speaker: {speaker}, Start: {start_time:.2f}, End: {end_time:.2f}"
        )
        transcription = transcribe_audio_chunk(chunk_file_path)  # returns {} on error

        if transcription:  # only add if transcription was successful
            transcription_data = {
                "speaker": speaker,
                "start_time": start_time,
                "end_time": end_time,
                "transcription": transcription,
            }
            transcriptions.append(transcription_data)

            # Print the transcription
            print(f"Transcription for {chunk_file_path}:")
            for segment in transcription["segments"]:
                print(
                    f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}"
                )

    # Save all transcriptions to a JSON file
    output_json_path = os.path.join(output_dir, "transcriptions.json")
    try:
        with open(output_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        print(f"Transcriptions saved to {output_json_path}")
    except Exception as e:
        logging.error(f"Error saving transcriptions to JSON: {e}")
        print(f"Error saving transcriptions to JSON: {e}")


def clean_transcription(transcriptions_json_path: str) -> List[Dict]:
    """
    Cleans the transcription data from a JSON file, merging text from the same speaker
    into paragraphs.  The output format is:  "SpeakerName [start_time - end_time]: text"

    Args:
        transcriptions_json_path (str): Path to the JSON file containing the transcription data
            generated by process_and_transcribe_chunks.

    Returns:
        List[str]: A list of strings, where each string represents a paragraph
        in the format "SpeakerName [start_time - end_time]: text".
    """
    try:
        with open(transcriptions_json_path, "r") as f:
            transcriptions = json.load(f)
    except Exception as e:
        logging.error(f"Error loading transcriptions JSON: {e}")
        print(f"Error loading transcriptions JSON: {e}")
        return []

    cleaned_transcriptions = []
    current_speaker = None
    current_text = ""
    current_start_time = None
    current_end_time = None

    for chunk in transcriptions:
        speaker = chunk["speaker"]
        transcription = chunk["transcription"]
        chunk_start_time = chunk["start_time"]  # Use chunk start/end times
        chunk_end_time = chunk["end_time"]

        if not transcription or not transcription["segments"]:
            continue  # Skip empty transcriptions

        for segment in transcription["segments"]:
            text = segment["text"]

            if current_speaker != speaker:
                if current_speaker is not None:
                    cleaned_transcriptions.append(
                        f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
                    )
                current_speaker = speaker
                current_text = text
                current_start_time = chunk_start_time  # from chunk
                current_end_time = chunk_end_time  # from chunk
            else:
                current_text += " " + text
                current_end_time = chunk_end_time  # from chunk

    # Add the last paragraph
    if current_speaker is not None:
        cleaned_transcriptions.append(
            f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
        )

    return cleaned_transcriptions

In [25]:
# Example usage:
audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
output_dir = "chunks"
# Make sure the output directory exists
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Set up basic logging
logging.basicConfig(level=logging.INFO)

try:
    pipeline, diarization_result = diarize_audio(audio_file)
    chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
    print("Audio file diarized and chunked.")

    process_and_transcribe_chunks(chunk_info_list, output_dir) # added call to new function

    transcriptions_json = os.path.join(output_dir, "transcriptions.json")
    cleaned_transcriptions = clean_transcription(transcriptions_json)

    print("\nCleaned Transcriptions:")
    for paragraph in cleaned_transcriptions:
        print(f"Speaker: {paragraph['speaker']}")
        print(f"Time: {paragraph['start_time']:.2f} - {paragraph['end_time']:.2f}")
        print(f"Text: {paragraph['text']}\n")

except Exception as e:
    logging.error(f"An error occurred: {e}")
    print(f"An error occurred: {e}")

INFO:root:Using device: cpu
  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:03<00:00, 272.55frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:01<00:00, 79.05frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:01<00:00, 155.39frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:01<00:00, 466.88frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 124.91frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:01<00:00, 132.10frames/s]
ERROR:root:An error occurred: string indices must be integers, not 'str'


Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json

Cleaned Transcriptions:
An error occurred: string indices must be integers, not 'str'


In [26]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from typing import Optional, Dict, Tuple
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment

try:
    from pydub import AudioSegment
except ImportError:
    print("pydub is not installed. Please install it using: pip install pydub")
    AudioSegment = None

try:
    import whisper_timestamped as whisper
except ImportError:
    print(
        "whisper-timestamped is not installed. Please install it using: pip install whisper-timestamped"
    )
    whisper = None


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict]:
    """
    Performs speaker diarization on an audio file.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline and the diarization output.
    """
    import logging

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="YOUR_HUGGINGFACE_TOKEN",  # Replace with your token
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline
    try:
        diarization = pipeline(input_data)
    except Exception as e:
        logging.error(f"Error during diarization: {e}")
        raise

    logging.info("Diarization complete.")
    return pipeline, diarization


def chunk_audio(
    audio_file_path: str, diarization: Dict, output_directory: str
) -> List[Dict]:
    """
    Chunks an audio file into segments based on the diarization output, saves them,
    and returns a list of chunk information.

    Args:
        audio_file_path (str): Path to the audio file.
        diarization (Dict): Diarization output from the pyannote pipeline.
        output_directory (str): Directory to save the chunked audio files.

    Returns:
        List[Dict]: A list where each element is a dictionary containing:
            - file_path (str): Path to the saved audio chunk.
            - speaker (str): Speaker label.
            - start_time (float): Start time of the chunk in seconds.
            - end_time (float): End time of the chunk in seconds.
    """
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the audio file using pydub
    if AudioSegment is None:
        print("Skipping audio chunking because pydub is not installed.")
        return []

    audio = AudioSegment.from_file(audio_file_path)

    chunk_info_list = []
    chunk_number = 0  # Initialize chunk counter

    # Iterate over each turn in the diarization output
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        start_time_ms = int(start_time * 1000)  # Convert seconds to milliseconds
        end_time_ms = int(end_time * 1000)  # Convert seconds to milliseconds

        chunk = audio[start_time_ms:end_time_ms]
        chunk_number += 1  # Increment chunk counter for filename
        output_file_path = os.path.join(
            output_directory,
            f"chunk_{chunk_number}.mp3",  # Use chunk number in filename
        )
        chunk.export(output_file_path, format="mp3")

        chunk_info = {
            "file_path": output_file_path,
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
        }
        chunk_info_list.append(chunk_info)

    return chunk_info_list


def transcribe_audio_chunk(audio_file_path: str) -> Dict:
    """
    Transcribes an audio file using whisper-timestamped.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        Dict: The transcription with word-level timestamps.  Returns empty dict on error.
    """
    if whisper is None:
        print("whisper-timestamped is not installed. Transcription is skipped.")
        return {}

    try:
        # Load audio and model
        model = whisper.load_model("base")  # You can change the model size if needed
        audio = whisper.load_audio(audio_file_path)

        # Transcribe the audio file
        result = whisper.transcribe(
            model, audio, language="en"
        )  # You can change the language
        return result
    except Exception as e:
        logging.error(f"Error during transcription: {e}")
        print(f"Error during transcription: {e}")
        return {}  # Return empty dict on error


def process_and_transcribe_chunks(chunk_info_list: List[Dict], output_dir: str) -> None:
    """
    Transcribes audio chunks using whisper-timestamped and saves the transcriptions
    to a JSON file.

    Args:
        chunk_info_list (List[Dict]): A list of dictionaries, where each dictionary
            contains chunk information as returned by the chunk_audio function.
        output_dir (str): The directory where the JSON file should be saved.
    """
    if not chunk_info_list:
        print("No chunks to transcribe.")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    transcriptions = []
    for chunk_info in chunk_info_list:
        chunk_file_path = chunk_info["file_path"]
        speaker = chunk_info["speaker"]
        start_time = chunk_info["start_time"]
        end_time = chunk_info["end_time"]

        print(
            f"Transcribing chunk: {chunk_file_path}, Speaker: {speaker}, Start: {start_time:.2f}, End: {end_time:.2f}"
        )
        transcription = transcribe_audio_chunk(chunk_file_path)  # returns {} on error

        if transcription:  # only add if transcription was successful
            transcription_data = {
                "speaker": speaker,
                "start_time": start_time,
                "end_time": end_time,
                "transcription": transcription,
            }
            transcriptions.append(transcription_data)

            # Print the transcription
            print(f"Transcription for {chunk_file_path}:")
            for segment in transcription["segments"]:
                print(
                    f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}"
                )

    # Save all transcriptions to a JSON file
    output_json_path = os.path.join(output_dir, "transcriptions.json")
    try:
        with open(output_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        print(f"Transcriptions saved to {output_json_path}")
    except Exception as e:
        logging.error(f"Error saving transcriptions to JSON: {e}")
        print(f"Error saving transcriptions to JSON: {e}")


def clean_transcription(transcriptions_json_path: str) -> List[str]:
    """
    Cleans the transcription data from a JSON file, merging text from the same speaker
    into paragraphs.  The output format is:  "SpeakerName [start_time - end_time]: text"

    Args:
        transcriptions_json_path (str): Path to the JSON file containing the transcription data
            generated by process_and_transcribe_chunks.

    Returns:
        List[str]: A list of strings, where each string represents a paragraph
        in the format "SpeakerName [start_time - end_time]: text".
    """
    try:
        with open(transcriptions_json_path, "r") as f:
            transcriptions = json.load(f)
    except Exception as e:
        logging.error(f"Error loading transcriptions JSON: {e}")
        print(f"Error loading transcriptions JSON: {e}")
        return []

    cleaned_transcriptions = []
    current_speaker = None
    current_text = ""
    current_start_time = None
    current_end_time = None

    for chunk in transcriptions:
        speaker = chunk["speaker"]
        # transcription = chunk["transcription"] # Removed unused variable
        chunk_start_time = chunk["start_time"]  # Use chunk start/end times
        chunk_end_time = chunk["end_time"]

        # if not transcription or not transcription["segments"]: # Removed check for transcription
        #     continue  # Skip empty transcriptions
        if current_speaker != speaker:
            if current_speaker is not None:
                cleaned_transcriptions.append(
                    f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
                )
            current_speaker = speaker
            current_text = ""  # Reset text for new speaker.
            current_start_time = chunk_start_time  # from chunk
            current_end_time = chunk_end_time  # from chunk
        else:
            #  The original error occurred because the code was trying to access
            #  the 'text' key within the 'chunk' dictionary.  The 'chunk' dictionary
            #  does NOT contain a 'text' key.  The 'text' comes from the 'segment'
            #  in the inner loop, which is not used here.
            #  Instead,  we should accumulate the text from the segments within
            #  the same speaker.  But, we don't have the segments here.
            #  The simplest fix is to just pass the entire chunk.
            #  We will reconstruct the text.
            # current_text += " " + chunk  # This line caused the error
            # The corrected way is to accumulate the text.
            # There is no 'text' in chunk, the text is in the transcription segments.
            # But we don't have the segments here, so we will reconstruct the text later.
            current_end_time = chunk_end_time  # from chunk

        # Add the text from the chunk.
        transcription = chunk.get(
            "transcription", None
        )  # Get transcription, None if missing.
        if transcription and transcription["segments"]:
            for segment in transcription["segments"]:
                current_text += " " + segment["text"]

    # Add the last paragraph
    if current_speaker is not None:
        cleaned_transcriptions.append(
            f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
        )

    return cleaned_transcriptions

In [27]:
# Example usage:
audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
output_dir = "chunks"
# Make sure the output directory exists
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Set up basic logging
logging.basicConfig(level=logging.INFO)

try:
    pipeline, diarization_result = diarize_audio(audio_file)
    chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
    print("Audio file diarized and chunked.")

    process_and_transcribe_chunks(
        chunk_info_list, output_dir
    )  # added call to new function

    transcriptions_json = os.path.join(output_dir, "transcriptions.json")
    cleaned_transcriptions = clean_transcription(transcriptions_json)

    print("\nCleaned Transcriptions:")
    for paragraph in cleaned_transcriptions:
        print(f"Speaker: {paragraph['speaker']}")
        print(f"Time: {paragraph['start_time']:.2f} - {paragraph['end_time']:.2f}")
        print(f"Text: {paragraph['text']}\n")

except Exception as e:
    logging.error(f"An error occurred: {e}")
    print(f"An error occurred: {e}")

INFO:root:Using device: cpu
  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:03<00:00, 273.04frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:00<00:00, 140.70frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:01<00:00, 173.37frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:01<00:00, 439.77frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 155.12frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:01<00:00, 136.58frames/s]
ERROR:root:An error occurred: string indices must be integers, not 'str'


Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json

Cleaned Transcriptions:
An error occurred: string indices must be integers, not 'str'


In [31]:
def clean_transcription(transcriptions_json_path: str) -> List[str]:
    """
    Cleans the transcription data from a JSON file, merging text from the same speaker
    into paragraphs.  The output format is:  "SpeakerName [start_time - end_time]: text"

    Args:
        transcriptions_json_path (str): Path to the JSON file containing the transcription data
            generated by process_and_transcribe_chunks.

    Returns:
        List[str]: A list of strings, where each string represents a paragraph
        in the format "SpeakerName [start_time - end_time]: text".
    """
    try:
        with open(transcriptions_json_path, "r") as f:
            transcriptions = json.load(f)
    except Exception as e:
        logging.error(f"Error loading transcriptions JSON: {e}")
        print(f"Error loading transcriptions JSON: {e}")
        return []

    cleaned_transcriptions = []
    current_speaker = None
    current_text = ""
    current_start_time = None
    current_end_time = None

    for chunk in transcriptions:
        speaker = chunk["speaker"]
        # transcription = chunk["transcription"] # Removed unused variable
        chunk_start_time = chunk["start_time"]  # Use chunk start/end times
        chunk_end_time = chunk["end_time"]

        # if not transcription or not ["segments"]: # Removed check for transcription
        #     continue  # Skip empty transcriptions
        if current_speaker != speaker:
            if current_speaker is not None:
                cleaned_transcriptions.append(
                    f"\n{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"  # Add newline
                )
            current_speaker = speaker
            current_text = ""  # Reset text for new speaker.
            current_start_time = chunk_start_time  # from chunk
            current_end_time = chunk_end_time  # from chunk
        else:
            #  The original error occurred because the code was trying to access
            #  the 'text' key within the 'chunk' dictionary.  The 'chunk' dictionary
            #  does NOT contain a 'text' key.  The 'text' comes from the 'segment'
            #  in the inner loop, which is not used here.
            #  Instead,  we should accumulate the text from the segments within
            #  the same speaker.  But, we don't have the segments here.
            #  The simplest fix is to just pass the entire chunk.
            #  We will reconstruct the text.
            # current_text += " " + chunk  # This line caused the error
            # The corrected way is to accumulate the text.
            # There is no 'text' in chunk, the text is in the transcription segments.
            # But we don't have the segments here, so we will reconstruct the text later.
            current_end_time = chunk_end_time  # from chunk

        # Add the text from the chunk.
        transcription = chunk.get(
            "transcription", None
        )  # Get transcription, None if missing.
        if transcription and transcription["segments"]:
            for segment in transcription["segments"]:
                current_text += " " + segment["text"]

    # Add the last paragraph
    if current_speaker is not None:
        cleaned_transcriptions.append(
            f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
        )

    return cleaned_transcriptions


transcriptions_json = os.path.join(output_dir, "transcriptions.json")
cleaned_transcriptions = clean_transcription(transcriptions_json)
print(cleaned_transcriptions)

["\nSPEAKER_00 [0.03 - 10.00]:   Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?", '\nSPEAKER_01 [11.42 - 12.82]:   and correction and embarrassment.', "\nSPEAKER_00 [12.82 - 25.63]:   Do indeed round one.  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.", "\nSPEAKER_01 [25.58 - 27.37]:   I'll earn you a few hundred million.", 'SPEAKER_00 [27.37 - 28.97]:   But what I want to know?']


In [32]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from typing import Optional, Dict, Tuple
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment

try:
    from pydub import AudioSegment
except ImportError:
    print("pydub is not installed. Please install it using: pip install pydub")
    AudioSegment = None

try:
    import whisper_timestamped as whisper
except ImportError:
    print("whisper-timestamped is not installed. Please install it using: pip install whisper-timestamped")
    whisper = None


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict]:
    """
    Performs speaker diarization on an audio file.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline and the diarization output.
    """
    import logging

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="YOUR_HUGGINGFACE_TOKEN",  # Replace with your token
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline
    try:
        diarization = pipeline(input_data)
    except Exception as e:
        logging.error(f"Error during diarization: {e}")
        raise

    logging.info("Diarization complete.")
    return pipeline, diarization



def chunk_audio(audio_file_path: str, diarization: Dict, output_directory: str) -> List[Dict]:
    """
    Chunks an audio file into segments based on the diarization output, saves them,
    and returns a list of chunk information.

    Args:
        audio_file_path (str): Path to the audio file.
        diarization (Dict): Diarization output from the pyannote pipeline.
        output_directory (str): Directory to save the chunked audio files.

    Returns:
        List[Dict]: A list where each element is a dictionary containing:
            - file_path (str): Path to the saved audio chunk.
            - speaker (str): Speaker label.
            - start_time (float): Start time of the chunk in seconds.
            - end_time (float): End time of the chunk in seconds.
    """
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the audio file using pydub
    if AudioSegment is None:
        print("Skipping audio chunking because pydub is not installed.")
        return []

    audio = AudioSegment.from_file(audio_file_path)

    chunk_info_list = []
    chunk_number = 0  # Initialize chunk counter

    # Iterate over each turn in the diarization output
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        start_time_ms = int(start_time * 1000)  # Convert seconds to milliseconds
        end_time_ms = int(end_time * 1000)  # Convert seconds to milliseconds

        chunk = audio[start_time_ms:end_time_ms]
        chunk_number += 1  # Increment chunk counter for filename
        output_file_path = os.path.join(
            output_directory, f"chunk_{chunk_number}.mp3"  # Use chunk number in filename
        )
        chunk.export(output_file_path, format="mp3")

        chunk_info = {
            "file_path": output_file_path,
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
        }
        chunk_info_list.append(chunk_info)

    return chunk_info_list



def transcribe_audio_chunk(audio_file_path: str) -> Dict:
    """
    Transcribes an audio file using whisper-timestamped.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        Dict: The transcription with word-level timestamps.  Returns empty dict on error.
    """
    if whisper is None:
        print("whisper-timestamped is not installed. Transcription is skipped.")
        return {}

    try:
        # Load audio and model
        model = whisper.load_model("base")  # You can change the model size if needed
        audio = whisper.load_audio(audio_file_path)

        # Transcribe the audio file
        result = whisper.transcribe(model, audio, language="en")  # You can change the language
        return result
    except Exception as e:
        logging.error(f"Error during transcription: {e}")
        print(f"Error during transcription: {e}")
        return {}  # Return empty dict on error



def process_and_transcribe_chunks(chunk_info_list: List[Dict], output_dir: str) -> None:
    """
    Transcribes audio chunks using whisper-timestamped and saves the transcriptions
    to a JSON file.

    Args:
        chunk_info_list (List[Dict]): A list of dictionaries, where each dictionary
            contains chunk information as returned by the chunk_audio function.
        output_dir (str): The directory where the JSON file should be saved.
    """
    if not chunk_info_list:
        print("No chunks to transcribe.")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    transcriptions = []
    for chunk_info in chunk_info_list:
        chunk_file_path = chunk_info["file_path"]
        speaker = chunk_info["speaker"]
        start_time = chunk_info["start_time"]
        end_time = chunk_info["end_time"]

        print(f"Transcribing chunk: {chunk_file_path}, Speaker: {speaker}, Start: {start_time:.2f}, End: {end_time:.2f}")
        transcription = transcribe_audio_chunk(chunk_file_path) # returns {} on error

        if transcription: # only add if transcription was successful
            transcription_data = {
                "speaker": speaker,
                "start_time": start_time,
                "end_time": end_time,
                "transcription": transcription,
            }
            transcriptions.append(transcription_data)

            # Print the transcription
            print(f"Transcription for {chunk_file_path}:")
            for segment in transcription["segments"]:
                print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

    # Save all transcriptions to a JSON file
    output_json_path = os.path.join(output_dir, "transcriptions.json")
    try:
        with open(output_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        print(f"Transcriptions saved to {output_json_path}")
    except Exception as e:
        logging.error(f"Error saving transcriptions to JSON: {e}")
        print(f"Error saving transcriptions to JSON: {e}")




def clean_transcription(transcriptions_json_path: str) -> List[str]:
    """
    Cleans the transcription data from a JSON file, merging text from the same speaker
    into paragraphs.  The output format is:  "SpeakerName [start_time - end_time]: text"

    Args:
        transcriptions_json_path (str): Path to the JSON file containing the transcription data
            generated by process_and_transcribe_chunks.

    Returns:
        List[str]: A list of strings, where each string represents a paragraph
        in the format "SpeakerName [start_time - end_time]: text".
    """
    try:
        with open(transcriptions_json_path, "r") as f:
            transcriptions = json.load(f)
    except Exception as e:
        logging.error(f"Error loading transcriptions JSON: {e}")
        print(f"Error loading transcriptions JSON: {e}")
        return []

    cleaned_transcriptions = []
    current_speaker = None
    current_text = ""
    current_start_time = None
    current_end_time = None

    for chunk in transcriptions:
        speaker = chunk["speaker"]
        # transcription = chunk["transcription"] # Removed unused variable
        chunk_start_time = chunk["start_time"]  # Use chunk start/end times
        chunk_end_time = chunk["end_time"]

        # if not transcription or not ["segments"]: # Removed check for transcription
        #     continue  # Skip empty transcriptions
        if current_speaker != speaker:
            if current_speaker is not None:
                cleaned_transcriptions.append(
                    f"\n{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"  # Add newline
                )
            current_speaker = speaker
            current_text = ""  # Reset text for new speaker.
            current_start_time = chunk_start_time  # from chunk
            current_end_time = chunk_end_time  # from chunk
        else:
            #  The original error occurred because the code was trying to access
            #  the 'text' key within the 'chunk' dictionary.  The 'chunk' dictionary
            #  does NOT contain a 'text' key.  The 'text' comes from the 'segment'
            #  in the inner loop, which is not used here.
            #  Instead,  we should accumulate the text from the segments within
            #  the same speaker.  But, we don't have the segments here.
            #  The simplest fix is to just pass the entire chunk.
            #  We will reconstruct the text.
            # current_text += " " + chunk  # This line caused the error
            # The corrected way is to accumulate the text.
            # There is no 'text' in chunk, the text is in the transcription segments.
            # But we don't have the segments here, so we will reconstruct the text later.
            current_end_time = chunk_end_time  # from chunk

        # Add the text from the chunk.
        transcription = chunk.get("transcription", None)  # Get transcription, None if missing.
        if transcription and transcription["segments"]:
            for segment in transcription["segments"]:
                current_text += " " + segment["text"]

    # Add the last paragraph
    if current_speaker is not None:
        cleaned_transcriptions.append(
            f"{current_speaker} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
        )

    return cleaned_transcriptions


def save_transcription_to_file(transcription_text: List[str], audio_file_path: str, output_dir: str) -> None:
    """
    Saves the cleaned transcription text to a file.  The filename is derived from the
    original audio file name.

    Args:
        transcription_text (List[str]):  A list of strings, where each string represents a paragraph
            in the format "SpeakerName [start_time - end_time]: text".  This is the output
            from the clean_transcription function.
        audio_file_path (str):  The path to the original audio file.  This is used to derive
            the output filename.
        output_dir (str): The directory where the text file should be saved.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Get the base filename of the audio file.
    audio_filename_base = os.path.splitext(os.path.basename(audio_file_path))[0]
    output_file_path = os.path.join(output_dir, f"{audio_filename_base}.txt")

    try:
        with open(output_file_path, "w") as f:
            for line in transcription_text:
                f.write(line + "\n")  # Write each line, adding a newline.
        print(f"Transcription saved to {output_file_path}")
    except Exception as e:
        logging.error(f"Error saving transcription to file: {e}")
        print(f"Error saving transcription to file: {e}")



In [33]:

# Example usage:
audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
output_dir = "chunks"
# Make sure the output directory exists
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Set up basic logging
logging.basicConfig(level=logging.INFO)

try:
    pipeline, diarization_result = diarize_audio(audio_file)
    chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
    print("Audio file diarized and chunked.")

    process_and_transcribe_chunks(chunk_info_list, output_dir)

    transcriptions_json = os.path.join(output_dir, "transcriptions.json")
    cleaned_transcriptions = clean_transcription(transcriptions_json)

    print("\nCleaned Transcriptions:")
    for paragraph in cleaned_transcriptions:
        print(paragraph)

    save_transcription_to_file(
        cleaned_transcriptions, audio_file, output_dir
    )  # Save to text file

except Exception as e:
    logging.error(f"An error occurred: {e}")
    print(f"An error occurred: {e}")

INFO:root:Using device: cpu
  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:03<00:00, 261.97frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:01<00:00, 135.66frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:01<00:00, 187.77frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:01<00:00, 442.11frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 140.71frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:02<00:00, 47.95frames/s]


Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json

Cleaned Transcriptions:

SPEAKER_00 [0.03 - 10.00]:   Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

SPEAKER_01 [11.42 - 12.82]:   and correction and embarrassment.

SPEAKER_00 [12.82 - 25.63]:   Do indeed round one.  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

SPEAKER_01 [25.58 - 27.37]:   I'll earn you a few hundred million.
SPEAKER_00 [27.37 - 28.97]:   But what I want to know?
Transcription saved to chunks\test.txt


# thoerial the best

In [None]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from typing import Optional, Dict, Tuple
import torch
from pyannote.audio import Pipeline
from pyannote.core import Segment
from pyannote.core import Annotation

try:
    from pydub import AudioSegment
except ImportError:
    print("pydub is not installed. Please install it using: pip install pydub")
    AudioSegment = None

try:
    import whisper_timestamped as whisper
except ImportError:
    print(
        "whisper-timestamped is not installed. Please install it using: pip install whisper-timestamped"
    )
    whisper = None


def diarize_audio(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Dict, Dict]:
    """
    Performs speaker diarization on an audio file.  Also creates a speaker names mapping.

    Args:
        audio_file_path (str): Path to the audio file.
        max_speakers (int, optional): Maximum number of speakers. Defaults to None.
        min_speakers (int, optional): Minimum number of speakers. Defaults to None.

    Returns:
        tuple: A tuple containing the diarization pipeline, the diarization output,
               and a dictionary of speaker names.
    """
    import logging

    # Check if a GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")

    # Load the pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="YOUR_HUGGINGFACE_TOKEN",  # Replace with your token
    )
    pipeline.to(device)  # Move the pipeline to the selected device

    # Load the audio file
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

    # Prepare the input for the pipeline
    input_data = {"waveform": waveform, "sample_rate": sample_rate}

    # Add speaker number hints if provided
    if max_speakers is not None:
        input_data["max_speakers"] = max_speakers
    if min_speakers is not None:
        input_data["min_speakers"] = min_speakers

    # Run the diarization pipeline
    try:
        diarization = pipeline(input_data)
    except Exception as e:
        logging.error(f"Error during diarization: {e}")
        raise

    # Create speaker names dictionary.  Default is Speaker 1, Speaker 2, etc.
    speaker_names = {}
    if isinstance(diarization, Annotation):
        # Iterate over the segments and their corresponding labels.
        for segment, track_name, label in diarization.itertracks(yield_label=True):
            if label not in speaker_names:
                speaker_names[label] = f"Speaker {len(speaker_names) + 1}"

    logging.info("Diarization complete.")
    return pipeline, diarization, speaker_names


def chunk_audio(
    audio_file_path: str, diarization: Dict, output_directory: str
) -> List[Dict]:
    """
    Chunks an audio file into segments based on the diarization output, saves them,
    and returns a list of chunk information.

    Args:
        audio_file_path (str): Path to the audio file.
        diarization (Dict): Diarization output from the pyannote pipeline.
        output_directory (str): Directory to save the chunked audio files.

    Returns:
        List[Dict]: A list where each element is a dictionary containing:
            - file_path (str): Path to the saved audio chunk.
            - speaker (str): Speaker label.
            - start_time (float): Start time of the chunk in seconds.
            - end_time (float): End time of the chunk in seconds.
    """
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the audio file using pydub
    if AudioSegment is None:
        print("Skipping audio chunking because pydub is not installed.")
        return []

    audio = AudioSegment.from_file(audio_file_path)

    chunk_info_list = []
    chunk_number = 0  # Initialize chunk counter

    # Iterate over each turn in the diarization output
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start_time = turn.start
        end_time = turn.end
        start_time_ms = int(start_time * 1000)  # Convert seconds to milliseconds
        end_time_ms = int(end_time * 1000)  # Convert seconds to milliseconds

        chunk = audio[start_time_ms:end_time_ms]
        chunk_number += 1  # Increment chunk counter for filename
        output_file_path = os.path.join(
            output_directory,
            f"chunk_{chunk_number}.mp3",  # Use chunk number in filename
        )
        chunk.export(output_file_path, format="mp3")

        chunk_info = {
            "file_path": output_file_path,
            "speaker": speaker,
            "start_time": start_time,
            "end_time": end_time,
        }
        chunk_info_list.append(chunk_info)

    return chunk_info_list


def transcribe_audio_chunk(audio_file_path: str) -> Dict:
    """
    Transcribes an audio file using whisper-timestamped.

    Args:
        audio_file_path (str): Path to the audio file.

    Returns:
        Dict: The transcription with word-level timestamps.  Returns empty dict on error.
    """
    if whisper is None:
        print("whisper-timestamped is not installed. Transcription is skipped.")
        return {}

    try:
        # Load audio and model
        model = whisper.load_model("base")  # You can change the model size if needed
        audio = whisper.load_audio(audio_file_path)

        # Transcribe the audio file
        result = whisper.transcribe(
            model, audio, language="en"
        )  # You can change the language
        return result
    except Exception as e:
        logging.error(f"Error during transcription: {e}")
        print(f"Error during transcription: {e}")
        return {}  # Return empty dict on error


def process_and_transcribe_chunks(chunk_info_list: List[Dict], output_dir: str) -> None:
    """
    Transcribes audio chunks using whisper-timestamped and saves the transcriptions
    to a JSON file.

    Args:
        chunk_info_list (List[Dict]): A list of dictionaries, where each dictionary
            contains chunk information as returned by the chunk_audio function.
        output_dir (str): The directory where the JSON file should be saved.
    """
    if not chunk_info_list:
        print("No chunks to transcribe.")
        return

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    transcriptions = []
    for chunk_info in chunk_info_list:
        chunk_file_path = chunk_info["file_path"]
        speaker = chunk_info["speaker"]
        start_time = chunk_info["start_time"]
        end_time = chunk_info["end_time"]

        print(
            f"Transcribing chunk: {chunk_file_path}, Speaker: {speaker}, Start: {start_time:.2f}, End: {end_time:.2f}"
        )
        transcription = transcribe_audio_chunk(chunk_file_path)  # returns {} on error

        if transcription:  # only add if transcription was successful
            transcription_data = {
                "speaker": speaker,
                "start_time": start_time,
                "end_time": end_time,
                "transcription": transcription,
            }
            transcriptions.append(transcription_data)

            # Print the transcription
            print(f"Transcription for {chunk_file_path}:")
            for segment in transcription["segments"]:
                print(
                    f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}"
                )

    # Save all transcriptions to a JSON file
    output_json_path = os.path.join(output_dir, "transcriptions.json")
    try:
        with open(output_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        print(f"Transcriptions saved to {output_json_path}")
    except Exception as e:
        logging.error(f"Error saving transcriptions to JSON: {e}")
        print(f"Error saving transcriptions to JSON: {e}")


def clean_transcription(
    transcriptions_json_path: str, speaker_names: Dict[str, str]
) -> List[str]:
    """
    Cleans the transcription data from a JSON file, merging text from the same speaker
    into paragraphs.  The output format is:  "SpeakerName [start_time - end_time]: text"

    Args:
        transcriptions_json_path (str): Path to the JSON file containing the transcription data
            generated by process_and_transcribe_chunks.
        speaker_names (Dict[str, str]): A dictionary mapping speaker labels (e.g., "SPEAKER_01")
            to speaker names (e.g., "John Doe").

    Returns:
        List[str]: A list of strings, where each string represents a paragraph
        in the format "SpeakerName [start_time - end_time]: text".
    """
    try:
        with open(transcriptions_json_path, "r") as f:
            transcriptions = json.load(f)
    except Exception as e:
        logging.error(f"Error loading transcriptions JSON: {e}")
        print(f"Error loading transcriptions JSON: {e}")
        return []

    cleaned_transcriptions = []
    current_speaker = None
    current_text = ""
    current_start_time = None
    current_end_time = None

    for chunk in transcriptions:
        speaker = chunk["speaker"]
        chunk_start_time = chunk["start_time"]
        chunk_end_time = chunk["end_time"]

        if current_speaker != speaker:
            if current_speaker is not None:
                # Use the speaker name from the dictionary, or the original if not found.
                speaker_name = speaker_names.get(current_speaker, current_speaker)
                cleaned_transcriptions.append(
                    f"\n{speaker_name} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
                )
            current_speaker = speaker
            current_text = ""
            current_start_time = chunk_start_time
            current_end_time = chunk_end_time

        transcription = chunk.get("transcription", None)
        if transcription and transcription["segments"]:
            for segment in transcription["segments"]:
                current_text += " " + segment["text"]

    if current_speaker is not None:
        speaker_name = speaker_names.get(current_speaker, current_speaker)
        cleaned_transcriptions.append(
            f"{speaker_name} [{current_start_time:.2f} - {current_end_time:.2f}]: {current_text}"
        )

    return cleaned_transcriptions


def save_transcription_to_file(
    transcription_text: List[str], audio_file_path: str, output_dir: str
) -> None:
    """
    Saves the cleaned transcription text to a file.  The filename is derived from the
    original audio file name.

    Args:
        transcription_text (List[str]):  A list of strings, where each string represents a paragraph
            in the format "SpeakerName [start_time - end_time]: text".  This is the output
            from the clean_transcription function.
        audio_file_path (str):  The path to the original audio file.  This is used to derive
            the output filename.
        output_dir (str): The directory where the text file should be saved.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Get the base filename of the audio file.
    audio_filename_base = os.path.splitext(os.path.basename(audio_file_path))[0]
    output_file_path = os.path.join(output_dir, f"{audio_filename_base}.txt")

    try:
        with open(output_file_path, "w") as f:
            for line in transcription_text:
                f.write(line + "\n")  # Write each line, adding a newline.
        print(f"Transcription saved to {output_file_path}")
    except Exception as e:
        logging.error(f"Error saving transcription to file: {e}")
        print(f"Error saving transcription to file: {e}")


if __name__ == "__main__":
    # Example usage:
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"
    # Make sure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Set up basic logging
    logging.basicConfig(level=logging.INFO)

    try:
        pipeline, diarization_result, speaker_names = diarize_audio(
            audio_file
        )  # Get speaker_names
        chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
        print("Audio file diarized and chunked.")

        # Save speaker names to a JSON file
        speaker_names_path = os.path.join(output_dir, "speaker_names.json")
        with open(speaker_names_path, "w") as f:
            json.dump(speaker_names, f, indent=4)
        print(f"Speaker names saved to {speaker_names_path}")

        # You can modify speaker_names here if needed
        speaker_names["SPEAKER_01"] = "Moderator"
        speaker_names["SPEAKER_02"] = "Participant 1"

        process_and_transcribe_chunks(chunk_info_list, output_dir)

        transcriptions_json = os.path.join(output_dir, "transcriptions.json")
        cleaned_transcriptions = clean_transcription(
            transcriptions_json, speaker_names
        )  # Pass speaker_names
        print("\nCleaned Transcriptions:")
        for paragraph in cleaned_transcriptions:
            print(paragraph)

        save_transcription_to_file(
            cleaned_transcriptions, audio_file, output_dir
        )  # Save to text file

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        print(f"An error occurred: {e}")

INFO:root:Using device: cpu


  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:03<00:00, 273.25frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:01<00:00, 135.27frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:01<00:00, 206.29frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:01<00:00, 474.21frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 154.51frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:01<00:00, 122.63frames/s]

Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json

Cleaned Transcriptions:

Speaker 1 [0.03 - 10.00]:   Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

Speaker 2 [11.42 - 12.82]:   and correction and embarrassment.

Speaker 1 [12.82 - 14.91]:   Do indeed round one.  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

Speaker 2 [25.58 - 27.37]:   I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:   But what I want to know?
Transcription saved to chunks\test.txt





# optimized


In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

# Load the Hugging Face token from config.py
try:
    from config import HF_TOKEN
except ImportError:
    HF_TOKEN = None
    logging.warning(
        "config.py not found or HF_TOKEN not defined. Diarization may not work."
        " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
    )
WHISPER_MODEL = "base"
#TRANSCRIPTION_LANGUAGE = "en"
DEBUG_MODE = False  # Set to False to disable debug printing

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_audio(audio_file_path: str) -> Tuple[torch.Tensor, int]:
    """Loads an audio file using torchaudio."""
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
        return waveform, sample_rate
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise

def run_diarization(audio_file_path: str, max_speakers: Optional[int] = None, min_speakers: Optional[int] = None) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
    """Performs speaker diarization."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN
    ).to(device)
    waveform, sample_rate = load_audio(audio_file_path)
    input_data = {"waveform": waveform, "sample_rate": sample_rate}
    if max_speakers: input_data["max_speakers"] = max_speakers
    if min_speakers: input_data["min_speakers"] = min_speakers
    diarization = pipeline(input_data)
    speaker_names = {label: f"Speaker {i + 1}" for i, label in enumerate(sorted(set(segment.label for segment in diarization.segments)))}
    return pipeline, diarization, speaker_names

def chunk_audio(audio_file_path: str, diarization: Annotation, output_directory: str) -> List[Dict]:
    """Chunks audio based on diarization."""
    if not os.path.exists(output_directory): os.makedirs(output_directory)
    audio = AudioSegment.from_file(audio_file_path)
    chunks = []
    for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
        start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
        chunk_path = os.path.join(output_directory, f"chunk_{i}.mp3")
        audio[start_ms:end_ms].export(chunk_path, format="mp3")
        chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
    return chunks

def transcribe_chunk(audio_file_path: str) -> Dict:
    """Transcribes an audio chunk."""
    try:
        model = whisper.load_model(WHISPER_MODEL)
        audio = whisper.load_audio(audio_file_path)
        return whisper.transcribe(model, audio, language=TRANSCRIPTION_LANGUAGE)
    except Exception as e:
        logging.error(f"Transcription error: {e}")
        return {}

def process_transcriptions(chunks: List[Dict], output_dir: str) -> List[Dict]:
    """Processes and saves transcriptions."""
    transcriptions = []
    for chunk in chunks:
        logging.info(f"Transcribing {chunk['file_path']}")
        transcription = transcribe_chunk(chunk["file_path"])
        if transcription:
            transcriptions.append({**chunk, "transcription": transcription})
            if DEBUG_MODE: # only print if debug mode is on
                print(f"Transcription for {chunk['file_path']}:")
                for segment in transcription["segments"]:
                    print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

    with open(os.path.join(output_dir, "transcriptions.json"), "w") as f:
        json.dump(transcriptions, f, indent=4)
    return transcriptions

def clean_and_save_transcriptions(transcriptions: List[Dict], speaker_names: Dict[str, str], audio_file_path: str, output_dir: str):
    """Cleans and saves transcriptions to a text file."""
    cleaned = []
    current_speaker, current_text, current_start, current_end = None, "", None, None
    for chunk in transcriptions:
        if current_speaker != chunk["speaker"]:
            if current_speaker:
                cleaned.append(f"\n{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
            current_speaker, current_text = chunk["speaker"], ""
            current_start, current_end = chunk["start_time"], chunk["end_time"]
        if chunk["transcription"] and chunk["transcription"]["segments"]:
            current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
    if current_speaker:
        cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
    output_file = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(audio_file_path))[0]}.txt")
    with open(output_file, "w") as f:
        f.write("\n".join(cleaned))

    if DEBUG_MODE: # only print the cleaned transcriptions if debug mode is on
        print("\nCleaned Transcriptions:")
        for paragraph in cleaned:
            print(paragraph)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



NameError: name 'Annotation' is not defined

In [None]:
# Example usage:
audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
output_dir = "chunks"
# Make sure the output directory exists
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Set up basic logging
logging.basicConfig(level=logging.INFO)

try:
    pipeline, diarization_result, speaker_names = diarize_audio(
        audio_file
    )  # Get speaker_names
    chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
    print("Audio file diarized and chunked.")

    # Save speaker names to a JSON file
    speaker_names_path = os.path.join(output_dir, "speaker_names.json")
    with open(speaker_names_path, "w") as f:
        json.dump(speaker_names, f, indent=4)
    print(f"Speaker names saved to {speaker_names_path}")

    # You can modify speaker_names here if needed
    speaker_names["SPEAKER_01"] = "Moderator"
    speaker_names["SPEAKER_02"] = "Participant 1"

    process_and_transcribe_chunks(chunk_info_list, output_dir)

    transcriptions_json = os.path.join(output_dir, "transcriptions.json")
    cleaned_transcriptions = clean_transcription(
        transcriptions_json, speaker_names
    )  # Pass speaker_names
    print("\nCleaned Transcriptions:")
    for paragraph in cleaned_transcriptions:
        print(paragraph)

    save_transcription_to_file(
        cleaned_transcriptions, audio_file, output_dir
    )  # Save to text file

except Exception as e:
    logging.error(f"An error occurred: {e}")
    print(f"An error occurred: {e}")

INFO:root:Using device: cpu
  std = sequences.std(dim=-1, correction=1)
INFO:root:Diarization complete.


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Transcribing chunk: chunks\chunk_1.mp3, Speaker: SPEAKER_00, Start: 0.03, End: 10.00


100%|██████████| 997/997 [00:03<00:00, 288.60frames/s]


Transcription for chunks\chunk_1.mp3:
    [0.00 - 1.58]  Anyway, look where we're digressing the rules.
    [1.68 - 4.32]  Oh, simple, Emma, you're about to face five questions
    [4.32 - 5.30]  of increasing difficulty.
    [5.32 - 6.60]  You must answer as quickly as possible.
    [6.62 - 8.32]  If you get it correct, you move onto the next round.
    [8.58 - 9.76]  Do you know what happens if you get it wrong?
Transcribing chunk: chunks\chunk_2.mp3, Speaker: SPEAKER_01, Start: 11.42, End: 12.82


100%|██████████| 140/140 [00:01<00:00, 137.32frames/s]


Transcription for chunks\chunk_2.mp3:
    [0.10 - 1.14]  and correction and embarrassment.
Transcribing chunk: chunks\chunk_3.mp3, Speaker: SPEAKER_00, Start: 12.82, End: 14.91


100%|██████████| 209/209 [00:00<00:00, 223.29frames/s]


Transcription for chunks\chunk_3.mp3:
    [0.00 - 1.86]  Do indeed round one.
Transcribing chunk: chunks\chunk_4.mp3, Speaker: SPEAKER_00, Start: 16.94, End: 25.63


100%|██████████| 869/869 [00:02<00:00, 393.47frames/s]


Transcription for chunks\chunk_4.mp3:
    [0.08 - 4.48]  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.
    [4.76 - 8.70]  They estimate we only have a few hundred million years left of them.
Transcribing chunk: chunks\chunk_5.mp3, Speaker: SPEAKER_01, Start: 25.58, End: 27.37


100%|██████████| 178/178 [00:01<00:00, 159.92frames/s]


Transcription for chunks\chunk_5.mp3:
    [0.06 - 1.52]  I'll earn you a few hundred million.
Transcribing chunk: chunks\chunk_6.mp3, Speaker: SPEAKER_00, Start: 27.37, End: 28.97


100%|██████████| 143/143 [00:01<00:00, 127.65frames/s]

Transcription for chunks\chunk_6.mp3:
    [0.16 - 1.08]  But what I want to know?
Transcriptions saved to chunks\transcriptions.json

Cleaned Transcriptions:

Speaker 1 [0.03 - 10.00]:   Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

Moderator [11.42 - 12.82]:   and correction and embarrassment.

Speaker 1 [12.82 - 14.91]:   Do indeed round one.  Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

Moderator [25.58 - 27.37]:   I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:   But what I want to know?
Transcription saved to chunks\test.txt





In [6]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

# Load the Hugging Face token from config.py
try:
    from config import HF_TOKEN
except ImportError:
    HF_TOKEN = None
    logging.warning(
        "config.py not found or HF_TOKEN not defined. Diarization may not work."
        " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
    )
WHISPER_MODEL = "base"
# TRANSCRIPTION_LANGUAGE = "en" # Removed hardcoded language
DEBUG_MODE = False  # Set to False to disable debug printing

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def load_audio(audio_file_path: str) -> Tuple[torch.Tensor, int]:
    """Loads an audio file using torchaudio."""
    try:
        waveform, sample_rate = torchaudio.load(audio_file_path)
        return waveform, sample_rate
    except Exception as e:
        logging.error(f"Error loading audio file: {e}")
        raise


def run_diarization(
    audio_file_path: str,
    max_speakers: Optional[int] = None,
    min_speakers: Optional[int] = None,
) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
    """Performs speaker diarization."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logging.info(f"Using device: {device}")
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1", use_auth_token=HF_TOKEN
    ).to(device)
    waveform, sample_rate = load_audio(audio_file_path)
    input_data = {"waveform": waveform, "sample_rate": sample_rate}
    if max_speakers:
        input_data["max_speakers"] = max_speakers
    if min_speakers:
        input_data["min_speakers"] = min_speakers
    diarization = pipeline(input_data)
    speaker_labels = set()
    for segment, _, label in diarization.itertracks(yield_label=True):
        speaker_labels.add(label)
    speaker_names = {
        label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
    }
    return pipeline, diarization, speaker_names


def chunk_audio(audio_file_path: str, diarization: Annotation, output_directory: str) -> List[Dict]:
    """Chunks audio based on diarization."""
    if not os.path.exists(output_directory): os.makedirs(output_directory)
    audio = AudioSegment.from_file(audio_file_path)
    chunks = []
    for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
        start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
        chunk_path = os.path.join(output_directory, f"chunk_{i}.mp3")
        audio[start_ms:end_ms].export(chunk_path, format="mp3")
        chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
    return chunks

def transcribe_chunk(audio_file_path: str, language: Optional[str] = None) -> Dict:
    """Transcribes an audio chunk."""
    try:
        model = whisper.load_model(WHISPER_MODEL)
        audio = whisper.load_audio(audio_file_path)
        return whisper.transcribe(model, audio, language=language)
    except Exception as e:
        logging.error(f"Transcription error: {e}")
        return {}

def process_and_transcribe_chunks(chunks: List[Dict], output_dir: str, language: Optional[str] = None) -> List[Dict]:
    """Processes and saves transcriptions."""
    transcriptions = []
    for chunk in chunks:
        logging.info(f"Transcribing {chunk['file_path']}")
        transcription = transcribe_chunk(chunk["file_path"], language=language)
        if transcription:
            transcriptions.append({**chunk, "transcription": transcription})
            if DEBUG_MODE: # only print if debug mode is on
                print(f"Transcription for {chunk['file_path']}:")
                for segment in transcription["segments"]:
                    print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

    with open(os.path.join(output_dir, "transcriptions.json"), "w") as f:
        json.dump(transcriptions, f, indent=4)
    return transcriptions

def clean_transcription(transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
    """Cleans the transcription JSON to a readable format."""
    with open(transcriptions_json, 'r') as f:
        transcriptions = json.load(f)

    cleaned =[]
    current_speaker, current_text, current_start, current_end = None, "", None, None
    for chunk in transcriptions:
        if current_speaker != chunk["speaker"]:
            if current_speaker:
                cleaned.append(f"\n{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
            current_speaker, current_text = chunk["speaker"], ""
            current_start, current_end = chunk["start_time"], chunk["end_time"]
        if chunk["transcription"] and chunk["transcription"]["segments"]:
            current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
    if current_speaker:
        cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
    return cleaned

def save_transcription_to_file(cleaned_transcriptions: List[str], audio_file_path: str, output_dir: str):
    """Saves the cleaned transcription to a text file."""
    output_file = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(audio_file_path))[0]}.txt")
    with open(output_file, "w") as f:
        f.write("\n".join(cleaned_transcriptions))

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Make sure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Set up basic logging
    logging.basicConfig(level=logging.INFO)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    try:
        pipeline, diarization_result, speaker_names = run_diarization(
            audio_file
        )  # Get speaker_names
        chunk_info_list = chunk_audio(audio_file, diarization_result, output_dir)
        print("Audio file diarized and chunked.")

        # Save speaker names to a JSON file
        speaker_names_path = os.path.join(output_dir, "speaker_names.json")
        with open(speaker_names_path, "w") as f:
            json.dump(speaker_names, f, indent=4)
        print(f"Speaker names saved to {speaker_names_path}")

        # You can modify speaker_names here if needed
        speaker_names["SPEAKER_01"] = "Moderator"
        speaker_names["SPEAKER_02"] = "Participant 1"

        process_and_transcribe_chunks(chunk_info_list, output_dir, language=transcription_language)

        transcriptions_json = os.path.join(output_dir, "transcriptions.json")
        cleaned_transcriptions = clean_transcription(
            transcriptions_json, speaker_names
        )  # Pass speaker_names
        print("\nCleaned Transcriptions:")
        for paragraph in cleaned_transcriptions:
            print(paragraph)

        save_transcription_to_file(
            cleaned_transcriptions, audio_file, output_dir
        )  # Save to text file

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        print(f"An error occurred: {e}")

2025-03-23 18:26:36,349 - INFO - Using device: cpu


Using automatic language detection.


  std = sequences.std(dim=-1, correction=1)
2025-03-23 18:27:10,294 - INFO - Transcribing chunks\chunk_1.mp3


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 316.61frames/s]
2025-03-23 18:27:15,779 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:00<00:00, 148.70frames/s]
2025-03-23 18:27:18,679 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 172.15frames/s]
2025-03-23 18:27:21,890 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:01<00:00, 448.40frames/s]
2025-03-23 18:27:25,951 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:02<00:00, 61.94frames/s]
2025-03-23 18:27:31,068 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 140.95frames/s]


Cleaned Transcriptions:

Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

Moderator [11.42 - 12.82]:  and correction and embarrassment.

Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

Moderator [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [8]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = "base"
        self.debug_mode = False
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks =[]
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Dict:
        """Transcribes an audio chunk."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            return whisper.transcribe(model, audio, language=language)
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions."""
        transcriptions =[]
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                transcriptions.append({**chunk, "transcription": transcription})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']}:")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned =[]
        current_speaker, current_text, current_start, current_end = None, "", None, None
        for chunk in transcriptions:
            if current_speaker != chunk["speaker"]:
                if current_speaker:
                    cleaned.append(f"\n{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str]):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}.txt")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            pipeline, diarization_result, speaker_names = self.run_diarization()
            chunk_info_list = self.chunk_audio(diarization_result)
            print("Audio file diarized and chunked.")

            speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
            with open(speaker_names_path, "w") as f:
                json.dump(speaker_names, f, indent=4)
            print(f"Speaker names saved to {speaker_names_path}")

            # You can modify speaker_names here if needed
            speaker_names["SPEAKER_01"] = "Moderator"
            speaker_names["SPEAKER_02"] = "Participant 1"

            self.process_and_transcribe_chunks(chunk_info_list, language=language)

            transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
            cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
            print("\nCleaned Transcriptions:")
            for paragraph in cleaned_transcriptions:
                print(paragraph)

            self.save_transcription_to_file(cleaned_transcriptions)

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    transcriber.process_audio(language=transcription_language)

2025-03-23 18:38:10,444 - INFO - Using device: cpu


Using automatic language detection.


  std = sequences.std(dim=-1, correction=1)
2025-03-23 18:38:38,894 - INFO - Transcribing chunks\chunk_1.mp3


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 300.75frames/s]
2025-03-23 18:38:44,593 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:00<00:00, 140.28frames/s]
2025-03-23 18:38:47,603 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 201.33frames/s]
2025-03-23 18:38:50,962 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:01<00:00, 510.26frames/s]
2025-03-23 18:38:54,607 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 150.02frames/s]
2025-03-23 18:38:57,857 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 113.89frames/s]


Cleaned Transcriptions:

Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

Moderator [11.42 - 12.82]:  and correction and embarrassment.

Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

Moderator [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [11]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = "base"
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks =[]
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Dict:
        """Transcribes an audio chunk."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            return whisper.transcribe(model, audio, language=language)
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions =[]
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                transcriptions.append({**chunk, "transcription": transcription})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']}:")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Dict:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned =[]
        current_speaker, current_text, current_start, current_end = None, "", None, None
        for chunk in transcriptions:
            if current_speaker != chunk["speaker"]:
                if current_speaker:
                    cleaned.append(f"\n{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return cleaned

    def clean_whole_transcription(self, whole_transcription: Dict) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned =[]
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization()
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                # You can modify speaker_names here if needed
                speaker_names["SPEAKER_01"] = "Moderator"
                speaker_names["SPEAKER_02"] = "Participant 1"

                self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("\nCleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    transcriber.process_audio(language=transcription_language)

2025-03-23 19:59:20,944 - INFO - Using device: cpu


Using automatic language detection.


  std = sequences.std(dim=-1, correction=1)
2025-03-23 19:59:51,202 - INFO - Transcribing chunks\chunk_1.mp3


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 294.38frames/s]
2025-03-23 19:59:56,951 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:01<00:00, 122.74frames/s]
2025-03-23 20:00:00,084 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 118.62frames/s]
2025-03-23 20:00:04,073 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:02<00:00, 332.94frames/s]
2025-03-23 20:00:09,485 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 139.06frames/s]
2025-03-23 20:00:12,930 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 118.85frames/s]


Cleaned Transcriptions:

Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?

Moderator [11.42 - 12.82]:  and correction and embarrassment.

Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.

Moderator [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = "base"
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Dict:
        """Transcribes an audio chunk."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            return whisper.transcribe(model, audio, language=language)
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                transcriptions.append({**chunk, "transcription": transcription})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']}:")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Dict:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                    if i < len(transcriptions) and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                        pass # Avoid adding extra blank line here, handle it before the next speaker
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization()
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                # You can modify speaker_names here if needed
                speaker_names["SPEAKER_01"] = "Moderator"
                speaker_names["SPEAKER_02"] = "Participant 1"

                self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    transcriber.process_audio(language=transcription_language)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



2025-03-23 21:16:07,674 - INFO - Using device: cpu


Using automatic language detection.


2025-03-23 21:16:09,511 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
2025-03-23 21:16:09,511 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)
2025-03-23 21:16:35,124 - INFO - Transcribing chunks\chunk_1.mp3


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 284.36frames/s]
2025-03-23 21:16:41,114 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:02<00:00, 53.29frames/s]
2025-03-23 21:16:46,131 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 190.17frames/s]
2025-03-23 21:16:49,268 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:01<00:00, 434.81frames/s]
2025-03-23 21:16:53,477 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 161.09frames/s]
2025-03-23 21:16:56,608 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 120.87frames/s]

Cleaned Transcriptions:
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Moderator [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Moderator [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Dict:
        """Transcribes an audio chunk."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            return whisper.transcribe(model, audio, language=language)
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                transcriptions.append({**chunk, "transcription": transcription})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']}:")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Dict:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                # You can modify speaker_names here if needed
                speaker_names["SPEAKER_01"] = "Moderator"
                speaker_names["SPEAKER_02"] = "Participant 1"

                self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    whisper_model_choice = "base"
    if not skip_diarization:
        # Ask for min/max speakers
        specify_speakers = input("Do you want to specify the minimum and maximum number of speakers? (yes/no): ").lower()
        min_speakers = None
        max_speakers = None
        if specify_speakers == "yes":
            try:
                min_speakers = int(input("Enter the minimum number of speakers (optional, leave blank for auto): ") or None)
                max_speakers = int(input("Enter the maximum number of speakers (optional, leave blank for auto): ") or None)
            except ValueError:
                print("Invalid input for the number of speakers. Using default settings.")

        # Ask for Whisper model
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")
    else:
        # Ask for Whisper model even if diarization is skipped
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model_choice)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    if not skip_diarization:
        transcriber.process_audio(language=transcription_language, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        transcriber.process_audio(language=transcription_language)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.


Available Whisper models: tiny, base, small, medium, large


2025-03-23 21:26:08,398 - INFO - Using device: cpu


Using automatic language detection.


2025-03-23 21:26:10,170 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [disable_jit_profiling, allow_tf32]
2025-03-23 21:26:10,171 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)
2025-03-23 21:26:36,673 - INFO - Transcribing chunks\chunk_1.mp3


Audio file diarized and chunked.
Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 259.89frames/s]
2025-03-23 21:26:43,126 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:01<00:00, 135.97frames/s]
2025-03-23 21:26:46,279 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 153.39frames/s]
2025-03-23 21:26:49,841 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:01<00:00, 439.31frames/s]
2025-03-23 21:26:54,027 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 139.01frames/s]
2025-03-23 21:26:57,494 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 118.67frames/s]

Cleaned Transcriptions:
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Moderator [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Moderator [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Dict:
        """Transcribes an audio chunk."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            return whisper.transcribe(model, audio, language=language)
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                transcriptions.append({**chunk, "transcription": transcription})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']}:")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Dict:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    whisper_model_choice = "base"
    if not skip_diarization:
        # Ask for min/max speakers
        specify_speakers = input("Do you want to specify the minimum and maximum number of speakers? (yes/no): ").lower()
        min_speakers = None
        max_speakers = None
        if specify_speakers == "yes":
            try:
                min_speakers = int(input("Enter the minimum number of speakers (optional, leave blank for auto): ") or None)
                max_speakers = int(input("Enter the maximum number of speakers (optional, leave blank for auto): ") or None)
            except ValueError:
                print("Invalid input for the number of speakers. Using default settings.")

        # Ask for Whisper model
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")
    else:
        # Ask for Whisper model even if diarization is skipped
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model_choice)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    if not skip_diarization:
        transcriber.process_audio(language=transcription_language, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        transcriber.process_audio(language=transcription_language)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.


Available Whisper models: tiny, base, small, medium, large


2025-03-23 21:31:44,202 - INFO - Using device: cpu


Using automatic language detection.


2025-03-23 21:31:46,218 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [disable_jit_profiling, allow_tf32]
2025-03-23 21:31:46,218 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)


Audio file diarized and chunked.

Detected 2 speakers:
Chunk 1: Speaker 'SPEAKER_00' [0.03 - 10.00]
Chunk 2: Speaker 'SPEAKER_01' [11.42 - 12.82]
Chunk 3: Speaker 'SPEAKER_00' [12.82 - 14.91]
Chunk 4: Speaker 'SPEAKER_00' [16.94 - 25.63]
Chunk 5: Speaker 'SPEAKER_01' [25.58 - 27.37]
Chunk 6: Speaker 'SPEAKER_00' [27.37 - 28.97]


2025-03-23 21:32:20,340 - INFO - Transcribing chunks\chunk_1.mp3


Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 277.55frames/s]
2025-03-23 21:32:26,586 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:02<00:00, 48.70frames/s]
2025-03-23 21:32:31,767 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 169.77frames/s]
2025-03-23 21:32:35,941 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:01<00:00, 459.99frames/s]
2025-03-23 21:32:40,429 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 116.41frames/s]
2025-03-23 21:32:44,045 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 110.94frames/s]

Cleaned Transcriptions:
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Speaker 2 [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Speaker 2 [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.output_dir = output_dir
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes an audio chunk and returns the transcription and detected language."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            result = whisper.transcribe(model, audio, language=language)
            detected_language = result.get("language") if language is None else language
            return result, detected_language
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(self.audio_file))[0]}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, "speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, "transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "chunks"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    whisper_model_choice = "base"
    if not skip_diarization:
        # Ask for min/max speakers
        specify_speakers = input("Do you want to specify the minimum and maximum number of speakers? (yes/no): ").lower()
        min_speakers = None
        max_speakers = None
        if specify_speakers == "yes":
            try:
                min_speakers = int(input("Enter the minimum number of speakers (optional, leave blank for auto): ") or None)
                max_speakers = int(input("Enter the maximum number of speakers (optional, leave blank for auto): ") or None)
            except ValueError:
                print("Invalid input for the number of speakers. Using default settings.")

        # Ask for Whisper model
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")
    else:
        # Ask for Whisper model even if diarization is skipped
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model_choice)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    if not skip_diarization:
        transcriber.process_audio(language=transcription_language, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        transcriber.process_audio(language=transcription_language)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.


Available Whisper models: tiny, base, small, medium, large


2025-03-23 21:38:18,232 - INFO - Using device: cpu


Using automatic language detection.


2025-03-23 21:38:20,309 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
2025-03-23 21:38:20,311 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)


Audio file diarized and chunked.

Detected 2 speakers:
Chunk 1: Speaker 'SPEAKER_00' [0.03 - 10.00]
Chunk 2: Speaker 'SPEAKER_01' [11.42 - 12.82]
Chunk 3: Speaker 'SPEAKER_00' [12.82 - 14.91]
Chunk 4: Speaker 'SPEAKER_00' [16.94 - 25.63]
Chunk 5: Speaker 'SPEAKER_01' [25.58 - 27.37]
Chunk 6: Speaker 'SPEAKER_00' [27.37 - 28.97]


2025-03-23 21:39:59,541 - INFO - Transcribing chunks\chunk_1.mp3


Speaker names saved to chunks\speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:03<00:00, 277.17frames/s]
2025-03-23 21:40:06,096 - INFO - Transcribing chunks\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:01<00:00, 115.37frames/s]
2025-03-23 21:40:09,457 - INFO - Transcribing chunks\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 184.38frames/s]
2025-03-23 21:40:13,201 - INFO - Transcribing chunks\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:02<00:00, 400.53frames/s]
2025-03-23 21:40:17,509 - INFO - Transcribing chunks\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 148.95frames/s]
2025-03-23 21:40:20,942 - INFO - Transcribing chunks\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 132.83frames/s]

Cleaned Transcriptions:
Detected Language: EN
Cicero [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
ragazza [11.42 - 12.82]:  and correction and embarrassment.
Cicero [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
ragazza [25.58 - 27.37]:  I'll earn you a few hundred million.
Cicero [27.37 - 28.97]:  But what I want to know?





In [1]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.audio_base_name = os.path.splitext(os.path.basename(self.audio_file))[0]
        self.output_dir_base = output_dir
        self.output_dir = os.path.join(self.output_dir_base, self.audio_base_name)
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes an audio chunk and returns the transcription and detected language."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            result = whisper.transcribe(model, audio, language=language)
            detected_language = result.get("language") if language is None else language
            return result, detected_language
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{self.audio_base_name}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, f"{self.audio_base_name}_speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

if __name__ == "__main__":
    audio_file = r"C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3"
    output_dir = "out"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    # Ask the user if they want to skip diarization
    skip_diarization_input = input("Do you want to skip speaker diarization and transcribe the whole audio directly? (yes/no): ").lower()
    skip_diarization = skip_diarization_input == "yes"

    whisper_model_choice = "base"
    if not skip_diarization:
        # Ask for min/max speakers
        specify_speakers = input("Do you want to specify the minimum and maximum number of speakers? (yes/no): ").lower()
        min_speakers = None
        max_speakers = None
        if specify_speakers == "yes":
            try:
                min_speakers = int(input("Enter the minimum number of speakers (optional, leave blank for auto): ") or None)
                max_speakers = int(input("Enter the maximum number of speakers (optional, leave blank for auto): ") or None)
            except ValueError:
                print("Invalid input for the number of speakers. Using default settings.")

        # Ask for Whisper model
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")
    else:
        # Ask for Whisper model even if diarization is skipped
        print("\nAvailable Whisper models: tiny, base, small, medium, large")
        chosen_model = input("Choose a Whisper model for transcription (default: base): ").lower().strip()
        if chosen_model in ["tiny", "base", "small", "medium", "large"]:
            whisper_model_choice = chosen_model
        elif chosen_model:
            print(f"Invalid Whisper model '{chosen_model}'. Using default model 'base'.")
        else:
            print("Using default Whisper model 'base'.")

    transcriber = AudioTranscriber(audio_file, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model_choice)

    # Ask the user if they want to specify a language
    specify_language = input("Do you want to specify a language for transcription? (yes/no): ").lower()
    transcription_language = None
    if specify_language == "yes":
        transcription_language = input("Enter the language code (e.g., en, fr, es), or leave blank for auto-detection: ").strip()
        if not transcription_language:
            transcription_language = None # Explicitly set to None if user leaves it blank
    elif specify_language == "no":
        print("Using automatic language detection.")
    else:
        print("Invalid input. Using automatic language detection.")

    if not skip_diarization:
        transcriber.process_audio(language=transcription_language, min_speakers=min_speakers, max_speakers=max_speakers)
    else:
        transcriber.process_audio(language=transcription_language)

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.


Available Whisper models: tiny, base, small, medium, large
Using default Whisper model 'base'.


2025-03-23 21:47:19,480 - INFO - Using device: cpu


Using automatic language detection.


2025-03-23 21:47:21,419 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
2025-03-23 21:47:21,421 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)


Audio file diarized and chunked.

Detected 2 speakers:
Chunk 1: Speaker 'SPEAKER_00' [0.03 - 10.00]
Chunk 2: Speaker 'SPEAKER_01' [11.42 - 12.82]
Chunk 3: Speaker 'SPEAKER_00' [12.82 - 14.91]
Chunk 4: Speaker 'SPEAKER_00' [16.94 - 25.63]
Chunk 5: Speaker 'SPEAKER_01' [25.58 - 27.37]
Chunk 6: Speaker 'SPEAKER_00' [27.37 - 28.97]


2025-03-23 21:47:56,473 - INFO - Transcribing out\test\chunk_1.mp3


Speaker names saved to out\test\test_speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:04<00:00, 233.92frames/s]
2025-03-23 21:48:04,421 - INFO - Transcribing out\test\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:01<00:00, 124.37frames/s]
2025-03-23 21:48:07,627 - INFO - Transcribing out\test\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:03<00:00, 64.33frames/s]
2025-03-23 21:48:12,939 - INFO - Transcribing out\test\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:02<00:00, 413.80frames/s]
2025-03-23 21:48:17,297 - INFO - Transcribing out\test\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 145.94frames/s]
2025-03-23 21:48:20,690 - INFO - Transcribing out\test\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 102.43frames/s]

Cleaned Transcriptions:
Detected Language: EN
Audio File: test
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Speaker 2 [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Speaker 2 [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [2]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.audio_base_name = os.path.splitext(os.path.basename(self.audio_file))[0]
        self.output_dir_base = output_dir
        self.output_dir = os.path.join(self.output_dir_base, self.audio_base_name)
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes an audio chunk and returns the transcription and detected language."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)
            result = whisper.transcribe(model, audio, language=language)
            detected_language = result.get("language") if language is None else language
            return result, detected_language
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        return self.transcribe_chunk(self.audio_file, language=language)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{self.audio_base_name}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, f"{self.audio_base_name}_speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language)

                transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

def process_single_audio(audio_file_path, output_dir="chunks", hf_token=None, skip_diarization=False, whisper_model="base", language=None, min_speakers=None, max_speakers=None):
    transcriber = AudioTranscriber(audio_file_path, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model)
    transcriber.process_audio(language=language, min_speakers=min_speakers, max_speakers=max_speakers)

if __name__ == "__main__":
    output_dir = "out"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    audio_input = input("Enter the path to an audio file or a directory containing audio files: ").strip()

    if os.path.isdir(audio_input):
        audio_files = [f for f in os.listdir(audio_input) if f.endswith(('.mp3', '.wav', '.aac', '.m4a'))]
        if not audio_files:
            print("No audio files found in the specified directory.")
        else:
            print("\nAvailable audio files:")
            for i, filename in enumerate(audio_files):
                print(f"{i + 1}. {filename}")

            while True:
                selection = input("\nChoose files to process (e.g., 'all', '1', '2,3,4', '1-3'): ").lower().strip()
                files_to_process = []

                if selection == 'all':
                    files_to_process = [os.path.join(audio_input, f) for f in audio_files]
                    break
                elif ',' in selection:
                    indices = [s.strip() for s in selection.split(',')]
                    valid_indices = True
                    selected_indices = set()
                    for index_str in indices:
                        if index_str.isdigit():
                            index = int(index_str)
                            if 1 <= index <= len(audio_files):
                                selected_indices.add(index - 1)
                            else:
                                print(f"Invalid file number: {index_str}")
                                valid_indices = False
                                break
                        else:
                            print(f"Invalid input: {index_str}")
                            valid_indices = False
                            break
                    if valid_indices:
                        files_to_process = [os.path.join(audio_input, audio_files[i]) for i in sorted(list(selected_indices))]
                        break
                elif '-' in selection:
                    try:
                        start_str, end_str = selection.split('-')
                        start_index = int(start_str.strip())
                        end_index = int(end_str.strip())
                        if 1 <= start_index <= len(audio_files) and 1 <= end_index <= len(audio_files) and start_index <= end_index:
                            files_to_process = [os.path.join(audio_input, audio_files[i]) for i in range(start_index - 1, end_index)]
                            break
                        else:
                            print("Invalid range of file numbers.")
                    except ValueError:
                        print("Invalid range format.")
                elif selection.isdigit():
                    index = int(selection)
                    if 1 <= index <= len(audio_files):
                        files_to_process = [os.path.join(audio_input, audio_files[index - 1])]
                        break
                    else:
                        print("Invalid file number.")
                else:
                    print("Invalid selection format. Please use 'all', a single number, comma-separated numbers, or a range (e.g., '1-3').")

            for audio_file_path in files_to_process:
                print(f"\n--- Processing: {audio_file_path} ---")
                # Ask for processing options for each file
                skip_diarization_input = input("Skip speaker diarization for this file? (yes/no): ").lower()
                skip_diarization = skip_diarization_input == "yes"

                whisper_model_choice = "base"
                if not skip_diarization:
                    specify_speakers = input("Specify min/max speakers for this file? (yes/no): ").lower()
                    min_speakers = None
                    max_speakers = None
                    if specify_speakers == "yes":
                        try:
                            min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                            max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                        except ValueError:
                            print("Invalid input for the number of speakers.")

                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")
                else:
                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")

                specify_language = input("Specify a language for this file? (yes/no): ").lower()
                transcription_language = None
                if specify_language == "yes":
                    transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

                process_single_audio(audio_file_path, output_dir, hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)

    elif os.path.isfile(audio_input):
        audio_file = audio_input
        # Ask for processing options for the single file
        skip_diarization_input = input("Skip speaker diarization? (yes/no): ").lower()
        skip_diarization = skip_diarization_input == "yes"

        whisper_model_choice = "base"
        if not skip_diarization:
            specify_speakers = input("Specify min/max speakers? (yes/no): ").lower()
            min_speakers = None
            max_speakers = None
            if specify_speakers == "yes":
                try:
                    min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                    max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                except ValueError:
                    print("Invalid input for the number of speakers.")

            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")
        else:
            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")

        specify_language = input("Specify a language? (yes/no): ").lower()
        transcription_language = None
        if specify_language == "yes":
            transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

        process_single_audio(audio_file, output_dir, hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)

    else:
        print("Invalid input. Please provide a valid file path or directory path.")

  from .autonotebook import tqdm as notebook_tqdm


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.


Available audio files:
1. 11 lug, 11.20​ prova 2.aac
2. 4547.mp3
3. 6313.mp3
4. Botanicario - Ribes Nero.wav
5. Come STUDIARE allUNIVERSITÀ.mp3
6. Come STUDIARE allUNIVERSITÀ.wav
7. test.mp3

--- Processing: C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3 ---

Available Whisper models: tiny, base, small, medium, large


2025-03-23 21:56:18,671 - INFO - Using device: cpu
2025-03-23 21:56:20,589 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
2025-03-23 21:56:20,590 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)


Audio file diarized and chunked.

Detected 2 speakers:
Chunk 1: Speaker 'SPEAKER_00' [0.03 - 10.00]
Chunk 2: Speaker 'SPEAKER_01' [11.42 - 12.82]
Chunk 3: Speaker 'SPEAKER_00' [12.82 - 14.91]
Chunk 4: Speaker 'SPEAKER_00' [16.94 - 25.63]
Chunk 5: Speaker 'SPEAKER_01' [25.58 - 27.37]
Chunk 6: Speaker 'SPEAKER_00' [27.37 - 28.97]


2025-03-23 22:00:23,942 - INFO - Transcribing out\test\chunk_1.mp3


Speaker names saved to out\test\test_speaker_names.json
Detected language: English


100%|██████████| 997/997 [00:04<00:00, 241.98frames/s]
2025-03-23 22:00:30,949 - INFO - Transcribing out\test\chunk_2.mp3


Detected language: English


100%|██████████| 140/140 [00:03<00:00, 36.35frames/s]
2025-03-23 22:00:36,965 - INFO - Transcribing out\test\chunk_3.mp3


Detected language: English


100%|██████████| 209/209 [00:01<00:00, 201.42frames/s]
2025-03-23 22:00:40,090 - INFO - Transcribing out\test\chunk_4.mp3


Detected language: English


100%|██████████| 869/869 [00:02<00:00, 378.97frames/s]
2025-03-23 22:00:44,712 - INFO - Transcribing out\test\chunk_5.mp3


Detected language: English


100%|██████████| 178/178 [00:01<00:00, 125.47frames/s]
2025-03-23 22:00:48,236 - INFO - Transcribing out\test\chunk_6.mp3


Detected language: English


100%|██████████| 143/143 [00:01<00:00, 96.36frames/s]

Cleaned Transcriptions:
Detected Language: EN
Audio File: test
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Speaker 2 [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  Do indeed round one. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Speaker 2 [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





sopra funziona, sotto rprovo a rimuovere i silenzi

In [6]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.audio_base_name = os.path.splitext(os.path.basename(self.audio_file))[0]
        self.output_dir_base = output_dir
        self.output_dir = os.path.join(self.output_dir_base, self.audio_base_name)
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None, vad: Optional[bool or str or List[Tuple[float, float]]] = None) -> Tuple[Dict, Optional[str]]:
            """Transcribes an audio chunk and returns the transcription and detected language."""
            try:
                model = whisper.load_model(self.whisper_model)
                audio = whisper.load_audio(audio_file_path)

                if vad is not None and vad is not False:
                    logging.info(f"Performing voice activity detection with settings: {vad}")
                    if vad is True or vad == "silero":
                        result = whisper.transcribe(model, audio, language=language, vad="silero")
                    elif vad == "silero:3.1":
                        result = whisper.transcribe(model, audio, language=language, vad="silero:3.1")
                    elif vad == "auditok":
                        result = whisper.transcribe(model, audio, language=language, vad="auditok")
                    elif isinstance(vad, list):
                        speech_segments = []
                        for start, end in vad:
                            speech_segments.append(audio[int(start * whisper.audio.SAMPLE_RATE):int(end * whisper.audio.SAMPLE_RATE)])
                        if speech_segments:
                            full_transcription = {"segments":""}
                            for segment in speech_segments:
                                segment_result = whisper.transcribe(model, segment, language=language)
                                full_transcription["segments"].extend(segment_result.get("segments",))
                            result = full_transcription
                        else:
                            result = {"segments":""} # No speech segments provided
                    else:
                        logging.warning(f"Invalid VAD setting: {vad}. Transcribing without VAD.")
                        result = whisper.transcribe(model, audio, language=language)
                else:
                    result = whisper.transcribe(model, audio, language=language)

                detected_language = result.get("language") if language is None else language
                return result, detected_language
            except Exception as e:
                logging.error(f"Transcription error: {e}")
                return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None, use_vad: bool = False, vad_method: Optional[str] = None) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            vad_option = None
            if use_vad:
                vad_option = vad_method if vad_method else True # Use default if no method specified
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language, vad=vad_option)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"     [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None, use_vad: bool = False, vad_method: Optional[str] = None) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        vad_option = None
        if use_vad:
            vad_option = vad_method if vad_method else True
        return self.transcribe_chunk(self.audio_file, language=language, vad=vad_option)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{self.audio_base_name}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            use_vad = input("Do you want to remove silent parts before transcription? (yes/no): ").lower() == "yes"
            vad_method = None
            if use_vad:
                vad_choice = input("Choose VAD method (silero, silero:3.1, auditok, or leave empty for default silero): ").lower().strip()
                if vad_choice in ["silero", "silero:3.1", "auditok"]:
                    vad_method = vad_choice
                elif vad_choice:
                    print(f"Invalid VAD method '{vad_choice}'. Using default silero.")

            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language, use_vad=use_vad, vad_method=vad_method)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, f"{self.audio_base_name}_speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language, use_vad=use_vad, vad_method=vad_method)

                transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

def process_single_audio(audio_file_path, output_dir="chunks", hf_token=None, skip_diarization=False, whisper_model="base", language=None, min_speakers=None, max_speakers=None):
    transcriber = AudioTranscriber(audio_file_path, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model)
    transcriber.process_audio(language=language, min_speakers=min_speakers, max_speakers=max_speakers)

if __name__ == "__main__":
    output_dir = "out"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    audio_input = input("Enter the path to an audio file or a directory containing audio files: ").strip()

    if os.path.isdir(audio_input):
        audio_files = [f for f in os.listdir(audio_input) if f.endswith(('.mp3', '.wav', '.aac', '.m4a'))]
        if not audio_files:
            print("No audio files found in the specified directory.")
        else:
            print("\nAvailable audio files:")
            for i, filename in enumerate(audio_files):
                print(f"{i + 1}. {filename}")

            while True:
                selection = input("\nChoose files to process (e.g., 'all', '1', '2,3,4', '1-3'): ").lower().strip()
                files_to_process = []

                if selection == 'all':
                    files_to_process = [os.path.join(audio_input, f) for f in audio_files]
                    break
                elif ',' in selection:
                    indices = [s.strip() for s in selection.split(',')]
                    valid_indices = True
                    selected_indices = set()
                    for index_str in indices:
                        if index_str.isdigit():
                            index = int(index_str)
                            if 1 <= index <= len(audio_files):
                                selected_indices.add(index - 1)
                            else:
                                print(f"Invalid file number: {index_str}")
                                valid_indices = False
                                break
                        else:
                            print(f"Invalid input: {index_str}")
                            valid_indices = False
                            break
                    if valid_indices:
                        files_to_process = [os.path.join(audio_input, audio_files[i]) for i in sorted(list(selected_indices))]
                        break
                elif '-' in selection:
                    try:
                        start_str, end_str = selection.split('-')
                        start_index = int(start_str.strip())
                        end_index = int(end_str.strip())
                        if 1 <= start_index <= len(audio_files) and 1 <= end_index <= len(audio_files) and start_index <= end_index:
                            files_to_process = [os.path.join(audio_input, audio_files[i]) for i in range(start_index - 1, end_index)]
                            break
                        else:
                            print("Invalid range of file numbers.")
                    except ValueError:
                        print("Invalid range format.")
                elif selection.isdigit():
                    index = int(selection)
                    if 1 <= index <= len(audio_files):
                        files_to_process = [os.path.join(audio_input, audio_files[index - 1])]
                        break
                    else:
                        print("Invalid file number.")
                else:
                    print("Invalid selection format. Please use 'all', a single number, comma-separated numbers, or a range (e.g., '1-3').")

            for audio_file_path in files_to_process:
                print(f"\n--- Processing: {audio_file_path} ---")
                # Ask for processing options for each file
                skip_diarization_input = input("Skip speaker diarization for this file? (yes/no): ").lower()
                skip_diarization = skip_diarization_input == "yes"

                whisper_model_choice = "base"
                if not skip_diarization:
                    specify_speakers = input("Specify min/max speakers for this file? (yes/no): ").lower()
                    min_speakers = None
                    max_speakers = None
                    if specify_speakers == "yes":
                        try:
                            min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                            max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                        except ValueError:
                            print("Invalid input for the number of speakers.")

                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")
                else:
                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")

                specify_language = input("Specify a language for this file? (yes/no): ").lower()
                transcription_language = None
                if specify_language == "yes":
                    transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

                process_single_audio(audio_file_path, output_dir, hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)

    elif os.path.isfile(audio_input):
        audio_file = audio_input
        # Ask for processing options for the single file
        skip_diarization_input = input("Skip speaker diarization? (yes/no): ").lower()
        skip_diarization = skip_diarization_input == "yes"

        whisper_model_choice = "base"
        if not skip_diarization:
            specify_speakers = input("Specify min/max speakers? (yes/no): ").lower()
            min_speakers = None
            max_speakers = None
            if specify_speakers == "yes":
                try:
                    min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                    max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                except ValueError:
                    print("Invalid input for the number of speakers.")

            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")
        else:
            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")

        specify_language = input("Specify a language? (yes/no): ").lower()
        transcription_language = None
        if specify_language == "yes":
            transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

        process_single_audio(audio_file, output_dir, hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)

    else:
        print("Invalid input. Please provide a valid file path or directory path.")


Available audio files:
1. 11 lug, 11.20​ prova 2.aac
2. 4547.mp3
3. 6313.mp3
4. Botanicario - Ribes Nero.wav
5. Come STUDIARE allUNIVERSITÀ.mp3
6. Come STUDIARE allUNIVERSITÀ.wav
7. test.mp3

--- Processing: C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3 ---

Available Whisper models: tiny, base, small, medium, large


2025-03-23 22:31:50,445 - INFO - Using device: cpu
2025-03-23 22:31:51,580 - INFO - Applied quirks (see `speechbrain.utils.quirks`): [allow_tf32, disable_jit_profiling]
2025-03-23 22:31:51,581 - INFO - Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
  if ismodule(module) and hasattr(module, '__file__'):
  std = sequences.std(dim=-1, correction=1)


Audio file diarized and chunked.

Detected 2 speakers:
Chunk 1: Speaker 'SPEAKER_00' [0.03 - 10.00]
Chunk 2: Speaker 'SPEAKER_01' [11.42 - 12.82]
Chunk 3: Speaker 'SPEAKER_00' [12.82 - 14.91]
Chunk 4: Speaker 'SPEAKER_00' [16.94 - 25.63]
Chunk 5: Speaker 'SPEAKER_01' [25.58 - 27.37]
Chunk 6: Speaker 'SPEAKER_00' [27.37 - 28.97]


2025-03-23 22:32:27,732 - INFO - Transcribing out\test\chunk_1.mp3


Speaker names saved to out\test\test_speaker_names.json


2025-03-23 22:32:29,240 - INFO - Performing voice activity detection with settings: True
Downloading: "https://github.com/snakers4/silero-vad/zipball/master" to C:\Users\Admin/.cache\torch\hub\master.zip


Detected language: English


100%|██████████| 997/997 [00:04<00:00, 208.87frames/s]
2025-03-23 22:32:39,925 - INFO - Transcribing out\test\chunk_2.mp3
2025-03-23 22:32:41,198 - INFO - Performing voice activity detection with settings: True


Detected language: English


100%|██████████| 140/140 [00:01<00:00, 71.86frames/s]
2025-03-23 22:32:44,887 - INFO - Transcribing out\test\chunk_3.mp3
2025-03-23 22:32:46,103 - INFO - Performing voice activity detection with settings: True


Detected language: English


100%|██████████| 145/145 [00:02<00:00, 71.26frames/s]
2025-03-23 22:32:49,890 - INFO - Transcribing out\test\chunk_4.mp3
2025-03-23 22:32:51,119 - INFO - Performing voice activity detection with settings: True


Detected language: English


100%|██████████| 869/869 [00:02<00:00, 295.27frames/s]
2025-03-23 22:32:55,958 - INFO - Transcribing out\test\chunk_5.mp3
2025-03-23 22:32:57,177 - INFO - Performing voice activity detection with settings: True


Detected language: English


100%|██████████| 178/178 [00:02<00:00, 83.66frames/s]
2025-03-23 22:33:01,118 - INFO - Transcribing out\test\chunk_6.mp3
2025-03-23 22:33:02,366 - INFO - Performing voice activity detection with settings: True


Detected language: English


100%|██████████| 143/143 [00:02<00:00, 67.67frames/s]

Cleaned Transcriptions:
Detected Language: EN
Audio File: test
Speaker 1 [0.03 - 10.00]:  Anyway, look where we're digressing the rules.  Oh, simple, Emma, you're about to face five questions  of increasing difficulty.  You must answer as quickly as possible.  If you get it correct, you move onto the next round.  Do you know what happens if you get it wrong?
Speaker 2 [11.42 - 12.82]:  and correction and embarrassment.
Speaker 1 [12.82 - 14.91]:  to indeed.  Brown. Round 1 astronomers are saying that Saturn's rings are slowly disappearing.  They estimate we only have a few hundred million years left of them.
Speaker 2 [25.58 - 27.37]:  I'll earn you a few hundred million.
Speaker 1 [27.37 - 28.97]:  But what I want to know?





In [7]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.audio_base_name = os.path.splitext(os.path.basename(self.audio_file))[0]
        self.output_dir_base = output_dir  # Base output directory
        self.output_dir = os.path.join(self.output_dir_base, self.audio_base_name) # final output dir
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None, vad: Optional[bool or str or List[Tuple[float, float]]] = None, verbose: bool = False, plot_word_alignment: bool = False, detect_disfluencies: bool = False) -> Tuple[Dict, Optional[str]]:
        """Transcribes an audio chunk and returns the transcription and detected language."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)

            if vad is not None and vad is not False:
                logging.info(f"Performing voice activity detection with settings: {vad}")
                if vad is True or vad == "silero":
                    result = whisper.transcribe(model, audio, language=language, vad="silero", verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
                elif vad == "silero:3.1":
                    result = whisper.transcribe(model, audio, language=language, vad="silero:3.1", verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
                elif vad == "auditok":
                    result = whisper.transcribe(model, audio, language=language, vad="auditok", verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
                elif isinstance(vad, list):
                    speech_segments = []
                    for start, end in vad:
                        speech_segments.append(audio[int(start * whisper.audio.SAMPLE_RATE):int(end * whisper.audio.SAMPLE_RATE)])
                    if speech_segments:
                        full_transcription = {"segments":""}
                        for segment in speech_segments:
                            segment_result = whisper.transcribe(model, segment, language=language, verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
                            full_transcription["segments"].extend(segment_result.get("segments",))
                        result = full_transcription
                else:
                    logging.warning(f"Invalid VAD setting: {vad}. Transcribing without VAD.")
                    result = whisper.transcribe(model, audio, language=language, verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
            else:
                result = whisper.transcribe(model, audio, language=language, verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)

            detected_language = result.get("language") if language is None else language
            return result, detected_language
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None, use_vad: bool = False, vad_method: Optional[str] = None, verbose: bool = False, plot_word_alignment: bool = False, detect_disfluencies: bool = False) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            vad_option = None
            if use_vad:
                vad_option = vad_method if vad_method else True # Use default if no method specified
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language, vad=vad_option, verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode: # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None, use_vad: bool = False, vad_method: Optional[str] = None, verbose: bool = False, plot_word_alignment: bool = False, detect_disfluencies: bool = False) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        vad_option = None
        if use_vad:
            vad_option = vad_method if vad_method else True
        return self.transcribe_chunk(self.audio_file, language=language, vad=vad_option, verbose=verbose, plot_word_alignment=plot_word_alignment, detect_disfluencies=detect_disfluencies)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i-1]["speaker"]:
                    cleaned.append("") # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{self.audio_base_name}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None):
        """Orchestrates the audio processing pipeline."""
        try:
            use_vad = input("Do you want to remove silent parts before transcription? (yes/no): ").lower() == "yes"
            vad_method = None
            if use_vad:
                vad_choice = input("Choose VAD method (silero, silero:3.1, auditok, or leave empty for default silero): ").lower().strip()
                if vad_choice in ["silero", "silero:3.1", "auditok"]:
                    vad_method = vad_choice
                elif vad_choice:
                    print(f"Invalid VAD method '{vad_choice}'. Using default silero.")

            verbose_input = input("Enable verbose output for Whisper? (yes/no): ").lower() == "yes"
            plot_alignment_input = input("Enable plotting word alignment (if supported by model)? (yes/no): ").lower() == "yes"
            detect_disfluencies_input = input("Enable disfluency detection (if supported by model)? (yes/no): ").lower() == "yes"

            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language, use_vad=use_vad, vad_method=vad_method, verbose=verbose_input, plot_word_alignment=plot_alignment_input, detect_disfluencies=detect_disfluencies_input)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers, min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                rename_speakers = input("\nDo you want to rename the speakers? (yes/no): ").lower()
                if rename_speakers == "yes":
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                speaker_names_path = os.path.join(self.output_dir, f"{self.audio_base_name}_speaker_names.json")
                with open(speaker_names_path, "w") as f:
                    json.dump(speaker_names, f, indent=4)
                print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language, use_vad=use_vad, vad_method=vad_method, verbose=verbose_input, plot_word_alignment=plot_alignment_input, detect_disfluencies=detect_disfluencies_input)

                transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

def process_single_audio(audio_file_path, output_dir="chunks", hf_token=None, skip_diarization=False, whisper_model="base", language=None, min_speakers=None, max_speakers=None, verbose=False, plot_word_alignment=False, detect_disfluencies=False):
    transcriber = AudioTranscriber(audio_file_path, output_dir, hf_token, skip_diarization=skip_diarization, whisper_model=whisper_model)
    transcriber.process_audio(language=language, min_speakers=min_speakers, max_speakers=max_speakers)

if __name__ == "__main__":
    output_dir_base = input("Enter the main output directory (default: output): ").strip() or "output"

    # Load HF_TOKEN if config.py exists
    hf_token = None
    try:
        from config import HF_TOKEN as config_token
        hf_token = config_token
    except ImportError:
        logging.warning(
            "config.py not found or HF_TOKEN not defined. Diarization may not work."
            " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
        )

    audio_input = input("Enter the path to an audio file or a directory containing audio files: ").strip()

    if os.path.isdir(audio_input):
        audio_files = [f for f in os.listdir(audio_input) if f.endswith(('.mp3', '.wav', '.aac', '.m4a'))]
        if not audio_files:
            print("No audio files found in the specified directory.")
        else:
            print("\nAvailable audio files:")
            for i, filename in enumerate(audio_files):
                print(f"{i + 1}. {filename}")

            while True:
                selection = input("\nChoose files to process (e.g., 'all', '1', '2,3,4', '1-3'): ").lower().strip()
                files_to_process = []

                if selection == 'all':
                    files_to_process = [os.path.join(audio_input, f) for f in audio_files]
                    break
                elif ',' in selection:
                    indices = [s.strip() for s in selection.split(',')]
                    valid_indices = True
                    selected_indices = set()
                    for index_str in indices:
                        if index_str.isdigit():
                            index = int(index_str)
                            if 1 <= index <= len(audio_files):
                                selected_indices.add(index - 1)
                            else:
                                print(f"Invalid file number: {index_str}")
                                valid_indices = False
                                break
                        else:
                            print(f"Invalid input: {index_str}")
                            valid_indices = False
                            break
                    if valid_indices:
                        files_to_process = [os.path.join(audio_input, audio_files[i]) for i in sorted(list(selected_indices))]
                        break
                elif '-' in selection:
                    try:
                        start_str, end_str = selection.split('-')
                        start_index = int(start_str.strip())
                        end_index = int(end_str.strip())
                        if 1 <= start_index <= len(audio_files) and 1 <= end_index <= len(audio_files) and start_index <= end_index:
                            files_to_process = [os.path.join(audio_input, audio_files[i]) for i in range(start_index - 1, end_index)]
                            break
                        else:
                            print("Invalid range of file numbers.")
                    except ValueError:
                        print("Invalid range format.")
                elif selection.isdigit():
                    index = int(selection)
                    if 1 <= index <= len(audio_files):
                        files_to_process = [os.path.join(audio_input, audio_files[index - 1])]
                        break
                    else:
                        print("Invalid file number.")
                else:
                    print("Invalid selection format. Please use 'all', a single number, comma-separated numbers, or a range (e.g., '1-3').")

            for audio_file_path in files_to_process:
                print(f"\n--- Processing: {audio_file_path} ---")
                # Ask for processing options for each file
                skip_diarization_input = input("Skip speaker diarization for this file? (yes/no): ").lower()
                skip_diarization = skip_diarization_input == "yes"

                whisper_model_choice = "base"
                min_speakers = None #set default values
                max_speakers = None
                if not skip_diarization:
                    specify_speakers = input("Specify min/max speakers for this file? (yes/no): ").lower()

                    if specify_speakers == "yes":
                        try:
                            min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                            max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                        except ValueError:
                            print("Invalid input for the number of speakers.")

                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")
                else:
                    print("\nAvailable Whisper models: tiny, base, small, medium, large")
                    chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
                    if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                        whisper_model_choice = chosen_model
                    elif chosen_model:
                        print(f"Invalid model '{chosen_model}'. Using default 'base'.")

                specify_language = input("Specify a language for this file? (yes/no): ").lower()
                transcription_language = None
                if specify_language == "yes":
                    transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

                process_single_audio(audio_file_path, os.path.join(output_dir_base, "chunks"), hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)

    elif os.path.isfile(audio_input):
        audio_file = audio_input
        # Ask for processing options for the single file
        skip_diarization_input = input("Skip speaker diarization? (yes/no): ").lower()
        skip_diarization = skip_diarization_input == "yes"

        whisper_model_choice = "base"
        min_speakers = None #set default values
        max_speakers = None
        if not skip_diarization:
            specify_speakers = input("Specify min/max speakers? (yes/no): ").lower()
            if specify_speakers == "yes":
                try:
                    min_speakers = int(input("Enter the minimum number of speakers (optional): ") or None)
                    max_speakers = int(input("Enter the maximum number of speakers (optional): ") or None)
                except ValueError:
                    print("Invalid input for the number of speakers.")

            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")
        else:
            print("\nAvailable Whisper models: tiny, base, small, medium, large")
            chosen_model = input("Choose a Whisper model (default: base): ").lower().strip()
            if chosen_model in ["tiny", "base", "small", "medium", "large"]:
                whisper_model_choice = chosen_model
            elif chosen_model:
                print(f"Invalid model '{chosen_model}'. Using default 'base'.")

        specify_language = input("Specify a language for this file? (yes/no): ").lower()
        transcription_language = None
        if specify_language == "yes":
            transcription_language = input("Enter the language code (e.g., en, fr, es): ").strip()

        process_single_audio(audio_file, os.path.join(output_dir_base, "chunks"), hf_token, skip_diarization, whisper_model_choice, transcription_language, min_speakers, max_speakers)



Available audio files:
1. 11 lug, 11.20​ prova 2.aac
2. 4547.mp3
3. 6313.mp3
4. Botanicario - Ribes Nero.wav
5. Come STUDIARE allUNIVERSITÀ.mp3
6. Come STUDIARE allUNIVERSITÀ.wav
7. test.mp3

--- Processing: C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3 ---

Available Whisper models: tiny, base, small, medium, large


2025-03-24 20:53:19,705 - INFO - Transcribing the entire audio file: C:\Users\Admin\Documents\Coding\Transcriptor\audio\test.mp3


Skipping diarization and transcribing the whole audio file.
Detected language: English


100%|██████████| 2880/2880 [00:07<00:00, 376.79frames/s]


Transcription:
Detected Language: EN

Audio File: test

[0.00 - 1.62]  Anyway, look, we're digressing the rules.
[1.74 - 4.34]  Oh, simple, Emma, you're about to face five questions.
[4.36 - 5.34]  It's been increasing difficulty.
[5.34 - 6.60]  You must answer as quickly as possible.
[6.66 - 8.34]  If you get it correct, you move onto the next round.
[8.62 - 9.78]  Do you know what happens if you get it wrong?
[11.44 - 12.42]  I've got an embarrassment.
[12.78 - 13.31]  You do indeed.
[13.31 - 14.78]  Round one.
[16.98 - 19.90]  Round one, astronomers are saying that Saturn's rings
[20.02 - 21.44]  are slowly disappearing.
[21.48 - 25.62]  They estimate we only have a few hundred million years left of them.
[25.64 - 27.10]  I'll only have a few hundred million.
[27.46 - 28.44]  But what I want to know?





In [9]:
import logging
import torch
import torchaudio
from pyannote.audio import Pipeline
from pyannote.core import Annotation
from typing import List, Dict, Tuple, Optional
import os
import json
from pydub import AudioSegment
import whisper_timestamped as whisper
import argparse

class AudioTranscriber:
    def __init__(self, audio_file: str, output_dir: str = "chunks", hf_token: Optional[str] = None, skip_diarization: bool = False, whisper_model: str = "base"):
        self.audio_file = audio_file
        self.audio_base_name = os.path.splitext(os.path.basename(self.audio_file))[0]
        self.output_dir_base = output_dir  # Base output directory
        self.output_dir = os.path.join(self.output_dir_base, self.audio_base_name)  # final output dir
        self.hf_token = hf_token
        self.whisper_model = whisper_model
        self.debug_mode = False
        self.skip_diarization = skip_diarization
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)

    def load_audio(self, audio_file_path: str) -> Tuple[torch.Tensor, int]:
        """Loads an audio file using torchaudio."""
        try:
            waveform, sample_rate = torchaudio.load(audio_file_path)
            return waveform, sample_rate
        except Exception as e:
            logging.error(f"Error loading audio file: {e}")
            raise

    def run_diarization(
        self,
        max_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
    ) -> Tuple[Pipeline, Annotation, Dict[str, str]]:
        """Performs speaker diarization."""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logging.info(f"Using device: {device}")
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1", use_auth_token=self.hf_token
        ).to(device)
        waveform, sample_rate = self.load_audio(self.audio_file)
        input_data = {"waveform": waveform, "sample_rate": sample_rate}
        if max_speakers:
            input_data["max_speakers"] = max_speakers
        if min_speakers:
            input_data["min_speakers"] = min_speakers
        diarization = pipeline(input_data)
        speaker_labels = set()
        for segment, _, label in diarization.itertracks(yield_label=True):
            speaker_labels.add(label)
        speaker_names = {
            label: f"Speaker {i + 1}" for i, label in enumerate(sorted(speaker_labels))
        }
        return pipeline, diarization, speaker_names

    def chunk_audio(self, diarization: Annotation) -> List[Dict]:
        """Chunks audio based on diarization."""
        audio = AudioSegment.from_file(self.audio_file)
        chunks = []
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True), 1):
            start_ms, end_ms = int(turn.start * 1000), int(turn.end * 1000)
            chunk_path = os.path.join(self.output_dir, f"chunk_{i}.mp3")
            audio[start_ms:end_ms].export(chunk_path, format="mp3")
            chunks.append({"file_path": chunk_path, "speaker": speaker, "start_time": turn.start, "end_time": turn.end})
        return chunks

    def transcribe_chunk(self, audio_file_path: str, language: Optional[str] = None,
                         vad: Optional[bool or str or List[Tuple[float, float]]] = None, verbose: bool = False,
                         plot_word_alignment: bool = False,
                         detect_disfluencies: bool = False) -> Tuple[Dict, Optional[str]]:
        """Transcribes an audio chunk and returns the transcription and detected language."""
        try:
            model = whisper.load_model(self.whisper_model)
            audio = whisper.load_audio(audio_file_path)

            if vad is not None and vad is not False:
                logging.info(f"Performing voice activity detection with settings: {vad}")
                if vad is True or vad == "silero":
                    result = whisper.transcribe(model, audio, language=language, vad="silero", verbose=verbose,
                                                plot_word_alignment=plot_word_alignment,
                                                detect_disfluencies=detect_disfluencies)
                elif vad == "silero:3.1":
                    result = whisper.transcribe(model, audio, language=language, vad="silero:3.1", verbose=verbose,
                                                plot_word_alignment=plot_word_alignment,
                                                detect_disfluencies=detect_disfluencies)
                elif vad == "auditok":
                    result = whisper.transcribe(model, audio, language=language, vad="auditok", verbose=verbose,
                                                plot_word_alignment=plot_word_alignment,
                                                detect_disfluencies=detect_disfluencies)
                elif isinstance(vad, list):
                    speech_segments = []
                    for start, end in vad:
                        speech_segments.append(
                            audio[int(start * whisper.audio.SAMPLE_RATE):int(end * whisper.audio.SAMPLE_RATE)])
                    if speech_segments:
                        full_transcription = {"segments": ""}
                        for segment in speech_segments:
                            segment_result = whisper.transcribe(model, segment, language=language, verbose=verbose,
                                                                plot_word_alignment=plot_word_alignment,
                                                                detect_disfluencies=detect_disfluencies)
                            full_transcription["segments"].extend(segment_result.get("segments",))
                        result = full_transcription
                else:
                    logging.warning(f"Invalid VAD setting: {vad}. Transcribing without VAD.")
                    result = whisper.transcribe(model, audio, language=language, verbose=verbose,
                                                plot_word_alignment=plot_word_alignment,
                                                detect_disfluencies=detect_disfluencies)
            else:
                result = whisper.transcribe(model, audio, language=language, verbose=verbose,
                                            plot_word_alignment=plot_word_alignment,
                                            detect_disfluencies=detect_disfluencies)

            detected_language = result.get("language") if language is None else language
            return result, detected_language
        except Exception as e:
            logging.error(f"Transcription error: {e}")
            return {}, None

    def process_and_transcribe_chunks(self, chunks: List[Dict], language: Optional[str] = None, use_vad: bool = False,
                                      vad_method: Optional[str] = None, verbose: bool = False,
                                      plot_word_alignment: bool = False,
                                      detect_disfluencies: bool = False) -> List[Dict]:
        """Processes and saves transcriptions for individual chunks."""
        transcriptions = []
        detected_language = None
        for chunk in chunks:
            logging.info(f"Transcribing {chunk['file_path']}")
            vad_option = None
            if use_vad:
                vad_option = vad_method if vad_method else True  # Use default if no method specified
            transcription, lang = self.transcribe_chunk(chunk["file_path"], language=language, vad=vad_option,
                                                        verbose=verbose, plot_word_alignment=plot_word_alignment,
                                                        detect_disfluencies=detect_disfluencies)
            if transcription:
                if detected_language is None and lang is not None:
                    detected_language = lang
                transcriptions.append({**chunk, "transcription": transcription, "language": lang})
                if self.debug_mode:  # only print if debug mode is on
                    print(f"Transcription for {chunk['file_path']} (Language: {lang}):")
                    for segment in transcription["segments"]:
                        print(f"    [{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")

        transcriptions_json_path = os.path.join(self.output_dir, f"{self.audio_base_name}_transcriptions.json")
        with open(transcriptions_json_path, "w") as f:
            json.dump(transcriptions, f, indent=4)
        return transcriptions, detected_language

    def transcribe_whole_audio(self, language: Optional[str] = None, use_vad: bool = False,
                                vad_method: Optional[str] = None, verbose: bool = False,
                                plot_word_alignment: bool = False,
                                detect_disfluencies: bool = False) -> Tuple[Dict, Optional[str]]:
        """Transcribes the entire audio file without diarization."""
        logging.info(f"Transcribing the entire audio file: {self.audio_file}")
        vad_option = None
        if use_vad:
            vad_option = vad_method if vad_method else True
        return self.transcribe_chunk(self.audio_file, language=language, vad=vad_option, verbose=verbose,
                                    plot_word_alignment=plot_word_alignment,
                                    detect_disfluencies=detect_disfluencies)

    def clean_transcription(self, transcriptions_json: str, speaker_names: Dict[str, str]) -> List[str]:
        """Cleans the transcription JSON to a readable format (for diarized audio)."""
        with open(transcriptions_json, 'r') as f:
            transcriptions = json.load(f)

        cleaned = []
        detected_language = transcriptions[0].get("language") if transcriptions else None
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")

        current_speaker, current_text, current_start, current_end = None, "", None, None
        for i, chunk in enumerate(transcriptions):
            if current_speaker != chunk["speaker"]:
                if current_speaker is not None:
                    cleaned.append(
                        f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
                current_speaker, current_text = chunk["speaker"], ""
                current_start, current_end = chunk["start_time"], chunk["end_time"]
                if i > 0 and transcriptions[i]["speaker"] != transcriptions[i - 1]["speaker"]:
                    cleaned.append("")  # Add a blank line before a new speaker (after the first)
            if chunk["transcription"] and chunk["transcription"]["segments"]:
                current_text += " ".join(seg["text"] for seg in chunk["transcription"]["segments"])
        if current_speaker:
            cleaned.append(
                f"{speaker_names.get(current_speaker, current_speaker)} [{current_start:.2f} - {current_end:.2f}]: {current_text}")
        return [line for line in cleaned if line.strip() != ""]

    def clean_whole_transcription(self, whole_transcription: Dict, language: Optional[str] = None) -> List[str]:
        """Cleans the whole transcription output to a readable format (without diarization)."""
        cleaned = []
        detected_language = whole_transcription.get("language") if whole_transcription else language
        if detected_language:
            cleaned.append(f"Detected Language: {detected_language.upper()}")
            cleaned.append("")
        cleaned.append(f"Audio File: {self.audio_base_name}")
        cleaned.append("")
        if whole_transcription and whole_transcription.get("segments"):
            for segment in whole_transcription["segments"]:
                cleaned.append(f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}")
        return cleaned

    def save_transcription_to_file(self, cleaned_transcriptions: List[str], filename="transcription.txt"):
        """Saves the cleaned transcription to a text file."""
        output_file = os.path.join(self.output_dir, f"{self.audio_base_name}_{filename}")
        with open(output_file, "w") as f:
            f.write("\n".join(cleaned_transcriptions))

    def process_audio(self, language: Optional[str] = None, min_speakers: Optional[int] = None,
                      max_speakers: Optional[int] = None, use_vad: bool = False,
                      vad_method: Optional[str] = None, verbose: bool = False,
                      plot_word_alignment: bool = False,
                      detect_disfluencies: bool = False, no_rename_speakers: bool = False):
        """Orchestrates the audio processing pipeline."""
        try:
            if self.skip_diarization:
                print("Skipping diarization and transcribing the whole audio file.")
                whole_transcription, detected_language = self.transcribe_whole_audio(language=language, use_vad=use_vad,
                                                                                    vad_method=vad_method, verbose=verbose,
                                                                                    plot_word_alignment=plot_word_alignment,
                                                                                    detect_disfluencies=detect_disfluencies)
                if whole_transcription:
                    cleaned_transcriptions = self.clean_whole_transcription(whole_transcription, detected_language)
                    print("\nTranscription:")
                    for line in cleaned_transcriptions:
                        print(line)
                    self.save_transcription_to_file(cleaned_transcriptions, filename="whole_transcription.txt")
                else:
                    print("Transcription failed.")
            else:
                pipeline, diarization_result, speaker_names = self.run_diarization(max_speakers=max_speakers,
                                                                                    min_speakers=min_speakers)
                chunk_info_list = self.chunk_audio(diarization_result)
                print("Audio file diarized and chunked.")

                num_speakers = len(speaker_names)
                print(f"\nDetected {num_speakers} speakers:")
                for i, (turn, _, speaker) in enumerate(diarization_result.itertracks(yield_label=True), 1):
                    print(f"Chunk {i}: Speaker '{speaker}' [{turn.start:.2f} - {turn.end:.2f}]")

                # Speaker renaming moved here
                if not no_rename_speakers:
                    new_speaker_names = {}
                    for label in sorted(speaker_names.keys()):
                        new_name = input(
                            f"Enter a new name for '{speaker_names[label]}' (default: {speaker_names[label]}): ").strip()
                        if new_name:
                            new_speaker_names[label] = new_name
                        else:
                            new_speaker_names[label] = speaker_names[label]
                    speaker_names.update(new_speaker_names)

                    speaker_names_path = os.path.join(self.output_dir, f"{self.audio_base_name}_speaker_names.json")
                    with open(speaker_names_path, "w") as f:
                        json.dump(speaker_names, f, indent=4)
                    print(f"Speaker names saved to {speaker_names_path}")

                transcriptions, detected_language = self.process_and_transcribe_chunks(chunk_info_list, language=language,
                                                                                        use_vad=use_vad,
                                                                                        vad_method=vad_method, verbose=verbose,
                                                                                        plot_word_alignment=plot_word_alignment,
                                                                                        detect_disfluencies=detect_disfluencies)

                transcriptions_json_path = os.path.join(self.output_dir,
                                                        f"{self.audio_base_name}_transcriptions.json")
                cleaned_transcriptions = self.clean_transcription(transcriptions_json_path, speaker_names)
                print("Cleaned Transcriptions:")
                for paragraph in cleaned_transcriptions:
                    print(paragraph)

                self.save_transcription_to_file(cleaned_transcriptions, filename="diarized_transcription.txt")

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            print(f"An error occurred: {e}")

def process_single_audio(audio_file_path, output_dir="chunks", hf_token=None, skip_diarization=False,
                       whisper_model="base", language=None, min_speakers=None, max_speakers=None,
                       use_vad=False, vad_method=None, verbose=False, plot_word_alignment=False,
                       detect_disfluencies=False, no_rename_speakers=False):
    transcriber = AudioTranscriber(audio_file_path, output_dir, hf_token, skip_diarization=skip_diarization,
                                    whisper_model=whisper_model)
    transcriber.process_audio(language=language, min_speakers=min_speakers, max_speakers=max_speakers,
                                use_vad=use_vad, vad_method=vad_method, verbose=verbose,
                                plot_word_alignment=plot_word_alignment,
                                detect_disfluencies=detect_disfluencies,
                                no_rename_speakers=no_rename_speakers)

def main():
    parser = argparse.ArgumentParser(description="Transcribe audio files with speaker diarization and/or VAD.")
    parser.add_argument("audio_input",
                        help="Path to an audio file or a directory containing audio files.  If a directory is provided, the script will process all audio files in that directory.")
    parser.add_argument("--output_dir", default="output",
                        help="The main directory where all output files will be saved.  Defaults to 'output'.  If the input is a directory, a subdirectory with the name of each audio file will be created inside this directory.")
    parser.add_argument("--hf_token", default=None,
                        help="Hugging Face API token.  Required for speaker diarization.  If not provided, the script will attempt to read it from a config.py file.")
    parser.add_argument("--skip_diarization", action="store_true",
                        help="Skip speaker diarization and transcribe the entire audio file as a single speaker.")
    parser.add_argument("--whisper_model", default="base", choices=["tiny", "base", "small", "medium", "large"],
                        help="Choose a Whisper model size.  Defaults to 'base'.")
    parser.add_argument("--language", default=None,
                        help="Specify the language of the audio file (e.g., 'en', 'fr', 'es').  If not provided, Whisper will attempt to detect the language.")
    parser.add_argument("--min_speakers", type=int, default=None,
                        help="Minimum number of speakers expected in the audio. Used for diarization.")
    parser.add_argument("--max_speakers", type=int, default=None,
                        help="Maximum number of speakers expected in the audio. Used for diarization.")
    parser.add_argument("--use_vad", action="store_true",
                        help="Enable voice activity detection to remove silent parts before transcription.")
    parser.add_argument("--vad_method", default=None, choices=["silero", "silero:3.1", "auditok"],
                        help="Choose a VAD method: 'silero',  'silero:3.1', or 'auditok'.  If --use_vad is set and this is not provided, 'silero' is used as default.")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable verbose output for Whisper.")
    parser.add_argument("--plot_word_alignment", action="store_true",
                        help="Enable plotting word alignment (if supported by the model).")
    parser.add_argument("--detect_disfluencies", action="store_true",
                        help="Enable disfluency detection (if supported by the model).")
    parser.add_argument("--no_rename_speakers", action="store_true",
                        help="Disable the interactive prompt to rename speakers.")

    args = parser.parse_args()

    output_dir_base = args.output_dir

    # Load HF_TOKEN
    hf_token = args.hf_token
    if hf_token is None:
        try:
            from config import HF_TOKEN as config_token
            hf_token = config_token
        except ImportError:
            logging.warning(
                "config.py not found or HF_TOKEN not defined. Diarization may not work."
                " Please create a config.py file with HF_TOKEN = 'YOUR_HUGGINGFACE_TOKEN'"
            )

    audio_input = args.audio_input

    if os.path.isdir(audio_input):
        audio_files = [f for f in os.listdir(audio_input) if f.endswith(('.mp3', '.wav', '.aac', '.m4a'))]
        if not audio_files:
            print("No audio files found in the specified directory.")
        else:
            print("\nAvailable audio files:")
            for i, filename in enumerate(audio_files):
                print(f"{i + 1}. {filename}")
            files_to_process = [os.path.join(audio_input, f) for f in audio_files]

            for audio_file_path in files_to_process:
                print(f"\n--- Processing: {audio_file_path} ---")
                process_single_audio(audio_file_path, os.path.join(output_dir_base, "chunks"), hf_token,
                                     args.skip_diarization, args.whisper_model, args.language,
                                     args.min_speakers, args.max_speakers, args.use_vad,
                                     args.vad_method, args.verbose, args.plot_word_alignment,
                                     args.detect_disfluencies, args.no_rename_speakers)

    elif os.path.isfile(audio_input):
        audio_file = audio_input
        process_single_audio(audio_file, os.path.join(output_dir_base, "chunks"), hf_token,
                             args.skip_diarization, args.whisper_model, args.language,
                             args.min_speakers, args.max_speakers, args.use_vad,
                             args.vad_method, args.verbose, args.plot_word_alignment,
                             args.detect_disfluencies, args.no_rename_speakers)
    else:
        print("Invalid input path.  Please provide a valid audio file or directory.")
if __name__ == "__main__":
    main()



usage: ipykernel_launcher.py [-h] [--output_dir OUTPUT_DIR]
                             [--hf_token HF_TOKEN] [--skip_diarization]
                             [--whisper_model {tiny,base,small,medium,large}]
                             [--language LANGUAGE]
                             [--min_speakers MIN_SPEAKERS]
                             [--max_speakers MAX_SPEAKERS] [--use_vad]
                             [--vad_method {silero,silero:3.1,auditok}]
                             [--verbose] [--plot_word_alignment]
                             [--detect_disfluencies] [--no_rename_speakers]
                             audio_input
ipykernel_launcher.py: error: the following arguments are required: audio_input


SystemExit: 2