In [None]:
!pip install nemo_toolkit[all]
!pip install onnxruntime onnxruntime-gpu # for gpu, use onnxruntime-gpu
!pip install boto3 botocore

**CONVERT MODEL TO ONNX**

In [None]:
import nemo.collections.asr as nemo_asr

# Other checkpoints that can be exported with the same restore/export pair:
# quartznet = nemo_asr.models.EncDecCTCModel.restore_from('/kaggle/input/vivos-manifest/quartznet_vivos_100.nemo')
# quartznet.export('quartznet_vivos_100.onnx')
#
# citrinet = nemo_asr.models.ASRModel.restore_from('/kaggle/input/libritts-manifest/citrinet_libri_bpe_freeze.nemo')
# citrinet.export('citrinet_libri_bpe_freeze.onnx')

# Restore the Conformer checkpoint and export it to an ONNX file whose path is
# relative to the current working directory. `conformer` is reused by the
# evaluation cells further down.
conformer = nemo_asr.models.EncDecCTCModelBPE.restore_from(
    '/kaggle/input/libritts-manifest/conformer_libri_bpe_unfreeze.nemo'
)
conformer.export('conformer_libri_bpe_unfreeze.onnx')

In [None]:
import yaml
from omegaconf import DictConfig, OmegaConf

# model_Onnx = nemo_asr.models.ASRModel.restore_from(restore_path="/kaggle/input/vivos-manifest/quartznet_vivos_100.nemo", return_config = True)
# With return_config=True only the configuration stored in the .nemo archive
# is returned (no model is instantiated here).
model_Onnx = nemo_asr.models.EncDecCTCModelBPE.restore_from(
    '/kaggle/input/libritts-manifest/conformer_libri_bpe_unfreeze.nemo',
    return_config=True,
)

# Write the configuration as YAML next to the exported model. The context
# manager closes the file even if serialization raises, which the previous
# manual open()/close() pair did not guarantee.
with open("conformer_libri_bpe_unfreeze.yaml", "w") as textfile:
    textfile.write(OmegaConf.to_yaml(model_Onnx))

**EXAMPLE OF USING .NEMO MODEL INSTEAD OF .ONNX**

In [None]:
from jiwer import wer
import json
import pandas as pd


