In [1]:
import os
import torch
import librosa
import soundfile as sf
import warnings
import numpy
import utils
warnings.filterwarnings("ignore", category=FutureWarning, module="whisper")
import faster_whisper

# Helper Functions

In [3]:
punct_model_langs = [
    "en",
    "fr",
    "de",
    "es",
    "it",
    "nl",
    "pt",
    "bg",
    "pl",
    "cs",
    "sk",
    "sl",
]

LANGUAGES = {
    "en": "english",
    "zh": "chinese",
    "de": "german",
    "es": "spanish",
    "ru": "russian",
    "ko": "korean",
    "fr": "french",
    "ja": "japanese",
    "pt": "portuguese",
    "tr": "turkish",
    "pl": "polish",
    "ca": "catalan",
    "nl": "dutch",
    "ar": "arabic",
    "sv": "swedish",
    "it": "italian",
    "id": "indonesian",
    "hi": "hindi",
    "fi": "finnish",
    "vi": "vietnamese",
    "he": "hebrew",
    "uk": "ukrainian",
    "el": "greek",
    "ms": "malay",
    "cs": "czech",
    "ro": "romanian",
    "da": "danish",
    "hu": "hungarian",
    "ta": "tamil",
    "no": "norwegian",
    "th": "thai",
    "ur": "urdu",
    "hr": "croatian",
    "bg": "bulgarian",
    "lt": "lithuanian",
    "la": "latin",
    "mi": "maori",
    "ml": "malayalam",
    "cy": "welsh",
    "sk": "slovak",
    "te": "telugu",
    "fa": "persian",
    "lv": "latvian",
    "bn": "bengali",
    "sr": "serbian",
    "az": "azerbaijani",
    "sl": "slovenian",
    "kn": "kannada",
    "et": "estonian",
    "mk": "macedonian",
    "br": "breton",
    "eu": "basque",
    "is": "icelandic",
    "hy": "armenian",
    "ne": "nepali",
    "mn": "mongolian",
    "bs": "bosnian",
    "kk": "kazakh",
    "sq": "albanian",
    "sw": "swahili",
    "gl": "galician",
    "mr": "marathi",
    "pa": "punjabi",
    "si": "sinhala",
    "km": "khmer",
    "sn": "shona",
    "yo": "yoruba",
    "so": "somali",
    "af": "afrikaans",
    "oc": "occitan",
    "ka": "georgian",
    "be": "belarusian",
    "tg": "tajik",
    "sd": "sindhi",
    "gu": "gujarati",
    "am": "amharic",
    "yi": "yiddish",
    "lo": "lao",
    "uz": "uzbek",
    "fo": "faroese",
    "ht": "haitian creole",
    "ps": "pashto",
    "tk": "turkmen",
    "nn": "nynorsk",
    "mt": "maltese",
    "sa": "sanskrit",
    "lb": "luxembourgish",
    "my": "myanmar",
    "bo": "tibetan",
    "tl": "tagalog",
    "mg": "malagasy",
    "as": "assamese",
    "tt": "tatar",
    "haw": "hawaiian",
    "ln": "lingala",
    "ha": "hausa",
    "ba": "bashkir",
    "jw": "javanese",
    "su": "sundanese",
    "yue": "cantonese",
}

# language code lookup by name, with a few language aliases
TO_LANGUAGE_CODE = {
    **{language: code for code, language in LANGUAGES.items()},
    "burmese": "my",
    "valencian": "ca",
    "flemish": "nl",
    "haitian": "ht",
    "letzeburgesch": "lb",
    "pushto": "ps",
    "panjabi": "pa",
    "moldavian": "ro",
    "moldovan": "ro",
    "sinhalese": "si",
    "castilian": "es",
}


