In [None]:
"""
You can run this notebook either locally (if you have all the dependencies and a GPU) or on Google Colab.

Instructions for setting up Colab are as follows:
1. Open a new Python 3 notebook.
2. Import this notebook from GitHub (File -> Upload Notebook -> "GITHUB" tab -> copy/paste GitHub URL)
3. Connect to an instance with a GPU (Runtime -> Change runtime type -> select "GPU" for hardware accelerator)
4. Run this cell to set up dependencies.
"""
# If you're using Google Colab and not running locally, run this cell.

## Install dependencies
# wget is imported later in this notebook to download the diarization config,
# and ffmpeg is invoked later to convert the downloaded audio to WAV.
!pip install wget
!apt-get install sox libsndfile1 ffmpeg
!pip install text-unidecode

## Install NeMo
# BRANCH is the NeMo git ref that the pip command below installs (with the [asr] extra).
BRANCH = 'r2.0.0rc0'
!python -m pip install git+https://github.com/NVIDIA/NeMo.git@$BRANCH#egg=nemo_toolkit[asr]

## Install TorchAudio
# NOTE(review): torchaudio is installed without a version pin — confirm it matches the torch build in use.
!pip install torchaudio -f https://download.pytorch.org/whl/torch_stable.html

In [None]:
# pytube provides the YouTube class imported by the download cell below.
!pip install pytube

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


# Download using a link (url)

In [None]:
from pytube import YouTube
import os
import subprocess

def download_video(video_url):
    """Download the first audio-only stream of a YouTube video.

    Args:
        video_url: URL of the video, passed straight to ``pytube.YouTube``.

    Returns:
        Whatever ``stream.download()`` returns; callers in this notebook use
        it as the path of the downloaded audio file.

    Raises:
        Exception: if the video exposes no audio-only stream.
    """
    # Only the audio track is needed, so video streams are filtered out.
    stream = YouTube(video_url).streams.filter(only_audio=True).first()
    if stream is not None:
        return stream.download()
    raise Exception("No audio stream found")

def convert_to_wav(audio_file):
    """Convert ``audio_file`` to a WAV file with ffmpeg.

    The output is written next to the input, with the input's extension
    replaced by ``.wav``.

    Args:
        audio_file: Path of the audio file to convert.

    Returns:
        Path of the WAV file that ffmpeg was asked to write.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable is not on PATH.
    """
    print("Converting to WAV")
    wav_file = os.path.splitext(audio_file)[0] + '.wav'
    # -y overwrites a WAV left over from a previous run; without it ffmpeg asks
    # for confirmation and the conversion fails when the cell is re-executed.
    # -nostdin keeps ffmpeg from waiting on keyboard input inside the notebook.
    command = ['ffmpeg', '-nostdin', '-y', '-i', audio_file, wav_file]
    subprocess.run(command, check=True)
    return wav_file

def video_to_wav(video_url):
    """Download a video's audio and convert it to WAV.

    Args:
        video_url: URL handed to ``download_video``.

    Returns:
        The WAV path on success. On any failure the exception is swallowed and
        its message is returned as a string instead, so callers must check
        that the result is an existing file before treating it as a path.
    """
    try:
        downloaded = download_video(video_url)
        converted = convert_to_wav(downloaded)
        # Optional: the intermediate download is discarded once the WAV exists
        os.remove(downloaded)
    except Exception as err:
        return str(err)
    return converted

# Example usage
# Strip stray whitespace/newlines that commonly come along with a pasted URL.
video_url = input("Enter the video URL: ").strip()
wav_file = video_to_wav(video_url)
# video_to_wav returns the error message (not a path) when something fails,
# so only report success if the returned value is a file that really exists.
if os.path.isfile(wav_file):
    print(f"WAV file saved as: {wav_file}")
else:
    print(f"Failed to create WAV file: {wav_file}")


Enter the video URL: https://www.youtube.com/watch?v=SSrGOkw667k
Converting to WAV
WAV file saved as: /content/Bakra Eid Part 2.wav


In [None]:
import nemo.collections.asr as nemo_asr
import numpy as np
from IPython.display import Audio, display
import librosa
import os
import wget
import matplotlib.pyplot as plt
from omegaconf import OmegaConf
import shutil
import nemo
import glob

import pprint
pp = pprint.PrettyPrinter(indent=4)
import os
import glob
import librosa
from IPython.display import Audio

