In [1]:
from loadfile import *
from phonemes import *
from preprocess import *
from evaluate import *
from google_kb_z_speaker.main import *
from tqdm.auto import tqdm
from experiment_helpers import *

2022-03-09 08:54:52,771.771 DEBUG __init__:  matplotlib data path: /home/nsmy/miniconda3/envs/speechTech/lib/python3.8/site-packages/matplotlib/mpl-data
2022-03-09 08:54:52,774.774 DEBUG __init__:  CONFIGDIR=/home/nsmy/.config/matplotlib
2022-03-09 08:54:52,775.775 DEBUG __init__:  interactive is False
2022-03-09 08:54:52,776.776 DEBUG __init__:  platform is linux
2022-03-09 08:54:52,846.846 DEBUG __init__:  CACHEDIR=/home/nsmy/.cache/matplotlib
2022-03-09 08:54:52,848.848 DEBUG font_manager:  Using fontManager instance from /home/nsmy/.cache/matplotlib/fontlist-v330.json
2022-03-09 08:54:53,120.120 DEBUG pyplot:  Loaded backend module://matplotlib_inline.backend_inline version unknown.
2022-03-09 08:54:53,122.122 DEBUG pyplot:  Loaded backend module://matplotlib_inline.backend_inline version unknown.


# Text processing

In [2]:
# Speakers whose transcription files are loaded, and the systems ("models") being compared.
speakers = ["z21", "z20", "z28", "z29"]
models = ["google", "correct", "kb"]
# File-name suffix used for each model's transcription files.
models2fnameendings = {
    "google": "googleasr",
    "correct": "correct",
    "kb": "kb"
}
# When True the files are read from "transcriptions_new/" instead of "transcriptions/".
use_new = False

# Create filenames: Dict[model, List[path]] with one path per speaker
fnames = {model: [f"transcriptions{'_new' if use_new else ''}/{sp}.{models2fnameendings[model]}" for sp in speakers]
          for model in models}

# The inner loop variable has its own name so it no longer shadows the `fnames` dict.
bunches = {model: [get_fname_lines(fname) for fname in model_fnames]
           for model, model_fnames in fnames.items()}

# speakerLines : List[str]              (the lines read from one file)
# bunch        : List[speakerLines]     (one entry per speaker)
# bunches      : Dict[model, List[speakerLines]]

# Regroup per speaker: List[Dict[model, speakerLines]], one dict per speaker
list_of_model2speaker_lines = [{model: bunch[i] for model, bunch in bunches.items()} for i in range(len(speakers))]

# fix_lines is called only for its effect on each dict (its return value is discarded),
# so use a plain loop instead of building a throw-away list.
# NOTE(review): presumably fix_lines updates the dict in place -- confirm.
for model2speaker_lines in list_of_model2speaker_lines:
    fix_lines(model2speaker_lines)

# Recreate bunches: Dict[model, List[speakerLines]]
bunches = {x: [list_of_model2speaker_lines[i][x] for i in range(len(speakers))] for x in models}
# Concatenate the speakers: Dict[model, speakerLines_aggr]
bunches = {k: reduce(lambda x, y: x + y, v) for k, v in bunches.items()}
# Extract the transcriptions
bunches = {k: transcriptions(v) for k, v in bunches.items()}
# Preprocess each transcription
bunches = {k: [preprocess_text(x) for x in v] for k, v in bunches.items()}

# Initialize phonemizer
phonemizer = init_phonemizer("cuda", "./models/deep-phonemizer-se.pt")

In [3]:
from IPython.core.debugger import set_trace

In [4]:
def singular_phonemes(txt : str):
    """
    Split a phoneme string into its individual phoneme tokens.

    Both underscores and runs of whitespace act as separators; empty
    tokens are dropped. Returns the tokens as a list, in order.
    """
    tokens = []
    for underscore_chunk in txt.split("_"):
        # str.split() without arguments splits on whitespace runs and skips empties.
        tokens.extend(underscore_chunk.split())
    return tokens