langs_to_iso = {
    "af": "afr",
    "am": "amh",
    "ar": "ara",
    "as": "asm",
    "az": "aze",
    "ba": "bak",
    "be": "bel",
    "bg": "bul",
    "bn": "ben",
    "bo": "tib",
    "br": "bre",
    "bs": "bos",
    "ca": "cat",
    "cs": "cze",
    "cy": "wel",
    "da": "dan",
    "de": "ger",
    "el": "gre",
    "en": "eng",
    "es": "spa",
    "et": "est",
    "eu": "baq",
    "fa": "per",
    "fi": "fin",
    "fo": "fao",
    "fr": "fre",
    "gl": "glg",
    "gu": "guj",
    "ha": "hau",
    "haw": "haw",
    "he": "heb",
    "hi": "hin",
    "hr": "hrv",
    "ht": "hat",
    "hu": "hun",
    "hy": "arm",
    "id": "ind",
    "is": "ice",
    "it": "ita",
    "ja": "jpn",
    "jw": "jav",
    "ka": "geo",
    "kk": "kaz",
    "km": "khm",
    "kn": "kan",
    "ko": "kor",
    "la": "lat",
    "lb": "ltz",
    "ln": "lin",
    "lo": "lao",
    "lt": "lit",
    "lv": "lav",
    "mg": "mlg",
    "mi": "mao",
    "mk": "mac",
    "ml": "mal",
    "mn": "mon",
    "mr": "mar",
    "ms": "may",
    "mt": "mlt",
    "my": "bur",
    "ne": "nep",
    "nl": "dut",
    "nn": "nno",
    "no": "nor",
    "oc": "oci",
    "pa": "pan",
    "pl": "pol",
    "ps": "pus",
    "pt": "por",
    "ro": "rum",
    "ru": "rus",
    "sa": "san",
    "sd": "snd",
    "si": "sin",
    "sk": "slo",
    "sl": "slv",
    "sn": "sna",
    "so": "som",
    "sq": "alb",
    "sr": "srp",
    "su": "sun",
    "sv": "swe",
    "sw": "swa",
    "ta": "tam",
    "te": "tel",
    "tg": "tgk",
    "th": "tha",
    "tk": "tuk",
    "tl": "tgl",
    "tr": "tur",
    "tt": "tat",
    "uk": "ukr",
    "ur": "urd",
    "uz": "uzb",
    "vi": "vie",
    "yi": "yid",
    "yo": "yor",
    "yue": "yue",
    "zh": "chi",
}


whisper_langs = sorted(LANGUAGES.keys()) + sorted(
    [k.title() for k in TO_LANGUAGE_CODE.keys()]
)


def create_config(output_dir):
    DOMAIN_TYPE = "telephonic"  # Can be meeting, telephonic, or general based on domain type of the audio file
    CONFIG_FILE_NAME = f"diar_infer_{DOMAIN_TYPE}.yaml"
    CONFIG_URL = f"https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/{CONFIG_FILE_NAME}"
    MODEL_CONFIG = os.path.join(output_dir, CONFIG_FILE_NAME)
    if not os.path.exists(MODEL_CONFIG):
        MODEL_CONFIG = wget.download(CONFIG_URL, output_dir)

    config = OmegaConf.load(MODEL_CONFIG)

    data_dir = os.path.join(output_dir, "data")
    os.makedirs(data_dir, exist_ok=True)

    meta = {
        "audio_filepath": os.path.join(output_dir, "mono_file.wav"),
        "offset": 0,
        "duration": None,
        "label": "infer",
        "text": "-",
        "rttm_filepath": None,
        "uem_filepath": None,
    }
    with open(os.path.join(data_dir, "input_manifest.json"), "w") as fp:
        json.dump(meta, fp)
        fp.write("\n")

    pretrained_vad = "vad_multilingual_marblenet"
    pretrained_speaker_model = "titanet_large"
    config.num_workers = 0  # Workaround for multiprocessing hanging with ipython issue
    config.diarizer.manifest_filepath = os.path.join(data_dir, "input_manifest.json")
    config.diarizer.out_dir = (
        output_dir  # Directory to store intermediate files and prediction outputs
    )

    config.diarizer.speaker_embeddings.model_path = pretrained_speaker_model
    config.diarizer.oracle_vad = (
        False  # compute VAD provided with model_path to vad config
    )
    config.diarizer.clustering.parameters.oracle_num_speakers = False

    # Here, we use our in-house pretrained NeMo VAD model
    config.diarizer.vad.model_path = pretrained_vad
    config.diarizer.vad.parameters.onset = 0.8
    config.diarizer.vad.parameters.offset = 0.6
    config.diarizer.vad.parameters.pad_offset = -0.05
    config.diarizer.msdd_model.model_path = (
        "diar_msdd_telephonic"  # Telephonic speaker diarization model
    )

    return config


def get_word_ts_anchor(s, e, option="start"):
    if option == "end":
        return e
    elif option == "mid":
        return (s + e) / 2
    return s

def get_audio_extraction(input_file, output_file, start_min, end_min):
    audio = AudioSegment.from_file(input_file, format="wav")
    start_min *= 60000
    end_min *= 60000
    ten_minutes = audio[start_min:end_min]  # Pydub uses milliseconds
    ten_minutes.export(output_file, format="wav")