# Define the root directory and a working directory (data_dir) that holds the
# downloaded config and is later used for the manifest and diarizer outputs
ROOT = os.getcwd()
data_dir = os.path.join(ROOT, 'data')
os.makedirs(data_dir, exist_ok=True)
# print(data_dir)
# Path to the audio file to process. The path is hard-coded; nothing is prompted for.
# NOTE(review): the WAV produced by the download cell above is not used here — confirm this is intended.
audio_input_path = "/content/PTT-20240611-WA0002.mp3"

# Ensure the audio file exists
if os.path.exists(audio_input_path):
    AUDIO_FILENAME = audio_input_path
    # NOTE(review): this lists the contents of data_dir, which does not contain
    # audio_input_path, so the printed "Input audio file list" may be misleading.
    audio_file_list = glob.glob(f"{data_dir}/*")
    print("Input audio file list: \n", audio_file_list)
else:
    raise FileNotFoundError(f"Audio file not found at {audio_input_path}")

# Load and display the audio file
# sr=None is passed to librosa.load (presumably to keep the file's native sampling rate — confirm);
# signal and sample_rate are reused by the speaker-chunk cells further down.
signal, sample_rate = librosa.load(AUDIO_FILENAME, sr=None)
display(Audio(signal, rate=sample_rate))

DOMAIN_TYPE = "meeting" # Can be meeting or telephonic based on domain type of the audio file
CONFIG_FILE_NAME = f"diar_infer_{DOMAIN_TYPE}.yaml"

# NOTE(review): the config is fetched from the NeMo `main` branch, while the install
# cell pins NeMo to a specific BRANCH — confirm the two are compatible.
CONFIG_URL = f"https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/{CONFIG_FILE_NAME}"

# Download the inference config only once; later runs reuse the copy in data_dir.
if not os.path.exists(os.path.join(data_dir,CONFIG_FILE_NAME)):
    CONFIG = wget.download(CONFIG_URL, data_dir)
else:
    CONFIG = os.path.join(data_dir,CONFIG_FILE_NAME)

# cfg is mutated by the following cells before being handed to the ASR/diarization helpers.
cfg = OmegaConf.load(CONFIG)
# print(OmegaConf.to_yaml(cfg))


In [None]:
# Create a manifest file for input with below format.
# {"audio_filepath": "/path/to/audio_file", "offset": 0, "duration": null, "label": "infer", "text": "-",
# "num_speakers": null, "rttm_filepath": "/path/to/rttm/file", "uem_filepath": "/path/to/uem/filepath"}
import json
# Single manifest entry for AUDIO_FILENAME; the None values are serialized as JSON null.
meta = {
    'audio_filepath': AUDIO_FILENAME,
    'offset': 0,
    'duration':None,
    'label': 'infer',

    'text': '-',
    'num_speakers': None,
    'rttm_filepath': None,
    'uem_filepath' : None
}
# The manifest is written as one JSON object followed by a newline.
with open(os.path.join(data_dir,'input_manifest.json'),'w') as fp:
    json.dump(meta,fp)
    fp.write('\n')

# Point the diarizer config at the manifest just written, then print it for inspection.
cfg.diarizer.manifest_filepath = os.path.join(data_dir,'input_manifest.json')
!cat {cfg.diarizer.manifest_filepath}

In [None]:
# Configure the diarizer (speaker-embedding, VAD and ASR models) and run ASR to
# obtain word hypotheses and word timestamps for the manifest's audio.
pretrained_speaker_model='titanet_large'
# NOTE(review): self-assignment; manifest_filepath was already set in the manifest cell, so this line looks like a no-op.
cfg.diarizer.manifest_filepath = cfg.diarizer.manifest_filepath
cfg.diarizer.out_dir = data_dir #Directory to store intermediate files and prediction outputs
cfg.diarizer.speaker_embeddings.model_path = pretrained_speaker_model
cfg.diarizer.clustering.parameters.oracle_num_speakers=False
from nemo.collections.asr.parts.utils.decoder_timestamps_utils import ASRDecoderTimeStamps


# Using Neural VAD and Conformer ASR
cfg.diarizer.vad.model_path = 'vad_multilingual_marblenet'
cfg.diarizer.asr.model_path = 'stt_en_conformer_ctc_large'
cfg.diarizer.oracle_vad = False # ----> Not using oracle VAD
cfg.diarizer.asr.parameters.asr_based_vad = False
# asr_decoder_ts is kept around: the diarization cell reads its word_ts_anchor_offset.
asr_decoder_ts = ASRDecoderTimeStamps(cfg.diarizer)
asr_model = asr_decoder_ts.set_asr_model()
# word_hyp / word_ts_hyp are consumed by the diarization and transcript cells below.
word_hyp, word_ts_hyp = asr_decoder_ts.run_ASR(asr_model)

