
MSDD inference is too slow #7101

Closed · SagyHarpazGong opened this issue Jul 24, 2023 · 51 comments

@SagyHarpazGong

SagyHarpazGong commented Jul 24, 2023

I run the MSDD model on an NVIDIA A10 (24 GB), but inference is too slow. Looking at the code, there is a lot of traffic between the CPU and GPU and back.

Most of the time, GPU utilization is at 0%.

First the data is split into short segments for each scale (I have 5 scales). After each scale is segmented, embedding extraction is applied and the embeddings are saved to a pkl file. Then clustering is applied, and finally the MSDD model is applied.

Is there something that can be done to speed up the inference? Is there any flag for parallelizing the embedding extraction stage?

Please help.

@nithinraok (Collaborator)

What do you mean by parallelizing embedding extraction when you are running inference on a single GPU?

nithinraok assigned tango4j and nithinraok and unassigned okuchaiev on Jul 25, 2023
@tango4j (Collaborator)

tango4j commented Jul 25, 2023

This is a very recent issue we also discovered. It's not MSDD itself; the TitaNet embedding extractor is taking most of the time. I will look into it and get back soon.

@SagyHarpazGong (Author)

Hi @nithinraok, thanks for your response.
What I meant is that there is a for loop that runs segmentation and embedding extraction scale by scale.
But from what I saw, the execution-time bottleneck is the traffic between the GPU and the CPU, and of course the writing and reading of files (e.g. pkl files).

@SagyHarpazGong (Author)

Thanks @tango4j, yes, I think so too.

@nithinraok (Collaborator)

You can skip writing pkl files as well. Have you tried disabling the saving of pickle files through the config?
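For reference, a minimal sketch of what that would look like from Python, assuming the relevant key is save_embeddings under speaker_embeddings.parameters in the inference YAML:

from omegaconf import OmegaConf

cfg = OmegaConf.load("diar_infer_telephonic.yaml")  # placeholder config path
# Assumed key: skip writing the scale-wise embedding pickle files.
cfg.diarizer.speaker_embeddings.parameters.save_embeddings = False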

@SagyHarpazGong (Author)

Of course, but MSDD uses them, so if I disable saving the pkl files I get a FileNotFoundError.

@tango4j (Collaborator)

tango4j commented Jul 25, 2023

This issue is happening only for the MSDD diarizer, not for the clustering diarizer. I suppose something in the YAML settings is causing this. Let me get back to this soon.

@SagyHarpazGong (Author)

I want to use the MSDD diarizer.

@tango4j (Collaborator)

tango4j commented Jul 25, 2023

@SagyHarpazGong Sure, let us work on this. Thanks...!

@tango4j (Collaborator)

tango4j commented Jul 26, 2023

@SagyHarpazGong
I have found that it gets slow when the TitaNet checkpoint bundled in the MSDD .nemo file is used.
A quick fix is adding the following overrides:

 diarizer.speaker_embeddings.model_path="titanet_large" \
 diarizer.msdd_model.parameters.use_speaker_model_from_ckpt=False \

in the YAML config (or pass them as command-line overrides, as written above).
This could lead to a small change in diarization performance (could be better or worse).
If you want a quick fix, try this. Otherwise, it will take some more time to fix the speed issue with the speaker model from the checkpoint.
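For anyone applying this from Python (e.g. in a notebook) rather than as command-line overrides, a minimal sketch of the same two settings (the config path is a placeholder):

from omegaconf import OmegaConf

cfg = OmegaConf.load("diar_infer_telephonic.yaml")  # placeholder path
# Use the standalone TitaNet-L checkpoint instead of the speaker model bundled in the MSDD .nemo file.
cfg.diarizer.speaker_embeddings.model_path = "titanet_large"
cfg.diarizer.msdd_model.parameters.use_speaker_model_from_ckpt = False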

@SagyHarpazGong (Author)

@tango4j I checked and it is still slow. I really suspect the reason for the slow inference is that the work runs on the CPU: for most of the inference time GPU utilization is at 0%, and file-system I/O is another reason for the slowness.

@tango4j (Collaborator)

tango4j commented Jul 27, 2023

@SagyHarpazGong
If you do not see a difference before and after applying the configs I suggested, then your code is likely not picking up the change in how the TitaNet parameters are loaded.
Check your CUDA setup and the batch size for diarization inference. Make sure you are maximizing GPU memory usage.

If it does speed up but the improvement is less than 30%, please let us know.

@SagyHarpazGong (Author)

Which CUDA settings should I check?
After increasing the batch_size, GPU memory usage is almost at the maximum and it is still slow.

@tango4j (Collaborator)

tango4j commented Jul 27, 2023

@SagyHarpazGong
Did you see any relative speed improvement after applying use_speaker_model_from_ckpt=False?

@SagyHarpazGong (Author)

@tango4j not at all

@tango4j (Collaborator)

tango4j commented Jul 27, 2023

@SagyHarpazGong
I suspect that the changes in the configuration are not being reflected at all in your code base. Unfortunately, at this time, we don't have a solution for the slowdown issue on your machine with your setup.

Apart from this, I will update the NGC MSDD model checkpoint to resolve this slowdown issue.

@SagyHarpazGong (Author)

@tango4j thanks, I'll try to share nvidia-smi screenshots taken during inference to show that GPU utilization is at 0% most of the time.

@SagyHarpazGong (Author)

Hi all, I worked around the issue by inheriting from the classes ClusteringDiarizer, ClusterEmbedding and NeuralDiarizer and modifying them so that, instead of saving the embeddings to pkl files and loading them back for MSDD inference, the embeddings are passed to MSDD inference directly in GPU memory, without touching the file system.

This is my implementation (imports shown for completeness):

# Imports added for completeness; module paths follow the NeMo 1.x layout.
import json
import os
import shutil
from typing import List, Optional, Union

import torch
from omegaconf import DictConfig
from torch.cuda.amp import autocast
from tqdm import tqdm

from nemo.collections.asr.models import ClusteringDiarizer, EncDecSpeakerLabelModel
from nemo.collections.asr.models.msdd_models import (
    ClusterEmbedding,
    NeuralDiarizer,
    NeuralDiarizerInferenceConfig,
)
from nemo.collections.asr.parts.utils.speaker_utils import (
    audio_rttm_map,
    get_embs_and_timestamps,
    get_uniqname_from_filepath,
    perform_clustering,
    score_labels,
)
from nemo.utils import logging


class ClustDiar(ClusteringDiarizer):
    def _extract_embeddings(self, manifest_file: str, scale_idx: int, num_scales: int):
        """
        This method extracts speaker embeddings from segments passed through manifest_file
        Optionally you may save the intermediate speaker embeddings for debugging or any use.
        """
        logging.info("Extracting embeddings for Diarization")
        self._setup_spkr_test_data(manifest_file)
        self.embeddings = {}
        self._speaker_model.eval()
        self.time_stamps = {}

        all_embs = torch.empty([0], device=self._speaker_model.device)
        for test_batch in tqdm(
            self._speaker_model.test_dataloader(),
            desc=f'[{scale_idx+1}/{num_scales}] extract embeddings',
            leave=True,
            disable=not self.verbose,
        ):
            test_batch = [x.to(self._speaker_model.device) for x in test_batch]
            audio_signal, audio_signal_len, labels, slices = test_batch
            with autocast():
                _, embs = self._speaker_model.forward(input_signal=audio_signal, input_signal_length=audio_signal_len)
                emb_shape = embs.shape[-1]
                embs = embs.view(-1, emb_shape)
                all_embs = torch.cat((all_embs, embs.detach()), dim=0)
            del test_batch

        with open(manifest_file, 'r', encoding='utf-8') as manifest:
            for i, line in enumerate(manifest.readlines()):
                line = line.strip()
                dic = json.loads(line)
                uniq_name = get_uniqname_from_filepath(dic['audio_filepath'])
                if uniq_name in self.embeddings:
                    self.embeddings[uniq_name] = torch.cat((self.embeddings[uniq_name], all_embs[i].view(1, -1)))
                else:
                    self.embeddings[uniq_name] = all_embs[i].view(1, -1)
                if uniq_name not in self.time_stamps:
                    self.time_stamps[uniq_name] = []
                start = dic['offset']
                end = start + dic['duration']
                self.time_stamps[uniq_name].append([start, end])

    def diarize(self, paths2audio_files: List[str] = None, batch_size: int = 0):
        """
        Diarize files provided through paths2audio_files or manifest file
        input:
        paths2audio_files (List[str]): list of paths to file containing audio file
        batch_size (int): batch_size considered for extraction of speaker embeddings and VAD computation
        """

        self._out_dir = self._diarizer_params.out_dir

        self._speaker_dir = os.path.join(self._diarizer_params.out_dir, 'speaker_outputs')

        if os.path.exists(self._speaker_dir):
            logging.warning("Deleting previous clustering diarizer outputs.")
            shutil.rmtree(self._speaker_dir, ignore_errors=True)
        os.makedirs(self._speaker_dir)

        if not os.path.exists(self._out_dir):
            os.mkdir(self._out_dir)

        self._vad_dir = os.path.join(self._out_dir, 'vad_outputs')
        self._vad_out_file = os.path.join(self._vad_dir, "vad_out.json")

        if batch_size:
            self._cfg.batch_size = batch_size

        if paths2audio_files:
            if type(paths2audio_files) is list:
                self._diarizer_params.manifest_filepath = os.path.join(self._out_dir, 'paths2audio_filepath.json')
                self.path2audio_files_to_manifest(paths2audio_files, self._diarizer_params.manifest_filepath)
            else:
                raise ValueError("paths2audio_files must be of type list of paths to file containing audio file")

        self.AUDIO_RTTM_MAP = audio_rttm_map(self._diarizer_params.manifest_filepath)

        out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(out_rttm_dir, exist_ok=True)

        # Speech Activity Detection
        self._perform_speech_activity_detection()

        # Segmentation
        scales = self.multiscale_args_dict['scale_dict'].items()
        self.emb_scale_seq_dict = {}
        for scale_idx, (window, shift) in scales:

            # Segmentation for the current scale (scale_idx)
            self._run_segmentation(window, shift, scale_tag=f'_scale{scale_idx}')

            # Embedding Extraction for the current scale (scale_idx)
            self._extract_embeddings(self.subsegments_manifest_path, scale_idx, len(scales))

            self.emb_scale_seq_dict[scale_idx] = self.embeddings

            self.multiscale_embeddings_and_timestamps[scale_idx] = [self.embeddings, self.time_stamps]

        embs_and_timestamps = get_embs_and_timestamps(
            self.multiscale_embeddings_and_timestamps, self.multiscale_args_dict
        )

        # Clustering
        all_reference, all_hypothesis = perform_clustering(
            embs_and_timestamps=embs_and_timestamps,
            AUDIO_RTTM_MAP=self.AUDIO_RTTM_MAP,
            out_rttm_dir=out_rttm_dir,
            clustering_params=self._cluster_params,
            device=self._speaker_model.device,
            verbose=self.verbose,
        )
        logging.info("Outputs are saved in {} directory".format(os.path.abspath(self._diarizer_params.out_dir)))

        # Scoring
        return score_labels(
            self.AUDIO_RTTM_MAP,
            all_reference,
            all_hypothesis,
            collar=self._diarizer_params.collar,
            ignore_overlap=self._diarizer_params.ignore_overlap,
            verbose=self.verbose,
        )


class ClusEmb(ClusterEmbedding):
    """
    This class is built for calculating cluster-average embeddings, segmentation and load/save of the estimated cluster labels.
    The methods in this class are used for the inference of MSDD models.

    Args:
        cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file

    Class Variables:
        self.cfg_diar_infer (DictConfig):
            Config dictionary from diarization inference YAML file
        cfg_msdd_model (DictConfig):
            Config dictionary from MSDD model checkpoint file
        self._speaker_model (class `EncDecSpeakerLabelModel`):
            This is a placeholder for class instance of `EncDecSpeakerLabelModel`
        self.scale_window_length_list (list):
            List containing the window lengths (i.e., scale length) of each scale.
        self.scale_n (int):
            Number of scales for multi-scale clustering diarizer
        self.base_scale_index (int):
            The index of the base-scale which is the shortest scale among the given multiple scales
    """
    def __init__(
        self, cfg_diar_infer: DictConfig, cfg_msdd_model: DictConfig, speaker_model: Optional[EncDecSpeakerLabelModel]
    ):
        super().__init__(cfg_diar_infer, cfg_msdd_model, speaker_model)
        self.cfg_diar_infer = cfg_diar_infer
        self._cfg_msdd = cfg_msdd_model
        self._speaker_model = speaker_model
        self.scale_window_length_list = list(
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.window_length_in_sec
        )
        self.scale_n = len(self.scale_window_length_list)
        self.base_scale_index = len(self.scale_window_length_list) - 1
        self.clus_diar_model = ClustDiar(cfg=self.cfg_diar_infer, speaker_model=self._speaker_model)

    def run_clustering_diarizer(self, manifest_filepath: str, emb_dir: str):
        """
        If no pre-existing data is provided, run clustering diarizer from scratch. This will create scale-wise speaker embedding
        sequence, cluster-average embeddings, scale mapping and base scale clustering labels. Note that speaker embedding `state_dict`
        is loaded from the `state_dict` in the provided MSDD checkpoint.

        Args:
            manifest_filepath (str):
                Input manifest file for creating audio-to-RTTM mapping.
            emb_dir (str):
                Output directory where embedding files and timestamp files are saved.

        Returns:
            emb_sess_avg_dict (dict):
                Dictionary containing cluster-average embeddings for each session.
            emb_scale_seq_dict (dict):
                Dictionary containing embedding tensors which are indexed by scale numbers.
            base_clus_label_dict (dict):
                Dictionary containing clustering results. Clustering results are cluster labels for the base scale segments.
        """
        self.cfg_diar_infer.diarizer.manifest_filepath = manifest_filepath
        self.cfg_diar_infer.diarizer.out_dir = emb_dir

        # Run ClusteringDiarizer which includes system VAD or oracle VAD.
        self._out_dir = self.clus_diar_model._diarizer_params.out_dir
        self.out_rttm_dir = os.path.join(self._out_dir, 'pred_rttms')
        os.makedirs(self.out_rttm_dir, exist_ok=True)

        self.clus_diar_model._cluster_params = self.cfg_diar_infer.diarizer.clustering.parameters
        self.clus_diar_model.multiscale_args_dict[
            "multiscale_weights"
        ] = self.cfg_diar_infer.diarizer.speaker_embeddings.parameters.multiscale_weights
        self.clus_diar_model._diarizer_params.speaker_embeddings.parameters = (
            self.cfg_diar_infer.diarizer.speaker_embeddings.parameters
        )
        cluster_params = self.clus_diar_model._cluster_params
        cluster_params = dict(cluster_params) if isinstance(cluster_params, DictConfig) else cluster_params.dict()
        clustering_params_str = json.dumps(cluster_params, indent=4)

        logging.info(f"Multiscale Weights: {self.clus_diar_model.multiscale_args_dict['multiscale_weights']}")
        logging.info(f"Clustering Parameters: {clustering_params_str}")
        scores = self.clus_diar_model.diarize(batch_size=self.cfg_diar_infer.batch_size)

        # If RTTM (ground-truth diarization annotation) files do not exist, scores is None.
        if scores is not None:
            metric, speaker_mapping_dict, _ = scores
        else:
            metric, speaker_mapping_dict = None, None

        # Get the mapping between segments in different scales.
        self._embs_and_timestamps = get_embs_and_timestamps(
            self.clus_diar_model.multiscale_embeddings_and_timestamps, self.clus_diar_model.multiscale_args_dict
        )
        session_scale_mapping_dict = self.get_scale_map(self._embs_and_timestamps)
        clus_labels = self.load_clustering_labels(emb_dir)
        emb_sess_avg_dict, base_clus_label_dict = self.get_cluster_avg_embs(
            self.clus_diar_model.emb_scale_seq_dict, clus_labels, speaker_mapping_dict, session_scale_mapping_dict
        )
        self.clus_diar_model.emb_scale_seq_dict['session_scale_mapping'] = session_scale_mapping_dict
        return emb_sess_avg_dict, self.clus_diar_model.emb_scale_seq_dict, base_clus_label_dict, metric


class NeuralDiar(NeuralDiarizer):
    def __init__(self, cfg: Union[DictConfig, NeuralDiarizerInferenceConfig]):
        super().__init__(cfg)
        self._cfg = cfg
        self._speaker_model = None
        self.msdd_model = None

        # Parameter settings for MSDD model
        self.use_speaker_model_from_ckpt = cfg.diarizer.msdd_model.parameters.get('use_speaker_model_from_ckpt', True)
        self.use_clus_as_main = cfg.diarizer.msdd_model.parameters.get('use_clus_as_main', False)
        self.max_overlap_spks = cfg.diarizer.msdd_model.parameters.get('max_overlap_spks', 2)
        self.num_spks_per_model = cfg.diarizer.msdd_model.parameters.get('num_spks_per_model', 2)
        self.use_adaptive_thres = cfg.diarizer.msdd_model.parameters.get('use_adaptive_thres', True)
        self.max_pred_length = cfg.diarizer.msdd_model.parameters.get('max_pred_length', 0)
        self.diar_eval_settings = cfg.diarizer.msdd_model.parameters.get(
            'diar_eval_settings', [(0.25, True), (0.25, False), (0.0, False)]
        )

        self._init_msdd_model(cfg)
        self.diar_window_length = cfg.diarizer.msdd_model.parameters.diar_window_length
        self.transfer_diar_params_to_model_params(self.msdd_model, cfg)

        if self.msdd_model is None:
            raise TypeError(f'The MSDD model is None')
        # Initialize clustering and embedding preparation instance (as a diarization encoder).
        self.clustering_embedding = ClusEmb(
            cfg_diar_infer=cfg, cfg_msdd_model=self.msdd_model.cfg, speaker_model=self._speaker_model
        )

        # Parameters for creating diarization results from MSDD outputs.
        self.clustering_max_spks = self.msdd_model.cfg.max_num_of_spks
        self.overlap_infer_spk_limit = cfg.diarizer.msdd_model.parameters.get(
            'overlap_infer_spk_limit', self.clustering_max_spks
        )
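For completeness, a minimal driver sketch for the subclasses above (config name, manifest path, and output directory are placeholders; it mirrors how the stock NeuralDiarizer is normally invoked):

from omegaconf import OmegaConf

cfg = OmegaConf.load("diar_infer_telephonic.yaml")  # placeholder paths below
cfg.diarizer.manifest_filepath = "input_manifest.json"
cfg.diarizer.out_dir = "./diar_out"

# NeuralDiar wires in ClusEmb/ClustDiar, so embeddings stay in GPU memory
# instead of being written to and read back from pkl files.
diarizer = NeuralDiar(cfg=cfg)
diarizer.diarize()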

@github-actions (bot)

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions bot added the stale label on Aug 31, 2023
@github-actions (bot)

github-actions bot commented Sep 7, 2023

This issue was closed because it has been inactive for 7 days since being marked as stale.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Sep 7, 2023
@prkumar112451

Looks like not many people use MSDD. It is mid-2024, NeMo inference is still super slow for MSDD, and no action has been taken on this.

@nithinraok (Collaborator)

@SagyHarpazGong did your implementation help reduce the CPU-GPU bottleneck and improve speed?

nithinraok reopened this on May 13, 2024
@nithinraok (Collaborator)

@prkumar112451 thanks for your comments. Unfortunately we may have missed this or been busy with other work; thank you for bringing this issue up again.

@nithinraok (Collaborator)

nithinraok commented May 13, 2024

@prkumar112451 Thanks for the detailed comments. Currently, the way we improved the accuracy of NeMo diarization is by using embeddings at multiple scales, which I believe is the issue for your 20-minute audio. There are ways to improve this.

First, to answer, I would need some clarifications from your end:

  • Would sticking to "clustering only" be a viable option for you?
  • We can drastically improve speed by reducing the number of scales used for extracting embeddings at various resolutions, but this comes at the cost of slightly reduced accuracy (see the config sketch after this list). How strict are your accuracy requirements?
  • Would it be possible to share a sample 20-minute audio you are using (if not, that is totally fine) and the exact settings you were using?
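For illustration, reducing the number of scales means shortening the three multiscale lists in the inference config (parameter names as in diar_infer_telephonic.yaml; the two-scale values below are only an example, not a tuned setting):

from omegaconf import OmegaConf

cfg = OmegaConf.load("diar_infer_telephonic.yaml")
# The default telephonic setup uses 5 scales; fewer scales means fewer segments to embed.
cfg.diarizer.speaker_embeddings.parameters.window_length_in_sec = [1.5, 0.75]
cfg.diarizer.speaker_embeddings.parameters.shift_length_in_sec = [0.75, 0.375]
cfg.diarizer.speaker_embeddings.parameters.multiscale_weights = [1, 1]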

@prkumar112451

@nithinraok Thanks for the quick response.

The 20-minute audio is a call-center telephony conversation between a customer and an agent.

I am using a combination of Whisper for transcription and then Nemo for diarization. Taking this repo as reference -

https://github.com/piegu/language-models/blob/master/speech_to_text_transcription_with_speakers_Whisper_Transcription_%2B_NeMo_Diarization.ipynb?source=post_page-----8da2312f1617--------------------------------

We can see that lots of Whisper optimization techniques are available, like flash attention, batching, etc., and they have sped up Whisper a lot.

But the diarization part is acting as the bottleneck. Just to be sure, I completely removed the Whisper part and ran NeMo's telephony model diar_msdd_telephonic on its own, but it still takes about 1 minute to diarize a 20-minute call recording.

To answer your questions:

  1. Any architecture that solves the use case and reduces delay with acceptable accuracy, so that we can scale, is good enough.
  2. Could you share which configuration parameter we need to update to reduce the number of scales?
  3. This is the configuration we are using:
    https://raw.githubusercontent.com/NVIDIA/NeMo/main/examples/speaker_tasks/diarization/conf/inference/diar_infer_telephonic.yaml

@nithinraok (Collaborator)

nithinraok commented May 13, 2024

Regarding the performance bottleneck of diarization: if you can tolerate some loss in accuracy, I would suggest trying the clustering diarizer with a single scale and without the MSDD model, as shown in the config below:

MANIFEST_FILE='callhome_109.json'
python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py \
        --config-path='examples/speaker_tasks/diarization/conf/inference' --config-name='diar_infer_telephonic.yaml' \
    diarizer.manifest_filepath=$MANIFEST_FILE \
    diarizer.out_dir='/data/sample/' \
    diarizer.speaker_embeddings.model_path=${MODEL_PATH} \
    diarizer.speaker_embeddings.parameters.window_length_in_sec=1.5 \
    diarizer.speaker_embeddings.parameters.shift_length_in_sec=0.75 \
    diarizer.vad.model_path='vad_multilingual_marblenet' \
    diarizer.asr.model_path=null \
    diarizer.msdd_model.model_path=null \
    diarizer.oracle_vad=False \
    diarizer.clustering.parameters.oracle_num_speakers=False \
    batch_size=256 \
    num_workers=1

This setting should be fast. Note that you could also switch from the external VAD to ASR-based VAD, so you could do ASR+SD in one go. We explained some of these settings here, please feel free to explore: https://github.com/NVIDIA/NeMo/tree/main/examples/speaker_tasks/diarization#run-speech-recognition-with-speaker-diarization. It is very important to note that a common setting might not be best for all kinds of audio samples, due to varying backgrounds and noise levels, so use it accordingly. The configuration above does clustering-based diarization only, with single-scale embeddings, using the VAD output from the MarbleNet VAD.

Also, it is very exciting to see your use case, and of course we have blazingly fast ASR models that can do inference with punctuation and capitalization. Could you give this model a try: https://huggingface.co/nvidia/parakeet-tdt_ctc-1.1b ?

I am looking to put together a Space with the above model and speaker diarization soon; I will keep it posted here.

@nithinraok (Collaborator)

We are working on improving RTF for ASR models even more; you can expect models to keep getting better in terms of both speed and accuracy.

github-actions bot removed the stale label on May 14, 2024
@prkumar112451

@nithinraok

To run python examples/speaker_tasks/diarization/clustering_diarizer/offline_diar_infer.py in a Kaggle notebook,

I installed these libraries (mentioned in the pip installation section of the NeMo GitHub https://github.com/NVIDIA/NeMo/ ):
!apt-get update && apt-get install -y libsndfile1 ffmpeg
!pip install Cython
!pip install nemo_toolkit['all']

and then ran the imports that are at the top of the offline_diar_infer.py file (https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/asr/models/clustering_diarizer.py ):

from omegaconf import OmegaConf
from pytorch_lightning import seed_everything

from nemo.collections.asr.models import ClusteringDiarizer
from nemo.core.config import hydra_runner
from nemo.utils import logging

I am getting this error:

File /opt/conda/lib/python3.10/site-packages/datasets/filesystems/s3filesystem.py:1
----> 1 import s3fs
3 from ..utils.deprecation_utils import deprecated
6 @deprecated("Use s3fs.S3FileSystem instead.")
7 class S3FileSystem(s3fs.S3FileSystem):

File /opt/conda/lib/python3.10/site-packages/s3fs/__init__.py:1
----> 1 from .core import S3FileSystem, S3File
2 from .mapping import S3Map
4 from ._version import get_versions

File /opt/conda/lib/python3.10/site-packages/s3fs/core.py:29
27 import aiobotocore
28 import botocore
---> 29 import aiobotocore.session
30 from aiobotocore.config import AioConfig
31 from botocore.exceptions import ClientError, HTTPClientError, ParamValidationError

File /opt/conda/lib/python3.10/site-packages/aiobotocore/session.py:10
7 from botocore.session import UnknownServiceError, copy
9 from . import version, retryhandler
---> 10 from .client import AioBaseClient, AioClientCreator
11 from .configprovider import AioSmartDefaultsConfigStoreFactory
12 from .credentials import AioCredentials, create_credential_resolver

File /opt/conda/lib/python3.10/site-packages/aiobotocore/client.py:10
1 from botocore.awsrequest import prepare_request_dict
2 from botocore.client import (
3 BaseClient,
4 ClientCreator,
(...)
8 resolve_checksum_context,
9 )
---> 10 from botocore.compress import maybe_compress_request
11 from botocore.discovery import block_endpoint_discovery_required_operations
12 from botocore.exceptions import OperationNotPageableError, UnknownServiceError

ModuleNotFoundError: No module named 'botocore.compress'

[screenshot of the error attached]

Is there any restriction on which Python version we need to use? I am using 3.10.13.

@nithinraok (Collaborator)

It looks like s3filesystem is local to the notebook you are running; NeMo doesn't depend on this package or its derivatives. Would it be possible to share the notebook? We have a similar notebook for diarization at https://github.com/NVIDIA/NeMo/blob/main/tutorials/speaker_tasks/Speaker_Diarization_Inference.ipynb to get you started.

@SagyHarpazGong (Author)

@nithinraok
Hi.

My implementation speeds up the embedding extraction phase, but the clustering phase and the final phase (MSDD) are still extremely slow because there is a lot of traffic between the CPU and the GPU, and there are also heavy CPU computations such as the get_argmin_mat function in offline_clustering.py.

@nithinraok (Collaborator)

Your comments are highly appreciated; we will take these into consideration soon and update our codebase.

@SagyHarpazGong (Author)

@nithinraok
About the get_argmin_mat function in offline_clustering.py: when there are a lot of embedding vectors, CPU RAM usage grows until the process is killed. For example, on a machine with only 4 CPU cores (16 GB total RAM), that function can cause a "Killed" error because it tries to allocate more memory than is available.
This is because the torch.tile calls essentially duplicate the anchor tensors into two full (base segments) x (current segments) matrices per scale.
My temporary solution is to batch the tiling.

This is the original function:

def get_argmin_mat(timestamps_in_scales: List[torch.Tensor]) -> List[torch.Tensor]:
    """
    Calculate the mapping between the base scale and other scales. A segment from a longer scale is
    repeatedly mapped to a segment from a shorter scale or the base scale.

    Args:
        timestamps_in_scales (list):
            List containing timestamp tensors for each scale.
            Each tensor has dimensions of (Number of base segments) x 2.

    Returns:
        session_scale_mapping_list (list):
            List containing argmin arrays indexed by scale index.
    """
    scale_list = list(range(len(timestamps_in_scales)))
    segment_anchor_list = []
    for scale_idx in scale_list:
        time_stamps_float = timestamps_in_scales[scale_idx]
        segment_anchor_list.append(torch.mean(time_stamps_float, dim=1))

    base_scale_idx = max(scale_list)
    base_scale_anchor = segment_anchor_list[base_scale_idx]
    session_scale_mapping_list = []
    for scale_idx in scale_list:
        curr_scale_anchor = segment_anchor_list[scale_idx]
        curr_mat = torch.tile(curr_scale_anchor, (base_scale_anchor.shape[0], 1))
        base_mat = torch.tile(base_scale_anchor, (curr_scale_anchor.shape[0], 1)).t()
        argmin_mat = torch.argmin(torch.abs(curr_mat - base_mat), dim=1)
        session_scale_mapping_list.append(argmin_mat)
    return session_scale_mapping_list

and this is my re-implementation:

def batching_argmin(base_scale_anchor: torch.Tensor, curr_scale_anchor: torch.Tensor, batch_size: int):
    """
    Calculate the element-wise argmin between two sets of anchor points, processing data in batches
    to reduce memory usage.

    Args:
        base_scale_anchor (torch.Tensor):
            The anchor points of the base scale, represented as a tensor.

        curr_scale_anchor (torch.Tensor):
            The anchor points of the current scale, represented as a tensor.

        batch_size (int):
            The batch size for processing the data.

    Returns:
        torch.Tensor:
            A tensor containing the element-wise argmin results between the base and current scale anchor points.
    """

    num_base_segments = base_scale_anchor.shape[0]
    temp_scale_mapping = []
    # Process data in batches to reduce memory usage
    for i in range(0, num_base_segments, batch_size):
        base_batch = base_scale_anchor[i:i + batch_size]
        argmin_mat = torch.argmin(torch.abs(curr_scale_anchor - base_batch.unsqueeze(1)), dim=1)
        temp_scale_mapping.append(argmin_mat)

    # Concatenate the results for all batches
    return torch.cat(temp_scale_mapping)

def get_argmin_mat(timestamps_in_scales: List[torch.Tensor], batch_size: int = 2048) -> List[torch.Tensor]:
    """
    Calculate the mapping between the base scale and other scales. This function computes
    the mapping of segments from a longer scale to segments from a shorter scale or the base scale.

    Args:
        timestamps_in_scales (list):
            A list of timestamp tensors for each scale. Each tensor has dimensions
            (Number of base segments) x 2.

        batch_size (int, optional):
            The batch size for computing the mapping. Defaults to 2048.

    Returns:
        session_scale_mapping_list (list):
            A list containing argmin arrays indexed by scale index.
    """
    scale_list = list(range(len(timestamps_in_scales)))
    segment_anchor_list = []

    for scale_idx in scale_list:
        time_stamps_float = timestamps_in_scales[scale_idx]
        segment_anchor_list.append(torch.mean(time_stamps_float, dim=1))

    base_scale_idx = max(scale_list)
    base_scale_anchor = segment_anchor_list[base_scale_idx]

    session_scale_mapping_list = []
    for scale_idx in scale_list:
        curr_scale_anchor = segment_anchor_list[scale_idx]
        session_scale_mapping_list.append(batching_argmin(base_scale_anchor, curr_scale_anchor, batch_size))

    return session_scale_mapping_list
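As a quick sanity check (illustrative snippet, assuming the functions above are importable), the batched mapping should match a full tiled argmin exactly:

import torch

torch.manual_seed(0)
base_anchor = torch.sort(torch.rand(1000)).values  # anchors of the base (shortest) scale
curr_anchor = torch.sort(torch.rand(300)).values   # anchors of a longer scale

# Reference: build the full (1000 x 300) distance matrix, as the original tiling does.
reference = torch.argmin(torch.abs(curr_anchor.unsqueeze(0) - base_anchor.unsqueeze(1)), dim=1)
batched = batching_argmin(base_anchor, curr_anchor, batch_size=128)
assert torch.equal(reference, batched)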

@prkumar112451

@nithinraok - I tried the clustering diarizer, but the accuracy is too poor compared to neural diarization. Also, the speed of NeMo's clustering diarization is roughly similar to pyannote's neural diarization, which has better accuracy. It took 16 seconds to diarize a 6-minute audio file.

I was planning to use NeMo for a production instance, but looking at its speed, I find it not reliable enough for production, especially since it is not able to use the GPU well and depends a lot more on the CPU.

Could you share whether there is any reason the NeMo team has not taken the feedback from @SagyHarpazGong and implemented the fixes he shared almost a year back in this comment:
#7101 (comment)

I really liked NeMo; its neural diarization accuracy was better than pyannote's. But its speed is too low for it to scale.

@SagyHarpazGong - Regarding the fixes you added, could you share whether you are using NeMo in a production instance? And would it be possible for you to check these changes into the NeMo fork in your repository? I tried cloning your repo, but it didn't have the changes you shared in this thread to speed up NeMo.

@SagyHarpazGong (Author)

@prkumar112451
My fixes are implemented only in my work repo and not in the NeMo repo, but I shared exactly what I changed/added.
I really don't have the time to work on a PR for the NeMo team.

Regarding your question about using NeMo in a production instance: yes, NeMo (with my fixes) has been running in production for at least half a year.
The improvement in the total diarization process depends on the number of embedding vectors (the number of speech segments). Most of our recordings are 40-60 minutes, and the improvement is approximately x2.5-x3.
For recordings longer than 60 minutes, the improvement is approximately x2.

So, as you already mentioned, the bottleneck is the traffic between CPU and GPU (and vice versa), but also the memory usage on both the GPU and the CPU.

BTW, I found that a lot of the inference functions are not run under torch.no_grad(), meaning every tensor in the process keeps autograd state and is roughly twice as large (tensor data plus gradient bookkeeping), so in my repo I just added the @torch.no_grad() decorator at the top of the inference functions.
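For illustration, the decorator approach is just this (the wrapper name and arguments here are hypothetical; the point is that no autograd graph is recorded for anything created inside the call):

import torch

@torch.no_grad()  # disables autograd graph construction for the whole call
def run_diarization_inference(model, batch):
    # Purely illustrative wrapper: forward passes here keep no gradient state.
    return model(batch)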

@nithinraok (Collaborator)

We are working on the next version of speaker diarization, which doesn't depend on the current clustering or MSDD pipeline, so the developers who worked on MSDD probably haven't given this much attention. However, these are very valid points to add to the codebase. We love these suggestions and will apply them as improvements.

@nithinraok (Collaborator)

Note: we worked on improving the speed of the current clustering diarizer for Riva, with TensorRT support; these improvements are not part of NeMo and can only be used with Riva.

@nithinraok (Collaborator)

@maxpain

maxpain commented May 24, 2024

> We are working on the next version of speaker diarization, which doesn't depend on the current clustering or MSDD pipeline, so the developers who worked on MSDD probably haven't given this much attention. However, these are very valid points to add to the codebase. We love these suggestions and will apply them as improvements.

I'm also trying to add a NeMo diarizer to my pipeline and can't wait for the update.
In my case, I extract embeddings using titanet for each speaker in the audio file (about 1-5 minutes each).

@maxpain

maxpain commented May 24, 2024

The main bottleneck is the CPU.
GPUs are barely used during the diarization process.
[screenshots of CPU and GPU utilization attached]

@prkumar112451

@nithinraok - Riva is too expensive; we can't go with that. You mentioned TensorRT. Are you suggesting that NeMo will be faster with TensorRT?

https://github.com/NVIDIA/TensorRT

If we run NeMo within a TensorRT container on a T4 GPU, will it speed NeMo up by at least 2x to 3x?

@prkumar112451

@nithinraok - Also, you mentioned working on a new version of speaker diarization that does not rely on clustering or MSDD. Could you share a rough date for when we can expect that?

After looking at all the options, I found that pyannote uses the GPU much better. That means GPUs with more CUDA cores give a quicker response. I had to go with pyannote and am currently working on fine-tuning it to improve the quality of the output.

But I must say, it was disappointing to find that NeMo, which felt really good at first, had so much CPU dependency and such poor GPU usage. It looks like, for a production environment with a good number of recordings to transcribe, this just wouldn't work, and currently pyannote is the only good enough option.

@prkumar112451

Any updates on this please?

@tango4j (Collaborator)

tango4j commented Jun 24, 2024

@prkumar112451
I am sorry that you feel that way. Unfortunately, we do not have plans to improve the speed of the pipelined speaker diarization in NeMo. We will move directly to the end-to-end version of speaker diarization.
As for the new end-to-end model release, I think it will not be until the end of September.

@github-actions (bot)

This issue is stale because it has been open for 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.

github-actions bot added the stale label on Jul 25, 2024
@github-actions (bot)

github-actions bot commented Aug 1, 2024

This issue was closed because it has been inactive for 7 days since being marked as stale.

github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on Aug 1, 2024
@prkumar112451

prkumar112451 commented Oct 4, 2024

@tango4j @nithinraok I am seeing this update on LinkedIn:

[LinkedIn post screenshot attached]

Is this true? Have you made this 10x speed improvement in your codebase?

@nithinraok (Collaborator)

It's for ASR models (especially Parakeet); yes, it's part of NeMo main.

@tfriedel

tfriedel commented Oct 5, 2024

But nothing for speaker diarization yet? Is a new end-to-end speaker diarization model still expected soon?
