In [1]:
from whisper_module import whisper
import csv
import os
import sys
from pathlib import Path
import pdb
from datetime import datetime
import time
from datetime import datetime
import warnings
warnings.filterwarnings("ignore")

import pathlib
import torch
from tqdm import tqdm
import ffmpeg
import datetime


In [2]:
basedir = os.getcwd()
ffmpeg_path = os.path.join(basedir,"ffmpeg","ffmpeg.exe")
AUDIO_DIR = os.path.join(basedir, "data", "input")
full_audio_path = 'C:/Users/k66gu/Documents/transcription_dev/data/input/delta_10.wav '# os.path.join(basedir, "data","input","delta_10.wav")
VAD_DIR = os.path.join(AUDIO_DIR,"vad_chunks")
SILERO_DIR = os.path.join(basedir, "models", "silero-vad")
print(basedir,'\n',ffmpeg_path,"\n",full_audio_path,
      "\n", AUDIO_DIR,"\n", VAD_DIR, "\n", SILERO_DIR)

C:\Users\k66gu\Documents\transcription_dev 
 C:\Users\k66gu\Documents\transcription_dev\ffmpeg\ffmpeg.exe 
 C:/Users/k66gu/Documents/transcription_dev/data/input/delta_10.wav  
 C:\Users\k66gu\Documents\transcription_dev\data\input 
 C:\Users\k66gu\Documents\transcription_dev\data\input\vad_chunks 
 C:\Users\k66gu\Documents\transcription_dev\models\silero-vad