def get_words_speaker_mapping(wrd_ts, spk_ts, word_anchor_option="start"):
    s, e, sp = spk_ts[0]
    wrd_pos, turn_idx = 0, 0
    wrd_spk_mapping = []
    for wrd_dict in wrd_ts:
        ws, we, wrd = (
            int(wrd_dict["start"] * 1000),
            int(wrd_dict["end"] * 1000),
            wrd_dict["text"],
        )
        wrd_pos = get_word_ts_anchor(ws, we, word_anchor_option)
        while wrd_pos > float(e):
            turn_idx += 1
            turn_idx = min(turn_idx, len(spk_ts) - 1)
            s, e, sp = spk_ts[turn_idx]
            if turn_idx == len(spk_ts) - 1:
                e = get_word_ts_anchor(ws, we, option="end")
        wrd_spk_mapping.append(
            {"word": wrd, "start_time": ws, "end_time": we, "speaker": sp}
        )
    return wrd_spk_mapping


sentence_ending_punctuations = ".?!"


def get_first_word_idx_of_sentence(word_idx, word_list, speaker_list, max_words):
    is_word_sentence_end = (
        lambda x: x >= 0 and word_list[x][-1] in sentence_ending_punctuations
    )
    left_idx = word_idx
    while (
        left_idx > 0
        and word_idx - left_idx < max_words
        and speaker_list[left_idx - 1] == speaker_list[left_idx]
        and not is_word_sentence_end(left_idx - 1)
    ):
        left_idx -= 1

    return left_idx if left_idx == 0 or is_word_sentence_end(left_idx - 1) else -1


def get_last_word_idx_of_sentence(word_idx, word_list, max_words):
    is_word_sentence_end = (
        lambda x: x >= 0 and word_list[x][-1] in sentence_ending_punctuations
    )
    right_idx = word_idx
    while (
        right_idx < len(word_list) - 1
        and right_idx - word_idx < max_words
        and not is_word_sentence_end(right_idx)
    ):
        right_idx += 1

    return (
        right_idx
        if right_idx == len(word_list) - 1 or is_word_sentence_end(right_idx)
        else -1
    )


def get_realigned_ws_mapping_with_punctuation(
    word_speaker_mapping, max_words_in_sentence=50
):
    is_word_sentence_end = (
        lambda x: x >= 0
        and word_speaker_mapping[x]["word"][-1] in sentence_ending_punctuations
    )
    wsp_len = len(word_speaker_mapping)

    words_list, speaker_list = [], []
    for k, line_dict in enumerate(word_speaker_mapping):
        word, speaker = line_dict["word"], line_dict["speaker"]
        words_list.append(word)
        speaker_list.append(speaker)

    k = 0
    while k < len(word_speaker_mapping):
        line_dict = word_speaker_mapping[k]
        if (
            k < wsp_len - 1
            and speaker_list[k] != speaker_list[k + 1]
            and not is_word_sentence_end(k)
        ):
            left_idx = get_first_word_idx_of_sentence(
                k, words_list, speaker_list, max_words_in_sentence
            )
            right_idx = (
                get_last_word_idx_of_sentence(
                    k, words_list, max_words_in_sentence - k + left_idx - 1
                )
                if left_idx > -1
                else -1
            )
            if min(left_idx, right_idx) == -1:
                k += 1
                continue

            spk_labels = speaker_list[left_idx : right_idx + 1]
            mod_speaker = max(set(spk_labels), key=spk_labels.count)
            if spk_labels.count(mod_speaker) < len(spk_labels) // 2:
                k += 1
                continue

            speaker_list[left_idx : right_idx + 1] = [mod_speaker] * (
                right_idx - left_idx + 1
            )
            k = right_idx

        k += 1

    k, realigned_list = 0, []
    while k < len(word_speaker_mapping):
        line_dict = word_speaker_mapping[k].copy()
        line_dict["speaker"] = speaker_list[k]
        realigned_list.append(line_dict)
        k += 1

    return realigned_list


def get_sentences_speaker_mapping(word_speaker_mapping, spk_ts):
    sentence_checker = nltk.tokenize.PunktSentenceTokenizer().text_contains_sentbreak
    s, e, spk = spk_ts[0]
    prev_spk = spk

    snts = []
    snt = {"speaker": f"Speaker {spk}", "start_time": s, "end_time": e, "text": ""}

    for wrd_dict in word_speaker_mapping:
        wrd, spk = wrd_dict["word"], wrd_dict["speaker"]
        s, e = wrd_dict["start_time"], wrd_dict["end_time"]
        if spk != prev_spk or sentence_checker(snt["text"] + " " + wrd):
            snts.append(snt)
            snt = {
                "speaker": f"Speaker {spk}",
                "start_time": s,
                "end_time": e,
                "text": "",
            }
        else:
            snt["end_time"] = e
        snt["text"] += wrd + " "
        prev_spk = spk

    snts.append(snt)
    return snts