def convert_manifest_to_df(manifest_path):
    """Load a JSON-lines manifest into a pandas DataFrame.

    Each non-blank line of the manifest must be a JSON object containing the
    keys ``audio_filepath``, ``duration`` and ``text``.

    Args:
        manifest_path: Path to the UTF-8 encoded JSON-lines manifest.

    Returns:
        A DataFrame with the columns ``audio_filepath``, ``duration`` and
        ``text`` (in that order), one row per manifest entry. An empty
        manifest yields an empty DataFrame with the same columns.

    Raises:
        KeyError: If an entry lacks one of the three required keys.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    columns = ['audio_filepath', 'duration', 'text']
    rows = []
    with open(manifest_path, encoding="utf8") as f:
        for line in f:
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no entry; feeding them to json.loads would raise.
            if not line.strip():
                continue
            metadata = json.loads(line)
            # Index explicitly so a missing key still fails loudly.
            rows.append({column: metadata[column] for column in columns})

    return pd.DataFrame(rows, columns=columns)

In [None]:
import torch
# `is_available` has to be *called*: the bare function object is always
# truthy, which selected 'cuda' even on CPU-only kernels.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
conformer.to(device)

In [None]:
# Word error rate of the restored .nemo model on the dev-other manifest.
df = convert_manifest_to_df(
    '/kaggle/input/libritts-manifest/dev-other-manifest-kaggle.json'
)
audio_path, reference = df['audio_filepath'].tolist(), df['text'].tolist()

# Transcribe every listed file, then score against the manifest transcripts.
hypothesis = conformer.transcribe(paths2audio_files=audio_path)
print(wer(reference, hypothesis))

In [None]:
# Word error rate of the restored .nemo model on the dev-clean manifest.
df = convert_manifest_to_df(
    '/kaggle/input/libritts-manifest/dev-clean-manifest-kaggle.json'
)
audio_path, reference = df['audio_filepath'].tolist(), df['text'].tolist()

# Transcribe every listed file, then score against the manifest transcripts.
hypothesis = conformer.transcribe(paths2audio_files=audio_path)
print(wer(reference, hypothesis))

In [None]:
# Transcribe one test-clean utterance; the bare name on the last line makes
# the notebook display the result.
hypothesis = conformer.transcribe(
    paths2audio_files=[
        "/kaggle/input/libritts/test-clean/test-clean/1089/134686/1089-134686-0000.wav"
    ]
)
hypothesis

In [None]:
from pydub import AudioSegment
import os
import wave


def concatenate_wav_files(input_folder, output_file):
    """Concatenate every ``.wav`` file under ``input_folder`` into one WAV file.

    The folder is traversed recursively. Directories and files are visited in
    sorted name order so that the resulting audio is reproducible between
    runs and machines.

    Args:
        input_folder: Root directory that is searched recursively.
        output_file: Path of the WAV file to write.
    """
    concatenated_audio = AudioSegment.silent(duration=0)

    for foldername, dirnames, filenames in os.walk(input_folder):
        # os.walk yields entries in arbitrary, filesystem-dependent order.
        # Sorting `dirnames` in place fixes the traversal order of the
        # sub-directories; sorting `filenames` fixes the order within each one.
        dirnames.sort()
        for filename in sorted(filenames):
            if filename.endswith(".wav"):
                current_audio = AudioSegment.from_wav(os.path.join(foldername, filename))
                concatenated_audio += current_audio
    concatenated_audio.export(output_file, format="wav")
    
def extract_first_minute(input_file, time):
    """Save the leading part of a WAV file as a new WAV file.

    The audio is sliced as ``audio[:time]``. The output path is
    ``input_file`` followed by ``_<time / 60000>.wav``; because the division
    yields a float, ``time=600000`` produces the suffix ``_10.0.wav``.

    Args:
        input_file: Path (string) of the source WAV file.
        time: Slice end passed to the audio segment; ``time / 60000`` is used
            as the "minutes" label in the output file name.
    """
    # Build the destination name first, exactly as before the audio is read.
    minutes_label = time / 60000
    suffix = f"_{minutes_label}.wav"
    output_file = input_file + suffix

    source_audio = AudioSegment.from_file(input_file, format="wav")
    leading_clip = source_audio[:time]

    # Write the clip next to the source file.
    leading_clip.export(output_file, format="wav")

def get_wav_duration(file_path):
    """Return the length of a WAV file in seconds.

    The duration is the frame count divided by the frame rate, both read from
    the file header via the standard-library ``wave`` module.

    Args:
        file_path: Path of the WAV file to inspect.

    Returns:
        The duration in seconds as a float.
    """
    with wave.open(file_path, 'rb') as wav_reader:
        # frames / (frames per second) == seconds
        return wav_reader.getnframes() / float(wav_reader.getframerate())

In [None]:
# Merge all VIVOS test recordings into a single file; 'output.wav' is relative
# to the current working directory.
concatenate_wav_files(
    input_folder='/kaggle/input/vivos-vietnamese-speech-corpus-for-asr/vivos/test/waves',
    output_file='output.wav',
)

In [None]:
# Cut clips of several lengths from the concatenated recording, in the same
# order as before. Each call writes '/kaggle/working/output.wav_<time/60000>.wav'.
for clip_length in (600000, 300000, 900000, 1200000):
    extract_first_minute('/kaggle/working/output.wav', clip_length)

In [None]:
# Sanity-check one of the extracted clips. The "_20.0" suffix is the name
# extract_first_minute generates for time=1200000 (1200000 / 60000 == 20.0).
file_path = '/kaggle/working/output.wav_20.0.wav'
duration = get_wav_duration(file_path)
print(f'The duration of the WAV file is {duration:.2f} seconds.')

**HELPERS**

In [None]:
# Load the Conformer configuration YAML into a plain dict; its `decoding`
# section is consumed by the decoding setup later in the notebook.
config_path = "/kaggle/working/conformer_libri_bpe_unfreeze.yaml"
with open(config_path) as f:
    params_conformer = yaml.safe_load(f.read())

In [None]:
from nemo.collections.asr.models.ctc_models import EncDecCTCModel
from nemo.collections.asr.metrics.wer import CTCDecoding

import yaml
from omegaconf import DictConfig, OmegaConf


# Load the model configuration that supplies the preprocessor settings and the
# decoder vocabulary.
# NOTE(review): no cell visible in this notebook writes
# stt_en_conformer_ctc_small.yaml; it presumably has to exist in
# /kaggle/working beforehand — confirm.
config_path = "/kaggle/working/stt_en_conformer_ctc_small.yaml"
with open(config_path) as f:
    params = yaml.safe_load(f)
    

# Instantiate the audio preprocessor from the `preprocessor` section of the
# configuration loaded above.
preprocessor_cfg = DictConfig(params).preprocessor
preprocessor = EncDecCTCModel.from_config_dict(preprocessor_cfg)

# Vocabulary used both for the transcription dataset and for decoding.
labels = params['decoder']['vocabulary']

# NOTE(review): the decoding section is taken from `params_conformer` (the
# conformer_libri_bpe_unfreeze YAML) while `labels` come from `params` above —
# confirm that mixing the two configurations is intentional.
decoding_cfg = DictConfig(params_conformer).decoding
decoding = CTCDecoding(decoding_cfg, labels)

In [None]:
def to_numpy(tensor):
    """Convert a torch tensor to a NumPy array on the CPU.

    Tensors that require gradients are detached from the autograd graph first,
    because such tensors cannot be converted to NumPy directly.
    """
    if tensor.requires_grad:
        return tensor.detach().cpu().numpy()
    return tensor.cpu().numpy()

def setup_transcribe_dataloader(cfg, vocabulary):
    """Create a DataLoader over the temporary transcription manifest.

    Args:
        cfg: Mapping with the keys ``temp_dir`` (directory that contains
            ``manifest.json``), ``batch_size`` (upper bound for the batch
            size) and ``paths2audio_files`` (the files being transcribed;
            only its length is used here).
        vocabulary: Labels handed to the dataset.

    Returns:
        A ``torch.utils.data.DataLoader`` that iterates the manifest in order
        (no shuffling, no dropped last batch, no worker processes, no pinned
        memory).
    """
    manifest_path = os.path.join(cfg['temp_dir'], 'manifest.json')
    # Never request a batch larger than the number of files to transcribe.
    effective_batch_size = min(cfg['batch_size'], len(cfg['paths2audio_files']))

    transcribe_dataset = AudioToCharDataset(
        manifest_filepath=manifest_path,
        labels=vocabulary,
        sample_rate=16000,
        int_values=False,
        augmentor=None,
        max_duration=None,
        min_duration=None,
        max_utts=0,
        blank_index=-1,
        unk_index=-1,
        normalize=False,
        trim=True,
        parser='en',
    )

    loader = torch.utils.data.DataLoader(
        dataset=transcribe_dataset,
        batch_size=effective_batch_size,
        collate_fn=transcribe_dataset.collate_fn,
        drop_last=False,
        shuffle=False,
        num_workers=0,
        pin_memory=False,
    )
    return loader

**.WAV INFERENCE FILE**

In [None]:
# Single utterance that the ONNX inference cell below transcribes.
audio_file = "/kaggle/input/libritts/test-clean/test-clean/1089/134686/1089-134686-0000.wav"

**ONNX INFERENCE**

In [None]:
import onnxruntime
import tempfile
import os
import json
import numpy as np
import torch
from nemo.collections.asr.metrics.wer import WER
from nemo.collections.asr.data.audio_to_text import AudioToCharDataset


# ort_session = onnxruntime.InferenceSession('/kaggle/working/quartznet_vivos_100.onnx', providers=['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider'])
# Pass the execution providers explicitly: ONNX Runtime builds that ship more
# than the CPU provider refuse to create a session without them. CUDA is
# preferred when this build offers it, with CPU as the fallback.
available_providers = onnxruntime.get_available_providers()
providers = [
    provider
    for provider in ('CUDAExecutionProvider', 'CPUExecutionProvider')
    if provider in available_providers
]
ort_session = onnxruntime.InferenceSession(
    '/kaggle/working/stt_en_conformer_ctc_small.onnx', providers=providers
)

with tempfile.TemporaryDirectory() as tmpdir:
    # One-entry manifest for the file to transcribe; duration and text are
    # placeholders written only so the entry has all three fields.
    with open(os.path.join(tmpdir, 'manifest.json'), 'w') as fp:
        entry = {'audio_filepath': audio_file, 'duration': 100000, 'text': 'nothing'}
        fp.write(json.dumps(entry) + '\n')

    config = {'paths2audio_files': [audio_file], 'batch_size': 4, 'temp_dir': tmpdir}
    temporary_datalayer = setup_transcribe_dataloader(config, labels)
    for test_batch in temporary_datalayer:
        # Raw audio -> features, using the preprocessor built from the YAML config.
        processed_signal, processed_signal_len = preprocessor(
            input_signal=test_batch[0], length=test_batch[1]
        )
        ort_inputs = {
            ort_session.get_inputs()[0].name: to_numpy(processed_signal),
            ort_session.get_inputs()[1].name: to_numpy(processed_signal_len)
        }
        ologits = ort_session.run(None, ort_inputs)
        # Only the first model output is used; index it directly instead of
        # stacking the whole output list into a single array first.
        logits = torch.from_numpy(ologits[0])
        greedy_predictions = logits.argmax(dim=-1, keepdim=False)
        # Decode with the `decoding` object directly. Creating a metric named
        # `wer` here would shadow the jiwer `wer` function that the earlier
        # evaluation cells call, breaking them when they are re-run.
        hypotheses, _ = decoding.ctc_decoder_predictions_tensor(greedy_predictions)
        # "▁" marks a word start in the decoded text. Turn it into a space and
        # drop the single leading space it produces — but only when there is
        # one, so a transcript without it keeps its first character.
        hypotheses = [
            text[1:] if text.startswith(" ") else text
            for text in (hypothesis.replace("▁", " ") for hypothesis in hypotheses)
        ]
        print(hypotheses)
        # Only the first batch is transcribed (the manifest holds one file).
        break