In [3]:
class Transcriber(object):
    """
    This class implements a new VAD ontop of Whisper Transcriber
    """
    def __init__(self, *args, **kwargs):
        self.VAD_THRESHOLD = 0.4 # Confidence threshold for VAD speech vs non-speech
        self.VAD_SR = 16000 # Sample rate to resample to
        self.head = 3200 # 0.2s head for padding chunks
        self.tail = 20800 # 1.32 tail for padding chunks
        self.chunk_threshold = 3.0 

        # Pass in directories by unpacking a list
        # List order should be in row order below
        self.SILERO_DIR = args[0] # This should be passed in from os.path.dirname(sys.argv[0]) or os.getcwd() if dev
        self.full_audio_path = args[1] # This will get passed in from a button or another class attribute
        self.AUDIO_DIR = args[2]

    def _vad_dir_creator(self):
        """
        Create directory for VAD chunks if not existing already
        """
        AUDIO_DIR = self.AUDIO_DIR
        if not os.path.exists(os.path.join(AUDIO_DIR,"vad_chunks")):
            print("Creating vad chunks directory...")
            os.mkdir(os.path.join(AUDIO_DIR,"vad_chunks"))
        
        # Store directory as we need somewhere to look for later on
        self.VAD_DIR = os.path.join(AUDIO_DIR, "vad_chunks")
        print("Directory created!")
        
    def _create_temp_audio(self, ffmpeg_path: str):
        """
        Create a copy of the input audio from full_audio_path and write to a temp directory VAD_DIR
        
        Args:
            ffmpeg_path (str): path to ffmpeg executable
        """
        full_audio_path = self.full_audio_path
        VAD_DIR = self.VAD_DIR
        # Hardcode values to feed into ffmpeg
        # This is just a temporary copy which requires specific formatted values to ensure
        # that we get the appropriate input to the system
        ffmpeg.input(full_audio_path).output(
            VAD_DIR+"/vad_temp.wav",
            ar="16000",
            ac="1",
            acodec="pcm_s16le",
            map_metadata="-1",
            fflags="+bitexact",
        ).overwrite_output().run(cmd=[ffmpeg_path, "-nostdin"], capture_stdout=True, capture_stderr=True, quiet=True)
        print(os.path.exists(os.path.join(VAD_DIR,"vad_temp.wav")))
    
    def _load_vad(self):
        """
        Load the VAD model from local
        Store utils from Silero as methods within the transcriber class
        """
        vad_model, utils = torch.hub._load_local(
            hubconf_dir=self.SILERO_DIR, model="silero_vad_local", onnx=False
        )
        if (vad_model is not None) and (utils is not None):
            print("Model and utils loaded!")
        print(utils)

        # Store the functions within the transcriber object for easy access
        (self.get_speech_timestamps, self.save_audio, self.read_audio, 
         self.load_audio, self.VADIterator, self.collect_chunks) = utils

        self.vad_model = vad_model 
    
    def _read_audio_old(self):
        """
        Read audio into an attribute to use
        The path here will be hardcoded as everything should be self contained
        The self-containment is from the directory and file copying from
        _vad_dir_creator() and _create_temp_audio() which if properly run
        will result in absolute file locations    
        """
        self.wav_old = self.read_audio("data/input/vad_chunks/vad_temp.wav", sampling_rate=self.VAD_SR)
        print("Audio loaded! at %s" % ("data/input/vad_chunks/vad_temp.wav"))
        
    def _read_audio(self, ffmpeg_path:str):
            """
            Read audio into an attribute to use
            The path here will be hardcoded as everything should be self contained
            The self-containment is from the directory and file copying from
            _vad_dir_creator() and _create_temp_audio() which if properly run
            will result in absolute file locations    

            Args:
                ffmpeg_path (str): path to ffmpeg executable
            """
            # Hardcode the location of the copied temporary wav
            # This is read in as a np_buffer which we convert to a tensor using torch
            audio = "data/input/vad_chunks/vad_temp.wav"
            if not torch.is_tensor(audio):
                if isinstance(audio, str):
                    audio = self.load_audio(audio, ffmpeg_path,sr=self.VAD_SR)
                audio = torch.from_numpy(audio)
            self.wav = audio
            print("Audio loaded! at %s" % ("data/input/vad_chunks/vad_temp.wav"))
    
    
    def _process_timestamps(self):
        """
        Add padding, remove small gaps and overlaps from processed audio
        Result timestamps are in samples (not seconds) 
        They represent chunks of audio
        """
        t = self.get_speech_timestamps(self.wav, self.vad_model, sampling_rate=self.VAD_SR, threshold=self.VAD_THRESHOLD)    
        # Add a bit of padding, and remove small gaps
        for i in range(len(t)):
            t[i]["start"] = max(0, t[i]["start"] - self.head)  # 0.2s head -> self.head=3200
            t[i]["end"] = min(self.wav.shape[0] - 16, t[i]["end"] + self.tail)  # 1.3s tail -> self.tail = 20800
            if i > 0 and t[i]["start"] < t[i - 1]["end"]:
                t[i]["start"] = t[i - 1]["end"]  # Remove overlap
        self.timestamps = t # Store timestamps to edit
        
        print("Timestamps processed! No. of audio chunks are: {}".format(len(self.timestamps)))
        
    def _split_audio(self): 
        """
        If breaks are longer than chunk_threshold seconds, 
            split into a new audio file
        This'll effectively turn long transcriptions into many shorter ones
        
        """
        # Store the chunked audio into a matrix
        # Each row is each chunk
        u = [[]]
        for i in range(len(self.timestamps)):
            if i > 0 and self.timestamps[i]["start"] > self.timestamps[i - 1]["end"] + (self.chunk_threshold * self.VAD_SR):
                u.append([])
            u[-1].append(self.timestamps[i])
        self.chunked_audio = u # Store the matrix of chunked audio
        
        print("No. of chunked audio based on threshold: {}".format(len(self.chunked_audio)))
    
    def _merge_chunks(self):
        """
        Merge chunks and remove temp copy of original audio
        Save chunks to a directory

        """
        for i in range(len(self.chunked_audio)):
            self.save_audio(
                "data/input/vad_chunks/" + str(i) + ".wav", # Fix the hardcoded locations for path
                self.collect_chunks(self.chunked_audio[i], self.wav),
                sampling_rate=self.VAD_SR,
            )
        if len(os.listdir("data/input/vad_chunks")) != 0:
            print("Audio chunks saved!")
        os.remove("data/input/vad_chunks/vad_temp.wav") # Fix hardcoded paths
    
    def _convert_timestamps_seconds(self):
        """
        Convert timestamps into seconds format
        """
        # Go through each individual chunked audio within the matrix
        # Identify chunks and offsets for the audio through math and using 16000 SR
        # keys: start, end, chunk_start and chunk_end should be present if previous executions worked
        for i in range(len(self.chunked_audio)):
            time = 0.0
            offset = 0.0
            for j in range(len(self.chunked_audio[i])):
                self.chunked_audio[i][j]["start"] /= self.VAD_SR
                self.chunked_audio[i][j]["end"] /= self.VAD_SR
                self.chunked_audio[i][j]["chunk_start"] = time
                time += self.chunked_audio[i][j]["end"] - self.chunked_audio[i][j]["start"]
                self.chunked_audio[i][j]["chunk_end"] = time
                if j == 0:
                    offset += self.chunked_audio[i][j]["start"]
                else:
                    offset += self.chunked_audio[i][j]["start"] - self.chunked_audio[i][j - 1]["end"]
                self.chunked_audio[i][j]["offset"] = offset
        print("Timestamps converted!")
    
    def _whisper_on_chunks(self, ffmpeg_path:str, basedir: str, model_name:str = 'medium'):
        """
        Transcribe using whisper on the pre-processed chunked audio
        Whisper using the medium model

        Args:
            ffmpeg_path (str): path to ffmpeg executable e.g., os.path.join(basedir,"ffmpeg","ffmpeg.exe")
            basedir (str): Path where executable should be located e.g., os.getcwd() 
            model_name (str): Default "medium" for Whisper model size
        """
        # Load the whisper model
        model = whisper.load_model_local(f"{model_name}.en", basedir ,in_memory=True)
        task = 'transcribe'
        language = 'english'
        initial_prompt = ''

        # Transcribe each chunk using Whisper
        for i in tqdm(range(len(self.chunked_audio))):
            result = model.transcribe(
                os.path.join(self.VAD_DIR,str(i) + ".wav" ), ffmpeg_path=ffmpeg_path, 
                task=task, language=language, initial_prompt=initial_prompt
            )
            # Break if result doesn't end with severe hallucinations
            if len(result["segments"]) == 0:
                break
            elif result["segments"][-1]["end"] < self.chunked_audio[i][-1]["chunk_end"] + 10.0:
                break
        self.result = result
        if len(result) > 0:
            print("Audio successfully transcribed!")
    
    def _run(self, ffmpeg_path:str, basedir: str):
        """
        This method executes all the required steps required for Whisper
        """
        self._vad_dir_creator()
        self._create_temp_audio()
        self._load_vad()
        self._read_audio()
        self._process_timestamps()
        self._split_audio()
        self._merge_chunks()
        self._convert_timestamps_seconds()
        self._whisper_on_chunks(ffmpeg_path = ffmpeg_path, basedir = basedir)
            
    def whisper_csv(self, ffmpeg_path:str, basedir: str):
        """
        Seperate whisper results into csv only
        """
        self._run(ffmpeg_path = ffmpeg_path, basedir = basedir) # Execute everything needed
        segment_info = []
        result = self.result
        
        for i in tqdm(range(len(self.chunked_audio))):
            for r in result["segments"]:
                # Skip audio timestamped after the chunk has ended
                if r["start"] > self.chunked_audio[i][-1]["chunk_end"]:
                    continue
                # Keep segment info for debugging
                # segment_info.append(r)

                
                # Skip if log prob is low or no speech prob is high
                if r["avg_logprob"] < -1.0 or r["no_speech_prob"] > 0.7: # Hardcoded thresholds
                    continue
                # Set start timestamp
                start = r["start"] + self.chunked_audio[i][0]["offset"]
                for j in range(len(self.chunked_audio[i])):
                    if (
                        r["start"] >= self.chunked_audio[i][j]["chunk_start"]
                        and r["start"] <= self.chunked_audio[i][j]["chunk_end"]
                    ):
                        start = r["start"] + self.chunked_audio[i][j]["offset"]
                        break
                end = self.chunked_audio[i][-1]["end"] + 0.5
                for j in range(len(self.chunked_audio[i])):
                    if r["end"] >= self.chunked_audio[i][j]["chunk_start"] and r["end"] <= self.chunked_audio[i][j]["chunk_end"]:
                        end = r["end"] + self.chunked_audio[i][j]["offset"]
                        break
                segment_info.append(r)
        return segment_info
    
    def whisper_text(self, ffmpeg_path: str, basedir: str):
        """
        Separate the whisper results into text for output
        """
        self._run(ffmpeg_path = ffmpeg_path, basedir = basedir)
        text_info = []
        result = self.result

        # Post process the segments and text based using the offsets and 
        # identified chunk start and end to ensure we can remove overlaps
        for i in tqdm(range(len(self.chunked_audio))):
            for r in result["segments"]:
                # Skip audio timestamped after the chunk has ended
                if r["start"] > self.chunked_audio[i][-1]["chunk_end"]:
                    continue
                # Skip if log prob is low or no speech prob is high
                if r["avg_logprob"] < -1.0 or r["no_speech_prob"] > 0.7: # Hardcoded thresholds
                    continue
                # Set start timestamp
                start = r["start"] + self.chunked_audio[i][0]["offset"]
                for j in range(len(self.chunked_audio[i])):
                    if (
                        r["start"] >= self.chunked_audio[i][j]["chunk_start"]
                        and r["start"] <= self.chunked_audio[i][j]["chunk_end"]
                    ):
                        start = r["start"] + self.chunked_audio[i][j]["offset"]
                        break
                end = self.chunked_audio[i][-1]["end"] + 0.5
                for j in range(len(self.chunked_audio[i])):
                    if r["end"] >= self.chunked_audio[i][j]["chunk_start"] and r["end"] <= self.chunked_audio[i][j]["chunk_end"]:
                        end = r["end"] + self.chunked_audio[i][j]["offset"]
                        break
                
                text_info.append(r["text"].strip())
        return text_info