def get_speaker_aware_transcript(sentences_speaker_mapping, f):
    previous_speaker = sentences_speaker_mapping[0]["speaker"]
    f.write(f"{previous_speaker}: ")

    for sentence_dict in sentences_speaker_mapping:
        speaker = sentence_dict["speaker"]
        sentence = sentence_dict["text"]

        # If this speaker doesn't match the previous one, start a new paragraph
        if speaker != previous_speaker:
            f.write(f"\n\n{speaker}: ")
            previous_speaker = speaker

        # No matter what, write the current sentence
        f.write(sentence + " ")


def format_timestamp(
    milliseconds: float, always_include_hours: bool = False, decimal_marker: str = "."
):
    assert milliseconds >= 0, "non-negative timestamp expected"

    hours = milliseconds // 3_600_000
    milliseconds -= hours * 3_600_000

    minutes = milliseconds // 60_000
    milliseconds -= minutes * 60_000

    seconds = milliseconds // 1_000
    milliseconds -= seconds * 1_000

    hours_marker = f"{hours:02d}:" if always_include_hours or hours > 0 else ""
    return (
        f"{hours_marker}{minutes:02d}:{seconds:02d}{decimal_marker}{milliseconds:03d}"
    )


def write_srt(transcript, file):
    """
    Write a transcript to a file in SRT format.

    """
    for i, segment in enumerate(transcript, start=1):
        # write srt lines
        print(
            f"{i}\n"
            f"{format_timestamp(segment['start_time'], always_include_hours=True, decimal_marker=',')} --> "
            f"{format_timestamp(segment['end_time'], always_include_hours=True, decimal_marker=',')}\n"
            f"{segment['speaker']}: {segment['text'].strip().replace('-->', '->')}\n",
            file=file,
            flush=True,
        )


def find_numeral_symbol_tokens(tokenizer):
    numeral_symbol_tokens = [
        -1,
    ]
    for token, token_id in tokenizer.get_vocab().items():
        has_numeral_symbol = any(c in "0123456789%$£" for c in token)
        if has_numeral_symbol:
            numeral_symbol_tokens.append(token_id)
    return numeral_symbol_tokens


def _get_next_start_timestamp(word_timestamps, current_word_index, final_timestamp):
    # if current word is the last word
    if current_word_index == len(word_timestamps) - 1:
        return word_timestamps[current_word_index]["start"]

    next_word_index = current_word_index + 1
    while current_word_index < len(word_timestamps) - 1:
        if word_timestamps[next_word_index].get("start") is None:
            # if next word doesn't have a start timestamp
            # merge it with the current word and delete it
            word_timestamps[current_word_index]["word"] += (
                " " + word_timestamps[next_word_index]["word"]
            )

            word_timestamps[next_word_index]["word"] = None
            next_word_index += 1
            if next_word_index == len(word_timestamps):
                return final_timestamp

        else:
            return word_timestamps[next_word_index]["start"]


def filter_missing_timestamps(
    word_timestamps, initial_timestamp=0, final_timestamp=None
):
    # handle the first and last word
    if word_timestamps[0].get("start") is None:
        word_timestamps[0]["start"] = (
            initial_timestamp if initial_timestamp is not None else 0
        )
        word_timestamps[0]["end"] = _get_next_start_timestamp(
            word_timestamps, 0, final_timestamp
        )

    result = [
        word_timestamps[0],
    ]

    for i, ws in enumerate(word_timestamps[1:], start=1):
        # if ws doesn't have a start and end
        # use the previous end as start and next start as end
        if ws.get("start") is None and ws.get("word") is not None:
            ws["start"] = word_timestamps[i - 1]["end"]
            ws["end"] = _get_next_start_timestamp(word_timestamps, i, final_timestamp)

        if ws["word"] is not None:
            result.append(ws)
    return result


def cleanup(path: str):
    """path could either be relative or absolute."""
    # check if file or directory exists
    if os.path.isfile(path) or os.path.islink(path):
        # remove file
        os.remove(path)
    elif os.path.isdir(path):
        # remove directory and all its content
        shutil.rmtree(path)
    else:
        raise ValueError("Path {} is not a file or dir.".format(path))


