In [6]:
import torch as t
import torchaudio
import soundfile as sf
import pandas as pd
import json
import jiwer
import vosk
import IPython.display as ipd
from IPython.display import Audio
from auxiva import bssSepation 

In [7]:
# JSON-encoded list of phrases handed to the Vosk recognizer as its grammar.
grammar = '["кузя", "включи свет", "включи розетку", "включи торшер", "включи ночник", "выключи свет", "выключи розетку", "выключи торшер", "выключи ночник", "в спальне", "в туалете", "на кухне", "в коридоре", "в зале", "в детской", "[unk]"]'

# Silence Vosk logging BEFORE the model is created; setting the level only
# afterwards lets the model-loading messages through.
vosk.SetLogLevel(-1)

# Load the model once per kernel session: re-running this cell reuses the
# instance that is already in the notebook's global namespace.
if 'model' not in globals():
    model = vosk.Model(model_name="vosk-model-small-ru-0.22")

def calc_wer(title, frames, transcripts, sample_rate, frame_size):
    """Recognize every expected phrase in a recording and yield one result row per phrase.

    For each ``(time, transcript)`` pair a window of frames starting at ``time``
    is cut out of ``frames``. The window covers about three seconds, or less if
    the next transcript starts earlier. Every channel of the window is run
    through the Vosk recognizer, and the alternative with the lowest word error
    rate against ``transcript`` is kept.

    Args:
        title: Prefix used for the per-variant keys of the yielded dictionaries.
        frames: Sequence of tensors that are concatenated along dim 0 and then
            transposed, so each frame is expected to be ``(samples, channels)``
            (this is what the caller produces with ``t.split(x.T, frame_size)``).
        transcripts: Iterable of ``(time, text)`` pairs; ``time`` is converted
            to a frame index as ``time * sample_rate // frame_size``.
        sample_rate: Sample rate passed to the recognizer and used to convert
            frame indices back to time.
        frame_size: Number of samples per element of ``frames``.

    Yields:
        dict with the keys ``t0``, ``t1``, ``transcript``, ``words`` and, prefixed
        with ``title``: ``result`` (best hypothesis), ``wer`` (number of word
        errors), ``wer, %`` (word error rate as a fraction), ``channel`` (index
        of the best channel) and ``audio`` (HTML of an embedded audio player).
    """
    rec = vosk.KaldiRecognizer(model, sample_rate, grammar)
    rec.SetMaxAlternatives(5)

    def recognize(waveform):
        """Return the text of every alternative the recognizer gives for one channel."""
        peak = waveform.abs().max()
        # A silent channel has peak 0; dividing by it would produce NaN samples.
        if peak > 0:
            waveform = waveform / peak
        # Scale to the int16 range. The factor must be 2**15 - 1: with 2**15 a
        # positive peak becomes 32768, which does not fit in int16.
        pcm = (waveform * (2**15 - 1)).short()
        rec.AcceptWaveform(pcm.numpy().tobytes())
        results = rec.FinalResult()
        rec.Reset()
        return [res["text"] for res in json.loads(results)["alternatives"]]

    # Length of the recognition window in frames (three seconds, rounded up).
    frame_count = 3 * sample_rate // frame_size + 1
    transcript_times = [(int(time * sample_rate // frame_size), transcript) for time, transcript in transcripts]
    for k, (i, transcript) in enumerate(transcript_times):
        # Stop at the start of the next phrase so two phrases never share a window.
        end = i + frame_count
        if k + 1 < len(transcript_times):
            end = min(end, transcript_times[k + 1][0])
        waveforms = t.concatenate(frames[i: end]).T

        results = [(res, channel) for channel, waveform in enumerate(waveforms) for res in recognize(waveform)]
        metrics, channel = min(((jiwer.process_words(transcript, result), channel) for result, channel in results), key=lambda x: x[0].wer)

        # Exact error count; int(wer * words) can lose one error to float
        # rounding (e.g. (1 / 49) * 49 is slightly below 1 and truncates to 0).
        errors = metrics.substitutions + metrics.deletions + metrics.insertions

        yield {
            "t0": i * frame_size / sample_rate,
            "t1": end * frame_size / sample_rate,
            "transcript": transcript,
            "words": len(metrics.references[0]),
            f"{title} result": ' '.join(metrics.hypotheses[0]),
            f"{title} wer": errors,
            f"{title} wer, %": metrics.wer,
            f"{title} channel": channel,
            f"{title} audio": ipd.Audio(waveforms[channel], rate=sample_rate, embed=True)._repr_html_()
        }

In [8]:
# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA instead of failing at the first `.to(device)`.
device = "cuda" if t.cuda.is_available() else "cpu"
print(f"Используемое устройство: {device}")

Используемое устройство: cuda


In [9]:
# Parameters shared by both separator instances.
win_size = 2048
hop = win_size // 4
n_iter = 30

# NOTE(review): despite its name, `hann_win` holds a Kaiser window
# (t.kaiser_window), not a Hann window. The name is kept unchanged in case
# other cells refer to it.
hann_win = t.kaiser_window(win_size).to(device)

# Two separators that differ only in their last constructor argument.
bss_gauss, bss_laplace = (
    bssSepation(win_size, hop, hann_win, n_iter, variant).to(device)
    for variant in ("gauss", "laplace")
)


In [None]:
# Number of samples per frame used to slice the audio for scoring. It is the
# same for every file, so it is set once instead of on each loop iteration.
frame_size = 1024

with open('./tests/transcripts2.json', 'r', encoding='utf-8') as f:
    # file name -> list of (time, expected text) pairs
    transcripts = {name: [(float(time), text) for time, text in data.items()] for name, data in json.load(f).items()}

filenames = transcripts.keys()
summary = []
for filename in filenames:
    waveform_mix, sample_rate = torchaudio.load('./tests/' + filename)
    waveform_mix = waveform_mix.to(device)
    print(filename)

    bss_mix_gauss = bss_gauss(waveform_mix)
    bss_mix_laplace = bss_laplace(waveform_mix)

    # Transpose before splitting so the split runs along the first dimension
    # of the transposed signal, giving chunks of `frame_size` rows.
    frames = t.split(waveform_mix.T.cpu(), frame_size)
    bss_frames_gauss = t.split(bss_mix_gauss.T.cpu(), frame_size)
    bss_frames_laplace = t.split(bss_mix_laplace.T.cpu(), frame_size)

    origin_data = calc_wer("origin", frames, transcripts[filename], sample_rate, frame_size)
    bss_data_gauss = calc_wer("bss gauss", bss_frames_gauss, transcripts[filename], sample_rate, frame_size)
    bss_data_laplace = calc_wer("bss laplace", bss_frames_laplace, transcripts[filename], sample_rate, frame_size)

    # calc_wer is a generator: the three variants are evaluated in lock-step
    # and their rows merged into one dictionary per expected phrase.
    data = (origin | gauss | laplace for origin, gauss, laplace in zip(origin_data, bss_data_gauss, bss_data_laplace))

    df = pd.DataFrame(data)
    if df.empty:
        # No expected phrases for this file: there is nothing to aggregate, and
        # the column lookups below would fail on a frame without columns.
        print(f"{filename}: no transcripts, skipped")
        continue

    percent_columns = [name for name in df.columns if '%' in name]

    # Named `totals` rather than `sum` so the built-in sum() is not shadowed
    # for the rest of the notebook session.
    totals = df.agg(["sum"])
    # Only the error and word counts are meaningful as sums; blank everything else.
    totals.loc['sum', [name for name in df.columns if 'wer' not in name and 'word' not in name]] = None
    # Overall rate = total word errors / total reference words (not the sum of
    # the per-phrase rates that agg() produced).
    for title in ('origin', 'bss gauss', 'bss laplace'):
        totals.loc['sum', f'{title} wer, %'] = totals.loc['sum', f'{title} wer'] / totals.loc['sum', 'words']
    summary.append({'filename': filename,
                    'origin wer, %': totals.loc['sum', 'origin wer, %'],
                    'bss gauss wer, %': totals.loc['sum', 'bss gauss wer, %'],
                    'bss laplace wer, %': totals.loc['sum', 'bss laplace wer, %'],
                    })
    display(df.style.concat(totals.style.format("{:,.2%}", subset=percent_columns))
                    .set_caption(f'{filename} ({sample_rate//1000} kHz)')
                    .format(precision=3)
                    .format("{:,.0%}", subset=percent_columns)
                    .hide(['words', 'origin wer', 'bss gauss wer', 'bss laplace wer'], axis=1))

In [11]:
# One row per evaluated file, built from the `summary` records.
df = pd.DataFrame(summary)

# Columns whose name contains '%' hold fractions and are rendered as percentages.
percent_columns = [column for column in df.columns if '%' in column]
summary_style = df.style.format("{:,.2%}", subset=percent_columns)
display(summary_style)

Unnamed: 0,filename,"origin wer, %","bss gauss wer, %","bss laplace wer, %"
0,24.01.17/exp1.wav,5.00%,3.75%,11.25%
1,24.01.17/exp2.1.wav,5.00%,6.25%,6.25%
2,24.01.17/exp2.2.wav,3.75%,2.50%,8.75%
3,24.01.17/exp2.3.wav,25.00%,7.14%,17.86%
4,24.01.17/exp3.1.wav,14.46%,9.64%,12.05%
5,24.01.17/exp3.2.wav,29.27%,15.85%,19.51%
6,24.01.17/exp3.3.wav,40.00%,12.50%,25.00%
7,24.01.17/exp4.1.wav,20.48%,8.43%,15.66%
8,24.01.17/exp4.2.wav,42.50%,6.25%,18.75%
9,24.01.17/exp4.3.wav,55.00%,15.00%,22.50%