# Unit & Integration testing
* Unit testing row by row to ensure it works
* Integration testing row by row as each subsequent method requires the previous row to be executed

In [4]:
transcriber = Transcriber(*[SILERO_DIR, full_audio_path, AUDIO_DIR])

In [5]:
transcriber._vad_dir_creator()

Directory created!


In [6]:
transcriber._create_temp_audio(ffmpeg_path)

True


In [7]:
transcriber._load_vad()

Model and utils loaded!
(<function get_speech_timestamps at 0x000002D23806FA60>, <function save_audio at 0x000002D23806F8B0>, <function read_audio at 0x000002D237D23DC0>, <function load_audio at 0x000002D23806F820>, <class 'utils_vad.VADIterator'>, <function collect_chunks at 0x000002D23806FCA0>)


#### Compare Torchaudio load vs FFMPEG load
* wav_old should equal wav

In [8]:
transcriber._read_audio_old()
print("wav file tensor: {}".format(transcriber.wav_old[:5]))

Audio loaded! at data/input/vad_chunks/vad_temp.wav
wav file tensor: tensor([-2.1362e-04, -2.1362e-04, -3.0518e-05,  2.4414e-04,  2.4414e-04])


In [9]:
transcriber._read_audio(ffmpeg_path)
print("wav file tensor: {}".format(transcriber.wav[:5]))