def process_language_arg(language: str, model_name: str):
    """
    Process the language argument to make sure it's valid and convert language names to language codes.
    """
    if language is not None:
        language = language.lower()
    if language not in LANGUAGES:
        if language in TO_LANGUAGE_CODE:
            language = TO_LANGUAGE_CODE[language]
        else:
            raise ValueError(f"Unsupported language: {language}")

    if model_name.endswith(".en") and language != "en":
        if language is not None:
            logging.warning(
                f"{model_name} is an English-only model but received '{language}'; using English instead."
            )
        language = "en"
    return language

# Transcriptions

In [4]:
is_fast = True
is_configured = True
enable_stemming = False

In [7]:


# or run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# or run on CPU with INT8
# model = WhisperModel(model_size, device="cpu", compute_type="int8")

# print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
# print(segments)
# for segment in segments:
#     print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

In [None]:

# num_list = ["1554","1713","1731","1738","1833","1944"]
num_list = ["1554"]

model_name = "large-v3"
device_type = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device_type)

compute_type = "float16"
suppress_numerals = True
whisper_model = faster_whisper.WhisperModel(
    model_name, device=device_type, compute_type=compute_type
)
whisper_pipeline = faster_whisper.BatchedInferencePipeline(whisper_model)
suppress_tokens = (
    find_numeral_symbol_tokens(whisper_model.hf_tokenizer)
    if suppress_numerals
    else [-1]
)
language = None  # autodetect language
batch_size = 8
is_fast = True
config_path = ""
if is_configured:
    config_path = "C"
if model_name == "large-v3" and is_fast:
    path_name = "largeV3F"
elif model_name == "large-v3":
    path_name = "largeV3C"
elif model_name == "large-v3":
    path_name = "largeV3"
elif model_name == "base":
    path_name = "base"

for num in num_list:
    #audio_path = f'./Dataset/Audio/{num}.wav'
    audio_path = "first_1_minutes.wav"

    if enable_stemming:
        # Isolate vocals from the rest of the audio
        return_code = os.system(
            f'python -m demucs.separate -n htdemucs --two-stems=vocals "{audio_path}" -o "temp_outputs" --device "{device_type}"'
        )
        if return_code != 0:
            print("Source splitting failed, using original audio file.")
            vocal_target = audio_path
        else:
            vocal_target = os.path.join(
                "temp_outputs",
                "htdemucs",
                os.path.splitext(os.path.basename(audio_path))[0],
                "vocals.wav",
            )
    else:
        vocal_target = audio_path

    audio_waveform = faster_whisper.decode_audio(vocal_target)
    # if sample_rate != 16000:
    #     print(f"Resampling from {sample_rate} Hz to 16000 Hz...")
    #     speech, sample_rate = librosa.load(audio_file, sr=None)
    #     speech = librosa.resample(speech, orig_sr=sample_rate, target_sr=16000)
    #     sample_rate = 16000
    #     sf.write(audio_file, speech, sample_rate)

    if is_configured:
        transcript_segments, info = whisper_pipeline.transcribe(
            audio_waveform,
            language=language,
            suppress_tokens=suppress_tokens,
            beam_size=1,
            temperature=0.0,
            without_timestamps=False,
            vad_filter=True,
        )
    else:
        transcript_segments, info = whisper_pipeline.transcribe(
            audio_waveform,
            language=language,
            suppress_tokens=suppress_tokens,
            without_timestamps=False,
            vad_filter=True,
        )
    result = []
    for seg in transcript_segments:
        result.append({"text": seg.text,"start":seg.start, "end": seg.end})
        print(str(seg.start) + "-" + str(seg.end) + ": "+ seg.text)
        

        
    # full_transcript = "".join(segment.text for segment in transcript_segments)
    # file_path = f"./output/{path_name}{config_path}_{num}.txt"
    # output_path = f"./output/{path_name}{config_path}_{num}_L.txt"
    # with open(file_path, "w", encoding="utf-8") as file:
    #     file.write(full_transcript.lower())
    # utils.process_text(file_path,output_path)



Using device: cuda


model.bin:   3%|3         | 105M/3.09G [00:00<?, ?B/s]