def singular_phonemes_preprocess(bunches):
    """
    Rewrite every line of every model as space-separated single phonemes.

    `bunches` maps a model name to a list of phoneme strings. Each string is
    tokenised with `singular_phonemes` and re-joined with single spaces.
    Returns a new dict with the same keys; the input is not modified.
    """
    normalized = {}
    for model, phoneme_lines in bunches.items():
        normalized[model] = [" ".join(singular_phonemes(line)) for line in phoneme_lines]
    return normalized

def get_swedish_phonemes_z(bunches, phonemizer,stress_marks=True):
    """
    Phonemize the lines of all models with a single get_swedish_phonemes call.

    Per the original author's note, the phonemizer builds a word -> phonemes
    mapping per call, so all lines are sent in one call to keep the phonemes
    consistent across models.

    bunches      -- dict mapping model name to a list of text lines
    phonemizer   -- phonemizer object forwarded to get_swedish_phonemes
    stress_marks -- forwarded as include_stress_marks

    Returns a dict with the same keys (in the same order) whose values are
    the slices of the phonemized output belonging to each model.

    Each model's slice is sized by that model's own number of lines. The
    previous version sized every slice by len(bunches["google"]), which
    required a "google" key and mis-sliced when line counts differed.
    """
    # Impose an order on the models
    models = list(bunches.keys())
    # Flatten in model order (linear, unlike repeated list concatenation).
    concatenated = [line for model in models for line in bunches[model]]
    # NOTE(review): assumes get_swedish_phonemes returns exactly one output
    # entry per input line, in order -- confirm.
    phoneme_lines = get_swedish_phonemes(concatenated, phonemizer, include_stress_marks=stress_marks)

    phonemes_by_model = {}
    start = 0
    for model in models:
        end = start + len(bunches[model])
        phonemes_by_model[model] = phoneme_lines[start:end]
        start = end
    return phonemes_by_model

# Experiments

In [178]:
def preprocess(bunches, _filter, phoneme_words, singular_phonemes, preprocess_hook, stress_marks):
    """
    Apply the optional phoneme conversion, hook and filtering steps to `bunches`.

    bunches           -- dict mapping model name to a list of lines
    _filter           -- None, or a dict of filter options. A dict whose only
                         key is "agreement" goes to
                         filter_bunches_only_on_agreement; any other dict is
                         expanded into filter_bunches(bunches, **_filter).
    phoneme_words     -- if truthy, phonemize and run preprocess_phonemes on
                         every line
    singular_phonemes -- if truthy, phonemize and run
                         singular_phonemes_preprocess
    preprocess_hook   -- None, or a callable invoked with this function's
                         arguments as keyword arguments (`bunches` being the
                         current, possibly phonemized, value). Its return
                         value is ignored.
    stress_marks      -- forwarded to get_swedish_phonemes_z

    Uses the notebook-level `phonemizer`.
    Returns {"bunches": <processed bunches>}.
    """
    if phoneme_words:
        bunches = get_swedish_phonemes_z(bunches, phonemizer, stress_marks=stress_marks)
        bunches = {model: [preprocess_phonemes(line) for line in model_lines]
                   for model, model_lines in bunches.items()}

    if singular_phonemes:
        # NOTE(review): if phoneme_words is truthy as well, the already
        # phonemized lines are phonemized a second time here. The flags look
        # mutually exclusive -- confirm before enabling both.
        bunches = get_swedish_phonemes_z(bunches, phonemizer, stress_marks=stress_marks)
        bunches = singular_phonemes_preprocess(bunches)

    if preprocess_hook is not None:
        # Pass the arguments explicitly (this used to be **locals()), so the
        # hook's keyword set does not change if a local variable is added here.
        preprocess_hook(
            bunches=bunches,
            _filter=_filter,
            phoneme_words=phoneme_words,
            singular_phonemes=singular_phonemes,
            preprocess_hook=preprocess_hook,
            stress_marks=stress_marks,
        )

    if _filter is not None:
        if len(_filter) == 1 and "agreement" in _filter:
            bunches = filter_bunches_only_on_agreement(bunches, _filter["agreement"])
        else:
            bunches = filter_bunches(bunches, **_filter)

    return {
        "bunches" : bunches
    }