Audio loaded! at data/input/vad_chunks/vad_temp.wav
wav file tensor: tensor([-2.1362e-04, -2.1362e-04, -3.0518e-05,  2.4414e-04,  2.4414e-04])


In [13]:
# Check if our inputs are the same
if all(torch.eq(transcriber.wav, transcriber.wav_old)):
    print("FFMPEG method and torchaudio method result in same input audio tensor!")

FFMPEG method and torchaudio method result in same input audio tensor!


In [14]:
transcriber._process_timestamps()
print("Timestamps: {}".format(transcriber.timestamps))

Timestamps processed! No. of audio chunks are: 3
Timestamps: [{'start': 5024, 'end': 56096}, {'start': 56096, 'end': 135456}, {'start': 135456, 'end': 159984}]


In [15]:
transcriber._split_audio()
print("Audio chunks: {}".format(transcriber.chunked_audio))

No. of chunked audio based on threshold: 1
Audio chunks: [[{'start': 5024, 'end': 56096}, {'start': 56096, 'end': 135456}, {'start': 135456, 'end': 159984}]]


#### Compare FFMPEG save vs Torchaudio Save

In [17]:
save_wav = transcriber.chunked_audio
save_wav

[[{'start': 5024, 'end': 56096},
  {'start': 56096, 'end': 135456},
  {'start': 135456, 'end': 159984}]]

In [22]:
save_wav_2 = transcriber.collect_chunks(save_wav[0], transcriber.wav)
save_wav_2

tensor([ 9.1553e-05, -9.1553e-05, -1.2207e-04,  ..., -4.2114e-03,
        -6.1035e-04,  5.0659e-03])

In [23]:
save_wav_2.unsqueeze(0)

tensor([[ 9.1553e-05, -9.1553e-05, -1.2207e-04,  ..., -4.2114e-03,
         -6.1035e-04,  5.0659e-03]])