Error while downloading from https://cas-bridge.xethub.hf.co/xet-bridge-us/655f2075d3934dc40213a799/982bcaa78dd99dc42935ee11f021a876aaf08c11171817ad8a4d6bc1c881b3a5?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=cas%2F20250223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250223T135840Z&X-Amz-Expires=900&X-Amz-Signature=0ad5c7d266c000161158cbcf88d7545d1f404130670eb15fe4f51f1519add7e7&X-Amz-SignedHeaders=host&X-Xet-Cas-Uid=66c6007877ad31f2c074a29a&response-content-disposition=inline%3B+filename*%3DUTF-8%27%27model.bin%3B+filename%3D%22model.bin%22%3B&response-content-type=application%2Foctet-stream&x-id=GetObject&Expires=1740322720&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTc0MDMyMjcyMH19LCJSZXNvdXJjZSI6Imh0dHBzOi8vY2FzLWJyaWRnZS54ZXRodWIuaGYuY28veGV0LWJyaWRnZS11cy82NTVmMjA3NWQzOTM0ZGM0MDIxM2E3OTkvOTgyYmNhYTc4ZGQ5OWRjNDI5MzVlZTExZjAyMWE4NzZhYWYwOGMxMTE3MTgxN2FkOGE0ZDZiYzFjODgxYjNhNSoifV19&Signature=kGA

model.bin:  15%|#4        | 461M/3.09G [00:00<?, ?B/s]

Error while downloading from https://cas-bridge.xethub.hf.co/xet-bridge-us/655f2075d3934dc40213a799/982bcaa78dd99dc42935ee11f021a876aaf08c11171817ad8a4d6bc1c881b3a5?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Credential=cas%2F20250223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20250223T135840Z&X-Amz-Expires=900&X-Amz-Signature=0ad5c7d266c000161158cbcf88d7545d1f404130670eb15fe4f51f1519add7e7&X-Amz-SignedHeaders=host&X-Xet-Cas-Uid=66c6007877ad31f2c074a29a&response-content-disposition=inline%3B+filename*%3DUTF-8%27%27model.bin%3B+filename%3D%22model.bin%22%3B&response-content-type=application%2Foctet-stream&x-id=GetObject&Expires=1740322720&Policy=eyJTdGF0ZW1lbnQiOlt7IkNvbmRpdGlvbiI6eyJEYXRlTGVzc1RoYW4iOnsiQVdTOkVwb2NoVGltZSI6MTc0MDMyMjcyMH19LCJSZXNvdXJjZSI6Imh0dHBzOi8vY2FzLWJyaWRnZS54ZXRodWIuaGYuY28veGV0LWJyaWRnZS11cy82NTVmMjA3NWQzOTM0ZGM0MDIxM2E3OTkvOTgyYmNhYTc4ZGQ5OWRjNDI5MzVlZTExZjAyMWE4NzZhYWYwOGMxMTE3MTgxN2FkOGE0ZDZiYzFjODgxYjNhNSoifV19&Signature=kGA

model.bin:  15%|#5        | 472M/3.09G [00:00<?, ?B/s]

In [7]:
del whisper_model, whisper_pipeline
torch.cuda.empty_cache()

In [8]:
import whisperx
import gc
device = "cuda"
audio_path = "first_1_minutes.wav"
audio = whisperx.load_audio(audio_path)

# 2. Align whisper output
model_a, metadata = whisperx.load_align_model(language_code="en", device=device)
result = whisperx.align(result, model_a, metadata, audio, device, return_char_alignments=False)

# 3. Assign speaker labels
diarize_model = whisperx.DiarizationPipeline(use_auth_token="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", device=device)
diarize_segments = diarize_model(audio, num_speakers=2)  # Or adjust min/max speakers
result = whisperx.assign_word_speakers(diarize_segments, result)


S0 = ""
S1 = ""
# 4. Print speaker names with their utterances
for segment in result["segments"]:
    speaker_id = segment["speaker"]
    utterance = segment["text"]
    if speaker_id == "SPEAKER_00":
        S0 += utterance + " "
    else:
        S1 += utterance + " "
    print(f"Speaker {speaker_id}: {utterance}")

INFO:speechbrain.utils.quirks:Applied quirks (see `speechbrain.utils.quirks`): [disable_jit_profiling, allow_tf32]
INFO:speechbrain.utils.quirks:Excluded quirks specified by the `SB_DISABLE_QUIRKS` environment (comma-separated list): []
It can be re-enabled by calling
   >>> import torch
   >>> torch.backends.cuda.matmul.allow_tf32 = True
   >>> torch.backends.cudnn.allow_tf32 = True
See https://github.com/pyannote/pyannote-audio/issues/1370 for more details.

  std = sequences.std(dim=-1, correction=1)