def google_kb_wer(bunches):
    """
    Score the "google" and "kb" lines against the "correct" lines with `wer`.

    Returns {"google_wer": ..., "kb_wer": ...}, where each value is
    wer(reference_lines, system_lines).
    """
    reference = bunches["correct"]
    return {f"{system}_wer": wer(reference, bunches[system]) for system in ("google", "kb")}
def sentence_lengths(bunches):
    """
    Average number of space-separated tokens per line, for every model.

    Returns a dict keyed "<model>-avg-length". Lines are split on a single
    space character, so an empty line counts as one token.
    """
    averages = {}
    for model, model_lines in bunches.items():
        token_counts = [len(line.split(" ")) for line in model_lines]
        averages[model + "-avg-length"] = mean(token_counts)
    return averages

def eoi(bunches):
    """
    Overlap of the error indices of the "google" and "kb" systems.

    For every (correct, google, kb) line triple the error indices of each
    system against the correct line are taken from `error_idxs`. The result
    is the number of indices flagged for both systems divided by the number
    flagged for at least one of them, accumulated over all lines.

    Returns {"error_index_overlap": ratio}. When no error index exists at
    all (including empty input) the ratio is reported as 0.0; this used to
    raise ZeroDivisionError.
    """
    shared_errors = 0  # indices wrong in both systems
    any_errors = 0     # indices wrong in at least one system

    for c_l, g_l, kb_l in zip(bunches["correct"], bunches["google"], bunches["kb"]):
        g_s = set(error_idxs(c_l, g_l))
        k_s = set(error_idxs(c_l, kb_l))

        shared_errors += len(g_s & k_s)
        any_errors += len(g_s | k_s)

    error_index_overlap = shared_errors / any_errors if any_errors else 0.0

    return {"error_index_overlap": error_index_overlap}

def lcs_percentage(bunches):
    """
    Mean share of words that the "google" and "kb" lines have in common.

    For each (correct, google, kb) line triple, the google and kb lines are
    split on single spaces and aligned with difflib.SequenceMatcher. The
    total size of the matching blocks is divided by the number of words in
    the correct line.

    Returns {"lcs-mean": mean of the per-line ratios}.

    NOTE(review): difflib's matching blocks are a heuristic alignment and are
    not guaranteed to be a true longest common subsequence.
    """
    # Imported here because the notebook only imports SequenceMatcher in a
    # later cell, which breaks a fresh top-to-bottom run.
    from difflib import SequenceMatcher

    ratios = []

    for c_l, g_l, kb_l in zip(bunches["correct"], bunches["google"], bunches["kb"]):
        g_words = g_l.split(" ")
        kb_words = kb_l.split(" ")

        # SequenceMatcher compares any sequences of hashable items, so the
        # word lists are used directly; no word -> character encoding needed.
        matcher = SequenceMatcher(None, g_words, kb_words)
        matched_words = sum(block.size for block in matcher.get_matching_blocks())
        ratios.append(matched_words / len(c_l.split(" ")))

    return {"lcs-mean": mean(ratios)}
          
def agreement_percentages(bunches):
    """
    Label the values returned by percentage_of_agreement(bunches).

    percentage_of_agreement must return exactly six values; the fifth
    (agreement_correct) is unpacked but not included in the result.

    NOTE(review): the keys "g_correct_kb_not " and "kb_correct_g_not " end
    with a space. They are kept as-is because downstream code may look them
    up verbatim -- confirm whether the trailing space is intended.
    """
    (
        agreement,
        g_correct_kb_not,
        kb_correct_g_not,
        agreement_not_correct,
        _agreement_correct,
        both_incorrect_disagreement,
    ) = percentage_of_agreement(bunches)

    labelled = {}
    labelled["agreement"] = agreement
    labelled["g_correct_kb_not "] = g_correct_kb_not
    labelled["kb_correct_g_not "] = kb_correct_g_not
    labelled["agreement_not_correct"] = agreement_not_correct
    labelled["both_incorrect_disagreement"] = both_incorrect_disagreement
    return labelled