In [None]:
# Run speaker diarization using the word timestamps produced by the ASR cell.
from nemo.collections.asr.parts.utils.diarization_utils import OfflineDiarWithASR
asr_diar_offline = OfflineDiarWithASR(cfg.diarizer)
# Reuse the anchor offset from the ASR decoder object created in the previous cell.
asr_diar_offline.word_ts_anchor_offset = asr_decoder_ts.word_ts_anchor_offset
diar_hyp, diar_score = asr_diar_offline.run_diarization(cfg, word_ts_hyp)


# File name of the input audio without directory or extension; used below to
# locate the matching RTTM and transcript files and to name output folders.
audio_name = os.path.splitext(os.path.basename(audio_input_path))[0]

# Ensure the audio file exists
# NOTE(review): this repeats the existence check from the setup cell; audio_file_list is not used afterwards in the visible cells.
if os.path.exists(audio_input_path):
    AUDIO_FILENAME = audio_input_path
    audio_file_list = glob.glob(f"{data_dir}/*")
    # print("Input audio file list: \n", audio_file_list)
else:
    raise FileNotFoundError(f"Audio file not found at {audio_input_path}")

# Print the extracted audio name
print("Extracted audio name:", audio_name)

# Search for the corresponding RTTM file
def find_rttm_file(audio_name, rttm_folder):
    """Locate the RTTM file belonging to ``audio_name`` inside ``rttm_folder``.

    An exact ``<audio_name>.rttm`` match is preferred so that a recording whose
    name is a prefix of another one (``talk`` vs ``talk_extra``) never picks up
    the wrong file. If there is no exact match, the alphabetically first file
    that starts with ``audio_name`` and ends with ``.rttm`` is used.

    Args:
        audio_name: Base name of the audio file, without extension.
        rttm_folder: Directory that holds the predicted RTTM files.

    Returns:
        Path of the matching RTTM file, or ``None`` when nothing matches or
        ``rttm_folder`` does not exist.
    """
    # A missing output folder simply means "no RTTM yet", not a crash.
    if not os.path.isdir(rttm_folder):
        return None

    exact_path = os.path.join(rttm_folder, f"{audio_name}.rttm")
    if os.path.isfile(exact_path):
        return exact_path

    # os.listdir order is arbitrary; sort so the fallback is deterministic.
    for filename in sorted(os.listdir(rttm_folder)):
        if filename.startswith(audio_name) and filename.endswith(".rttm"):
            return os.path.join(rttm_folder, filename)
    return None

def read_file(path_to_file):
    """Return the lines of a text file, without line endings.

    Args:
        path_to_file: Path of the file to read.

    Returns:
        A list with one string per line. When ``path_to_file`` is not an
        existing regular file, a message is printed and an empty list is
        returned.
    """
    if not os.path.isfile(path_to_file):
        print(f"File '{path_to_file}' not found.")
        return []

    with open(path_to_file) as handle:
        return handle.read().splitlines()

# Look up the predicted RTTM for this audio under <data_dir>/pred_rttms and turn
# it into speaker labels (pred_labels) for the chunk-extraction cell below.
# NOTE(review): pred_labels is only assigned when the RTTM file is found and
# non-empty; on the other paths a later cell that uses pred_labels will hit a NameError.
rttm_folder = os.path.join(data_dir, "pred_rttms")
predicted_speaker_label_rttm_path = find_rttm_file(audio_name, rttm_folder)

if predicted_speaker_label_rttm_path:
    pred_rttm = read_file(predicted_speaker_label_rttm_path)
    # pp.pprint(pred_rttm)

    # pred_rttm is only used as an emptiness check; the labels are parsed from the path.
    if pred_rttm:
        from nemo.collections.asr.parts.utils.speaker_utils import rttm_to_labels

        pred_labels = rttm_to_labels(predicted_speaker_label_rttm_path)

    #     color = get_color(signal, pred_labels)
    #     display_waveform(signal, 'Audio with Speaker Labels', color)
    #     # display(Audio(signal,rate=16000)) of the audio name and then select
else:
    print(f"No RTTM file found for audio '{audio_name}'.")

In [None]:
# Combine the diarization result with the ASR words and word timestamps.
# NOTE(review): the following cell reads <data_dir>/pred_rttms/<audio_name>.txt, so this call presumably writes that transcript — confirm.
trans_info_dict = asr_diar_offline.get_transcript_with_speaker_labels(diar_hyp, word_hyp, word_ts_hyp)