Speaker SPEAKER_01:  So I'm just gonna be asking you to do some talking.
Speaker SPEAKER_00:  Okay.
Speaker SPEAKER_01:  So how do you think your speeches these days?
Speaker SPEAKER_00:  It's good, but...
Speaker SPEAKER_00:  will be better.
Speaker SPEAKER_00:  I can...
Speaker SPEAKER_00:  little... I can read a little bit...
Speaker SPEAKER_00:  I have trouble with uh...
Speaker SPEAKER_00:  and then then...
Speaker SPEAKER_00:  and then...
Speaker SPEAKER_00:  hold that.
Speaker SPEAKER_01:  Do you remember when you had your stroke?
Speaker SPEAKER_00:  Yeah.
Speaker SPEAKER_01:  Could you tell me about it?
Speaker SPEAKER_00:  Oh gosh.
Speaker SPEAKER_00:  Well, I got...
Speaker SPEAKER_00:  I got up to check the...


In [None]:
del model_a, diarize_model
torch.cuda.empty_cache()

NameError: name 'model_a' is not defined

# Diarization

# Wav2Vec

In [1]:
from transformers import pipeline
#https://github.com/huggingface/transformers/blob/main/examples/research_projects/wav2vec2/FINE_TUNE_XLSR_WAV2VEC2.md
#https://github.com/crazycloud/mispronunciation-detection-diagnosis-wav2vec2-and-llm/blob/main/notebooks/mispronunciation-detection.ipynb
#https://github.com/NeuralVox/OpenPhonemizer
pipe = pipeline(model="mrrubino/wav2vec2-large-xlsr-53-l2-arctic-phoneme")
transcription = pipe("1554_chunks/chunk_1.wav")["text"]
print(transcription)

Device set to use cuda:0


soʊ aɪm bɪskɪn ʌ bi æskɪŋ ju sɪtisɪn hɑki ʊkɪsɛw hæv ju θɪŋk jʊ spit͡ʃɪz ðis deɪɪts ɡɛ beɪ ʌm æ wu bi bɛtɚ m  aɪ kʌn  lɪtʌl aɪ kʌn bɹid ʌ lɪtolbʌ   aɪ hæv tɹʌvʌl wɪd ʌ  ʌntnʌbæ   ju ɚba jɚ ɹɛdi hæd ɹistɹʌ j hæ ænd kʊd ju tɛl ju bɑtɪoʊwʌnwɛl aɪ ɡɑ m aɪ ɡɑtʌ tɪ t͡ʃɛk dʌ wɑndɚi æn m aɪ ɡɑt ʌbaʊ ðeɪɹ m ɪn ðʌ dɔɹʌweɪ ɛvðʌ keʃʌn æn aɪ doʊ ɹimɛmbɚ æ aɪ ɡoʊ tu veɪ ˌʌndɚbɛlʌnæ sɔl aɪ kʌn ɹɪmɛmbɚ n ʌn wʌt ʌbaʊt jʊ fɹɚ spɛnɚei æftɚ ð stɹʌp aɪ wʌz skɛɹn sowʌt baʊt jʊɹ ɹɪkʌvɚi hwʌt kaɪndz ʌv θɪŋz hæv ju dʌn tɪ t͡ʃɹaɪtɪɡ æt bɛtɚ sɪn ɚstɹʌæm aɪ m ɛksɚsaɪz ʌ aɪp taɪm   ʌ wik nd m aɪ hæv spt͡ʃ ʌn ʌm spɚn kɚsmʌspɹɪk ʌ stɑɹtɪŋ ʌk nɛkst  tu tusteɪwɛn wɛnsdɛɡɔ ɪn weɪk æm ʌn ʌʌ ʌ ʌs  dɑktɚt ʌpɪ ʌn nɔ tʌɛw æʊt taɪ n  æksɚsˌaɪzɪŋ aɪ ðɛɹ ɛni ʌðɚ t͡ʃeɪnd͡ʒɪz ɪn jɚ deɪl deɪlˌaɪɑ wˌaɪ aɪ aɪ keɪŋ wɚk ʌnaɪ wɑtʃ mʌ ɡɹæm sʌɪn  n ætsɛaɪ ʌn d͡ɹaɪv naʊ ɪʌn θki ʌkhæsɛlaɪmn kʌn æsk jɪ ju ju ʌ kʌbliθŋ s  tɔki ʌ kʌn æ ænd tɑkʌz bæd ʒɪsz jukæm ʌbaʊt it͡ʃ wˌʌn kʌz ju leɪtɚ dɛlʌ ɹɪkɹˌɪt͡ʃ ʌk ɪ ʌ sɛwθɪŋkɪŋ bæk kæn ju tɛl m jʊɚ 

In [1]:
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC
import torch
import torchaudio

# Load model and processor
model_name = "vitouphy/wav2vec2-xls-r-300m-phoneme"
processor = Wav2Vec2Processor.from_pretrained(model_name)
model = Wav2Vec2ForCTC.from_pretrained(model_name)