def meval(bunches, _filter=None, phoneme_words=None, singular_phonemes=None, preprocess_hook=None, stress_marks=None):
    """
    Preprocess `bunches` and compute the evaluation metrics for one run.

    All arguments are forwarded to `preprocess`. The WER, average sentence
    length and LCS metrics are always computed. Only when `_filter` is None
    are the results of `evaluate_lines` and `eoi` added as well.

    Returns a single flat dict of metric name -> value.
    """
    prepared = preprocess(bunches, _filter, phoneme_words, singular_phonemes, preprocess_hook, stress_marks)["bunches"]

    metrics = {}
    metrics.update(google_kb_wer(prepared))
    metrics.update(sentence_lengths(prepared))
    metrics.update(lcs_percentage(prepared))

    if _filter is not None:
        return metrics

    # Unfiltered run: evaluate_lines goes first so the metrics above (and
    # then eoi) take precedence on any duplicate key.
    return {**evaluate_lines(prepared), **metrics, **eoi(prepared)}
    

In [187]:
from typing import Dict, List
# Scratch: typing alias for a dict with str keys and list values.
# NOTE(review): `a` is only used to build `b` in the next cell; consider a descriptive name or removing both.
a = Dict[str,List]

In [189]:
# Scratch: typing alias for a list of `a` (defined in the previous cell).
b = List[a]

In [179]:
# NOTE(review): `lines` is not used by anything visible in this notebook.
lines = []
def locate_empty_lines(**kwargs):
    """
    Debugging hook for `preprocess`: print the google lines it was handed.

    Uses kwargs["google_lines"] when that keyword is supplied. Otherwise it
    falls back to kwargs["bunches"]["google"], because `preprocess` passes the
    lines under "bunches" and never supplies "google_lines" (the old lookup
    raised KeyError when used as a preprocess hook).
    """
    if "google_lines" in kwargs:
        google_lines = kwargs["google_lines"]
    else:
        google_lines = kwargs["bunches"]["google"]
    print(google_lines)
        

In [185]:
# Run `meval` five times on `bunches` with the options below and print the report.
experiment = meval

# Options expanded into filter_bunches(bunches, **_filter) by `preprocess`.
_filter = dict(agreement=False, g_correct=False, kb_correct=True)

# Keyword arguments forwarded to the experiment function on every repeat.
kwargs = dict(
    _filter=_filter,
    phoneme_words=True,
    singular_phonemes=False,
    preprocess_hook=None,
    stress_marks=False,
)

res = experiment_repeats(experiment, 5, bunches, **kwargs)
print_experiment_report(res)

  0%|          | 0/5 [00:00<?, ?it/s]

google_wer: 0.24423976608187134±0.00045767473223679223
kb_wer: 0.0±0.0
correct-avg-length: 7.975±0.013975424859373685
google-avg-length: 7.419649122807018±0.006668974669736086
kb-avg-length: 7.975±0.013975424859373685
lcs-mean: 0.7063615546400865±0.00043987800237852223


In [170]:
from difflib import SequenceMatcher

# Scratch check of how SequenceMatcher's matching blocks are joined into a common string.
str_a = "asdasf"
str_b = "aaasBCDaeFGhsdgadijKaadMn"
s = SequenceMatcher(None, str_a, str_b)

# Concatenate the parts of str_a covered by each matching block.
lcs = ''.join([str_a[block.a:(block.a + block.size)] for block in s.get_matching_blocks()])
# For these inputs the matching blocks recorded in this notebook give lcs == 'asda'.

In [153]:
# Display the first test string used in the SequenceMatcher scratch cell.
str_a

'asdasf'

In [154]:
# Display the matching blocks; the last block has size 0 and sits at the end of both strings.
# NOTE(review): `s` is rebound to a list in later cells, so this only works right after the SequenceMatcher cell.
s.get_matching_blocks()

[Match(a=0, b=2, size=2),
 Match(a=2, b=13, size=1),
 Match(a=3, b=15, size=1),
 Match(a=6, b=25, size=0)]

In [44]:
# NOTE(review): `google_lines` is not defined in any visible cell; the recorded output of this cell is a NameError.
# Either define it (e.g. from `bunches`) or delete this leftover cell.
google_lines.index("30")

NameError: name 'google_lines' is not defined