In [33]:
save_wav_2.dtype

torch.float32

In [36]:
save_wav_2.shape

torch.Size([154960])

In [56]:
import soundfile

In [63]:
def _get_subtype(dtype: torch.dtype, format: str, encoding: str, bits_per_sample: int):
    if format == "wav":
        return _get_subtype_for_wav(dtype, encoding, bits_per_sample)
    if format == "flac":
        if encoding:
            raise ValueError("flac does not support encoding.")
        if not bits_per_sample:
            return "PCM_16"
        if bits_per_sample > 24:
            raise ValueError("flac does not support bits_per_sample > 24.")
        return "PCM_S8" if bits_per_sample == 8 else f"PCM_{bits_per_sample}"
    if format in ("ogg", "vorbis"):
        if encoding or bits_per_sample:
            raise ValueError("ogg/vorbis does not support encoding/bits_per_sample.")
        return "VORBIS"
    if format == "sph":
        return _get_subtype_for_sphere(encoding, bits_per_sample)
    if format in ("nis", "nist"):
        return "PCM_16"
    raise ValueError(f"Unsupported format: {format}")

def _get_subtype_for_wav(dtype: torch.dtype, encoding: str, bits_per_sample: int):
    if not encoding:
        if not bits_per_sample:
            subtype = {
                torch.uint8: "PCM_U8",
                torch.int16: "PCM_16",
                torch.int32: "PCM_32",
                torch.float32: "FLOAT",
                torch.float64: "DOUBLE",
            }.get(dtype)
            if not subtype:
                raise ValueError(f"Unsupported dtype for wav: {dtype}")
            return subtype
        if bits_per_sample == 8:
            return "PCM_U8"
        return f"PCM_{bits_per_sample}"
    if encoding == "PCM_S":
        if not bits_per_sample:
            return "PCM_32"
        if bits_per_sample == 8:
            raise ValueError("wav does not support 8-bit signed PCM encoding.")
        return f"PCM_{bits_per_sample}"
    if encoding == "PCM_U":
        if bits_per_sample in (None, 8):
            return "PCM_U8"
        raise ValueError("wav only supports 8-bit unsigned PCM encoding.")
    if encoding == "PCM_F":
        if bits_per_sample in (None, 32):
            return "FLOAT"
        if bits_per_sample == 64:
            return "DOUBLE"
        raise ValueError("wav only supports 32/64-bit float PCM encoding.")
    if encoding == "ULAW":
        if bits_per_sample in (None, 8):
            return "ULAW"
        raise ValueError("wav only supports 8-bit mu-law encoding.")
    if encoding == "ALAW":
        if bits_per_sample in (None, 8):
            return "ALAW"
        raise ValueError("wav only supports 8-bit a-law encoding.")
    raise ValueError(f"wav does not support {encoding}.")

In [64]:
def _get_subtype(dtype: torch.dtype, format: str, encoding: str, bits_per_sample: int):
    if format == "wav":
        return _get_subtype_for_wav(dtype, encoding, bits_per_sample)
    if format == "flac":
        if encoding:
            raise ValueError("flac does not support encoding.")
        if not bits_per_sample:
            return "PCM_16"
        if bits_per_sample > 24:
            raise ValueError("flac does not support bits_per_sample > 24.")
        return "PCM_S8" if bits_per_sample == 8 else f"PCM_{bits_per_sample}"
    if format in ("ogg", "vorbis"):
        if encoding or bits_per_sample:
            raise ValueError("ogg/vorbis does not support encoding/bits_per_sample.")
        return "VORBIS"
    if format == "sph":
        return _get_subtype_for_sphere(encoding, bits_per_sample)
    if format in ("nis", "nist"):
        return "PCM_16"
    raise ValueError(f"Unsupported format: {format}")