preprocessor_config.json:   0%|          | 0.00/214 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/294 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/410 [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/23.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/309 [00:00<?, ?B/s]

model.safetensors:  20%|#9        | 252M/1.26G [00:00<?, ?B/s]

In [7]:
from transformers import pipeline

# Load the model
pipe = pipeline(model="vitouphy/wav2vec2-xls-r-300m-timit-phoneme")

# Process raw audio
output = pipe("1554_chunks/chunk_1.wav", chunk_length_s=10, stride_length_s=(4, 2))

# Print the transcription
print(output)


Device set to use cuda:0


{'text': 'soʊaɪm ʤɪs gɪnɪ biæs kɪŋju tɪ dusəm tɑ kiŋoʊ keɪsoʊ haʊɾiuθɪŋ k jɝs pi ʧɪɾɪz ðiz deɪzhɪ t s gʊ bɛ əmw l bi bɛɾɝəmaɪ kɪnhlɪɾl  aɪ k n b ɹi ð əlɪɾ l bɛ ɑmɑmaɪhæv ʧɹə b lwɪθə inɛnθəhɑvɑdɪjuɹɪmɛm bɝwɛn juhæ d jɝs t ɹoʊ kjæ h əmkɪ d u tɛl miɪ baʊɾɪ oʊ kɑʃɪ wɛl aɪ gɑ əm  aɪgɑɾə  p tu ʧɛ ðə lɑn ʤɝi æn   m  aɪ  gɑɾə baʊ  ðeɪɝ  əm ɪnðɪ dɑɹ weɪəvəðɪ kɪ ʧɪn ænaɪ doʊn ɹimɛm bɝ aɪ  goʊ tuviɪn dɝ bɪl ɛnðæ sɑlaɪ kɪn ɹɝmɛm bɝəm wəɾɪ baʊ jɝfɝs mɛmɝiɪz æf tɝðɪs d ɹə kaɪwʊs kɛɝ djæəmə keɪsoʊhwəɾɪ baʊ jɝɹɝ kəvɝi wə kaɪn zɪvθɪŋzhɪv ju dən tɪ t ɹaɪɾɪ gɛ bɛɾɝsɪn s jus t ɹoʊ p thəm aɪmhəmɛ  sɝsaɪz əmffaɪv taɪm zəhmnəwi kæn dəm  aɪhæv s pi ʧəm   əms p ɹɪŋ  k ɹɪs mɪs b ɹeɪ kə  s tɑɹɾiŋ ə pnnɛ k s tmtuz tuz deɪwɪn wɪn z deɪ ðfɑləɪŋwi khæn dɑɑaɪɾʊ ələnoʊhə t sɪ aɪhdɑ tɝzə pɔɪn mɪn swəhɪn t ɹɛəzhsoʊ   saɪɾɪ ɑf ɛ k sɝsaɪziŋɑɹðɛɹ ɛniəðɝ ʧin ʤɪzɪn jɝ deɪɾɪ deɪwaɪfoʊaɪ   aɪaɪ kin wɝ khaɪwɑ ʧmaɪ g ɹæn sənɛnɛnðæ sɪ h aɪ kɪn ʤaɪv naʊ ɑjɪ sɛllə  sə g ɹii  oʊ keɪssoʊnaʊm gɪnə æs k juɾɪ ʤuɪ kə p ləðɝθɪŋz wɪθ tɑ ki

In [None]:
# Phonemize Whisper result before alignment
# use whiper phonemized data for training n-gram?!

In [None]:
import batchalign as ba

In [4]:
text = " so i'm just gonna be asking you to do some talking. okay. so how do you think your speech is these days? it's good, but... it will be better. i can... little... i can read a little, but... i have trouble with and and the and all that. do you remember when you had your stroke? yeah. could you tell me about it? oh gosh. well i got i got up to check the"
nlp = ba.BatchalignPipeline.new("asr,morphosyntax", lang="eng", num_speakers=2)
doc = ba.Document.new(media_path="first_1_minutes.wav", lang="eng")
doc = nlp(doc) # this is equivalent to nlp("audio.mp3"), we will make the initial doc for you

first_word_pos = doc[0][0].morphology
first_word_time = doc[0][0].time
first_utterance_time = doc[0].alignment

generation_config.json:   0%|          | 0.00/4.29k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/283k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/836k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.48M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/494k [00:00<?, ?B/s]

normalizer.json:   0%|          | 0.00/52.7k [00:00<?, ?B/s]

added_tokens.json:   0%|          | 0.00/34.6k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/2.19k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/2.28k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/6.17G [00:00<?, ?B/s]

KeyboardInterrupt: 