In [None]:
# Phonemize the same word 10000 times in one call and show the distinct outputs
# (presumably a determinism check of the phonemizer -- confirm).
set(phonemizer(["jag" for x in range(10000)], "se"))

In [None]:
# Same check through get_swedish_phonemes: distinct outputs for 10000 copies of one word.
set(get_swedish_phonemes(["ja" for x in range(10000)], phonemizer))

In [None]:
# Call the phonemizer 1000 times on the same input, resetting the torch seed
# before every call, and show the distinct results.
# NOTE(review): `torch` is not imported in any visible cell; presumably it arrives via a star import -- confirm.
s = []
for x in tqdm(range(1000)):
    torch.manual_seed(0)
    s.append(phonemizer('30', "se"))
set(s)

In [None]:
# Call get_swedish_phonemes 15 times on the same input without reseeding and show the distinct results.
# NOTE(review): a plain str is passed here, whereas other cells pass a list of lines -- confirm this is intended.
s = []
for x in tqdm(range(15)):
    #torch.manual_seed(0)
    s.append(get_swedish_phonemes('ja', phonemizer))
set(s)

In [154]:
# Line index to inspect; a later plotting cell reassigns `idx`.
idx = 43
# Font size for the text boxes; plot_triad reads this as a notebook-level global.
size = 10
def get_triad(idx, bunch):
    """
    Return the `idx`-th line of the first three entries of `bunch` as a tuple.

    Per the original comment, the entries are ordered Google, correct, KB.
    """
    return tuple(bunch[position][idx] for position in (0, 1, 2))

def plot_triad(triad,ax):
    """
    Draw the three texts of `triad` as stacked rounded boxes on `ax`.

    triad[0] is drawn at y=0, triad[2] at y=-2.5 and triad[1] at y=-5, each
    with its own edge/fill colour. The font size comes from the
    notebook-level `size`. Both axis limits are set to (-10, 10).
    """
    # (index into triad, (x, y) position, edge colour, fill colour)
    box_specs = (
        (0, (0.0, 0.0), (1., 0.5, 0.5), (1., 0.8, 0.8)),
        (2, (0, -2.5), (153/255, 51/255, 0/255), (255/255, 153/255, 102/255)),
        (1, (0, -5), (42 / 255, 162 / 255, 42 / 255), (133 / 255, 224 / 255, 133 / 255)),
    )

    for member, (x_pos, y_pos), edge_colour, fill_colour in box_specs:
        box_style = {"boxstyle": "round", "ec": edge_colour, "fc": fill_colour}
        ax.text(x_pos, y_pos, triad[member], size=size, rotation=0,
                ha="center", va="center", bbox=box_style)

    ax.set_ylim(-10, 10)
    ax.set_xlim(-10, 10)

SyntaxError: invalid syntax (3878495266.py, line 2)

In [None]:
# Draw the triad at `idx` for the text data (left) and the phoneme data (right).
# NOTE(review): `plt`, `text_data_bunch` and `phoneme_data_bunch` are not defined in any visible cell;
# this cell relies on hidden kernel state and will not survive Restart & Run All as-is.
fig, axs = plt.subplots(1,2, figsize=(15, 5))
idx = 0
plot_triad(get_triad(idx, text_data_bunch) ,axs[0])
plot_triad(get_triad(idx, phoneme_data_bunch) ,axs[1])

In [None]:

def print_triad_correctness(triad):
    """
    Print exact-match and WER diagnostics for one (Google, correct, KB) triad.

    triad[0] is the Google line, triad[1] the correct line and triad[2] the
    KB line (order per the original comment). Prints whether each system
    equals the correct line, whether the two systems agree, and then the
    `wer` of each system against the correct line.
    """
    # Each value is computed lazily, right before its line is printed.
    match_checks = (
        ("Google correct ", lambda: triad[1] == triad[0]),
        ("KB correct ", lambda: triad[1] == triad[2]),
        ("Agreement ", lambda: triad[0] == triad[2]),
    )
    for label, compute in match_checks:
        print(label, compute())

    wer_checks = (
        ("Google WER ", lambda: wer(triad[1], triad[0])),
        ("KB WER ", lambda: wer(triad[1], triad[2])),
    )
    for label, compute in wer_checks:
        print(label, compute())