def _get_subtype_for_wav(dtype: torch.dtype, encoding: str, bits_per_sample: int):
    if not encoding:
        if not bits_per_sample:
            subtype = {
                torch.uint8: "PCM_U8",
                torch.int16: "PCM_16",
                torch.int32: "PCM_32",
                torch.float32: "FLOAT",
                torch.float64: "DOUBLE",
            }.get(dtype)
            if not subtype:
                raise ValueError(f"Unsupported dtype for wav: {dtype}")
            return subtype
        if bits_per_sample == 8:
            return "PCM_U8"
        return f"PCM_{bits_per_sample}"
    if encoding == "PCM_S":
        if not bits_per_sample:
            return "PCM_32"
        if bits_per_sample == 8:
            raise ValueError("wav does not support 8-bit signed PCM encoding.")
        return f"PCM_{bits_per_sample}"
    if encoding == "PCM_U":
        if bits_per_sample in (None, 8):
            return "PCM_U8"
        raise ValueError("wav only supports 8-bit unsigned PCM encoding.")
    if encoding == "PCM_F":
        if bits_per_sample in (None, 32):
            return "FLOAT"
        if bits_per_sample == 64:
            return "DOUBLE"
        raise ValueError("wav only supports 32/64-bit float PCM encoding.")
    if encoding == "ULAW":
        if bits_per_sample in (None, 8):
            return "ULAW"
        raise ValueError("wav only supports 8-bit mu-law encoding.")
    if encoding == "ALAW":
        if bits_per_sample in (None, 8):
            return "ALAW"
        raise ValueError("wav only supports 8-bit a-law encoding.")
    raise ValueError(f"wav does not support {encoding}.")

def save(
    filepath: str,
    src: torch.Tensor,
    sample_rate: int,
    channels_first: bool = True,
    compression: [float] = None,
    format: [str] = None,
    encoding:[str] = None,
    bits_per_sample: [int] = None,
):
    """Save audio data to file.

    Note:
        The formats this function can handle depend on the soundfile installation.
        This function is tested on the following formats;

        * WAV

            * 32-bit floating-point
            * 32-bit signed integer
            * 16-bit signed integer
            * 8-bit unsigned integer

        * FLAC
        * OGG/VORBIS
        * SPHERE

    Note:
        ``filepath`` argument is intentionally annotated as ``str`` only, even though it accepts
        ``pathlib.Path`` object as well. This is for the consistency with ``"sox_io"`` backend,
        which has a restriction on type annotation due to TorchScript compiler compatiblity.

    Args:
        filepath (str or pathlib.Path): Path to audio file.
        src (torch.Tensor): Audio data to save. must be 2D tensor.
        sample_rate (int): sampling rate
        channels_first (bool, optional): If ``True``, the given tensor is interpreted as `[channel, time]`,
            otherwise `[time, channel]`.
        compression (float of None, optional): Not used.
            It is here only for interface compatibility reson with "sox_io" backend.
        format (str or None, optional): Override the audio format.
            When ``filepath`` argument is path-like object, audio format is
            inferred from file extension. If the file extension is missing or
            different, you can specify the correct format with this argument.

            When ``filepath`` argument is file-like object,
            this argument is required.

            Valid values are ``"wav"``, ``"ogg"``, ``"vorbis"``,
            ``"flac"`` and ``"sph"``.
        encoding (str or None, optional): Changes the encoding for supported formats.
            This argument is effective only for supported formats, sush as
            ``"wav"``, ``""flac"`` and ``"sph"``. Valid values are;

                - ``"PCM_S"`` (signed integer Linear PCM)
                - ``"PCM_U"`` (unsigned integer Linear PCM)
                - ``"PCM_F"`` (floating point PCM)
                - ``"ULAW"`` (mu-law)
                - ``"ALAW"`` (a-law)

        bits_per_sample (int or None, optional): Changes the bit depth for the
            supported formats.
            When ``format`` is one of ``"wav"``, ``"flac"`` or ``"sph"``,
            you can change the bit depth.
            Valid values are ``8``, ``16``, ``24``, ``32`` and ``64``.

    Supported formats/encodings/bit depth/compression are:

    ``"wav"``
        - 32-bit floating-point PCM
        - 32-bit signed integer PCM
        - 24-bit signed integer PCM
        - 16-bit signed integer PCM
        - 8-bit unsigned integer PCM
        - 8-bit mu-law
        - 8-bit a-law

        Note:
            Default encoding/bit depth is determined by the dtype of
            the input Tensor.

    ``"flac"``
        - 8-bit
        - 16-bit (default)
        - 24-bit

    ``"ogg"``, ``"vorbis"``
        - Doesn't accept changing configuration.

    ``"sph"``
        - 8-bit signed integer PCM
        - 16-bit signed integer PCM
        - 24-bit signed integer PCM
        - 32-bit signed integer PCM (default)
        - 8-bit mu-law
        - 8-bit a-law
        - 16-bit a-law
        - 24-bit a-law
        - 32-bit a-law

    """
    if src.ndim != 2:
        raise ValueError(f"Expected 2D Tensor, got {src.ndim}D.")
    if compression is not None:
        warnings.warn(
            '`save` function of "soundfile" backend does not support "compression" parameter. '
            "The argument is silently ignored."
        )
    if hasattr(filepath, "write"):
        if format is None:
            raise RuntimeError("`format` is required when saving to file object.")
        ext = format.lower()
    else:
        ext = str(filepath).split(".")[-1].lower()

    if bits_per_sample not in (None, 8, 16, 24, 32, 64):
        raise ValueError("Invalid bits_per_sample.")
    if bits_per_sample == 24:
        warnings.warn(
            "Saving audio with 24 bits per sample might warp samples near -1. "
            "Using 16 bits per sample might be able to avoid this."
        )
    subtype = _get_subtype(src.dtype, ext, encoding, bits_per_sample)

    # sph is a extension used in TED-LIUM but soundfile does not recognize it as NIST format,
    # so we extend the extensions manually here
    if ext in ["nis", "nist", "sph"] and format is None:
        format = "NIST"

    if channels_first:
        src = src.t()

    soundfile.write(file=filepath, data=src, samplerate=sample_rate, subtype=subtype, format=format)