In [None]:
# Transcription path: the .txt file named after the audio inside the pred_rttms folder
transcription_path_to_file = f"{data_dir}/pred_rttms/{audio_name}.txt"
# read_file returns [] (and prints a message) if the transcript does not exist.
transcript = read_file(transcription_path_to_file)
pp.pprint(transcript)

[   '[00:00.67 - 00:13.16] speaker_0: caffeine addiction starting as morning '
    'ritual with coffee or tea and gradually becomes a necessity start today '
    'over time rece on caffeine caar fading to increased consumption just to '
    'achieve the same wayful effect',
    '[00:15.08 - 00:26.92] speaker_2: ying fors abilit to enhance focus and '
    'energy produces it already can lead to a host of unwanted side effect '
    'symptoms like inzoa andess and increase heart trate',
    '[00:29.72 - 00:42.44] speaker_1: the cycle of caffeine addiction often '
    'involves in a fro known slump followed by a booz from a caffine to drink '
    'only to pressure again this would going affect mental clarity and mod '
    'make it making it difficult to maintain a studyerg level throug that y',
    '[00:44.04 - 01:00.36] speaker_0: breaking free from caffeine addiction '
    'requires a deliberate and often challenging reduction and take a strategy '
    'such as substituting with less caf

In [None]:
import numpy as np
from IPython.display import Audio, display

def extract_speaker_chunks(signal, sample_rate, pred_labels):
    """Group slices of ``signal`` by speaker label.

    Args:
        signal: Sliceable sequence of audio samples.
        sample_rate: Samples per second, used to turn times into sample indices.
        pred_labels: Iterable of strings, each holding three whitespace-separated
            fields: start time, end time (both in seconds) and speaker label.

    Returns:
        Dict mapping each speaker label to the list of ``signal[start:end]``
        slices for that speaker, in the order the labels were given.
    """
    chunks_by_speaker = {}
    for entry in pred_labels:
        start_sec, end_sec, speaker = entry.split()
        # Seconds -> sample index (truncated towards zero).
        first_sample = int(float(start_sec) * sample_rate)
        last_sample = int(float(end_sec) * sample_rate)
        chunks_by_speaker.setdefault(speaker, []).append(signal[first_sample:last_sample])
    return chunks_by_speaker

# signal and sample_rate come from the librosa.load cell; pred_labels comes from
# the RTTM cell (all three must already be defined when this cell runs).


speakers = extract_speaker_chunks(signal, sample_rate, pred_labels)

# Play the first chunk of each speaker
for speaker, chunks in speakers.items():
    print(f"First chunk of {speaker}")
    display(Audio(chunks[0], rate=sample_rate))


First chunk of speaker_0


First chunk of speaker_2


First chunk of speaker_1


In [None]:
import os
import soundfile as sf

# Save every chunk of one user-selected speaker as WAV files under
# <data_dir>/<audio_name>_<speaker>/.

# Display the available speakers
print("Available speakers:")
for speaker, chunks in speakers.items():
    print(f"{speaker}: {len(chunks)} chunks")

# User selects a speaker by name (surrounding whitespace is ignored)
selected_speaker = input("Select a speaker by name: ").strip()

# Validate before touching the filesystem: the raw input is used both as a
# dictionary key and as part of a folder name, so a typo would otherwise create
# a stray directory and then fail with a bare KeyError.
if selected_speaker not in speakers:
    raise ValueError(
        f"Unknown speaker '{selected_speaker}'. "
        f"Available speakers: {', '.join(speakers)}"
    )

# Create a folder named after the selected speaker and audio name
output_folder = os.path.join(data_dir, f"{audio_name}_{selected_speaker}")
os.makedirs(output_folder, exist_ok=True)

# Save the chunks of the selected speaker
for idx, chunk in enumerate(speakers[selected_speaker]):
    output_file = os.path.join(output_folder, f"{selected_speaker}_chunk_{idx}.wav")
    sf.write(output_file, chunk, sample_rate)
    print(f"Chunk saved as {output_file}")

print(f"All chunks of {selected_speaker} saved in {output_folder}")


Available speakers:
speaker_0: 2 chunks
speaker_2: 1 chunks
speaker_1: 1 chunks
Select a speaker by name: speaker_1
Chunk saved as /content/data/PTT-20240611-WA0002_speaker_1/speaker_1_chunk_0.wav
All chunks of speaker_1 saved in /content/data/PTT-20240611-WA0002_speaker_1