In [65]:
path = "test.wav"
tensor_to_save = save_wav_2
sampling_rate = 16000
save(path, tensor_to_save.unsqueeze(0), sampling_rate, bits_per_sample=16)

In [195]:
transcriber._merge_chunks()

Audio chunks saved!


In [196]:
transcriber._convert_timestamps_seconds()

Timestamps converted!


In [197]:
# basedir = os.getcwd()
# ffmpeg_path = os.path.join(basedir,"ffmpeg","ffmpeg.exe")
transcriber._whisper_on_chunks(ffmpeg_path = ffmpeg_path, basedir = basedir)

  0%|                                                                                            | 0/1 [00:15<?, ?it/s]


Audio successfully transcribed!


In [206]:
transcriber.result['text'].strip()

'Delta 2846, the fire department would like to know are you expecting or would you like an inspection when you get off the runway?'

In [199]:
segments = transcriber._whisper_csv()

100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]


In [200]:
text_only = transcriber._whisper_text()

100%|██████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 1004.62it/s]


In [201]:
segments

[{'id': 0,
  'seek': 0,
  'start': 0.0,
  'end': 9.68,
  'text': ' Delta 2846, the fire department would like to know are you expecting or would you like an inspection when you get off the runway?',
  'tokens': [50363,
   16978,
   2579,
   3510,
   11,
   262,
   2046,
   5011,
   561,
   588,
   284,
   760,
   389,
   345,
   12451,
   393,
   561,
   345,
   588,
   281,
   15210,
   618,
   345,
   651,
   572,
   262,
   23443,
   30,
   50847],
  'temperature': 0.0,
  'avg_logprob': -0.34617840449015297,
  'compression_ratio': 1.2169811320754718,
  'no_speech_prob': 0.05624688044190407}]

In [203]:
text_only

['Delta 2846, the fire department would like to know are you expecting or would you like an inspection when you get off the runway?']

In [221]:
segments_info = transcriber.whisper_csv(ffmpeg_path = ffmpeg_path, basedir = basedir)

Directory created!
True
Model and utils loaded!
Audio loaded! at data/input/vad_chunks/vad_temp.wav
Timestamps processed! No. of audio chunks are: 3
No. of chunked audio based on threshold: 1
Audio chunks saved!
Timestamps converted!


  0%|                                                                                            | 0/1 [00:14<?, ?it/s]


Audio successfully transcribed!


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]


In [227]:
assert transcriber.result['segments'] == segments_info

In [225]:
text_info = transcriber.whisper_text(ffmpeg_path = ffmpeg_path, basedir = basedir)

Directory created!
True
Model and utils loaded!
Audio loaded! at data/input/vad_chunks/vad_temp.wav
Timestamps processed! No. of audio chunks are: 3
No. of chunked audio based on threshold: 1
Audio chunks saved!
Timestamps converted!


  0%|                                                                                            | 0/1 [00:15<?, ?it/s]


Audio successfully transcribed!


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]


In [235]:
assert text_info[0] == transcriber.result['text'].strip()