In [None]:
# for auto-reloading rennet modules
%load_ext autoreload
%autoreload 1

# py2.7 compat
from __future__ import division, print_function
from six.moves import zip, range, zip_longest

In [None]:
import os
import sys

rennet_data_root = os.path.join("..", "..", "data")

---
***

> **NOTE:**
>
> This is not the original notebook used in double-talk detection research by me (Abdullah).
> But it is a faithful and easier-to-use copy, with some modifications and some parts skipped.
>
> You should still be able to use it to meet the main goals of this notebook.
>
> For anything marked as `[SKIPPED]`, please refer to the following original notebooks in `notebooks/dtfinale/`:
> - `2017-08-03-feats-fe_03_p1m-logmel64-win32ms-hop10ms.ipynb`
> - `2017-08-04-feats-fe_03_p1-logmel64-win32ms-hop10ms.ipynb`

---
***

# Extracting `logmelspectrogram_64` features for `fe_03_p1`

The goal of this notebook is to document how:
- features are extracted for the existing exported splits of the dataset
    + Assumes the splits directory structure from the previous steps
    + Assumes all audio files in all splits are in the standardized audio format (or one that `audioread` can work with):
        * format: `wav`
        * channels: `mono`
        * samplerate: `8000 Hz`
    + Features to be extracted with params:
        * type: `log10-mel-spectrograms`
        * n_mels: `64`
        * window_type_stft: `hanning`
        * window_len_stft: `0.032 sec`
        * hop_len_stft: `0.010 sec`
- labels are extracted for the corresponding time-stamps of the extracted features
    + Assumes that `ActiveSpeakers` can be read for the dataset from label files
        * **and** that all have only two speakers
- chunked storage to hdf5 files of the features and corresponding labels is performed for each split, using dask
    + chunks have overlap _within_ a call
        * chunk_size_axis0: `2**14`
        * chunk_overlap_axis0: `2**10`
- extra information is added to the exported hdf5 files
    + speaker information for each call's labels added as hdf5 attributes
    + ___raw___ Viterbi priors are calculated and added for each split's hdf5

In [None]:
import warnings
from collections import namedtuple
from itertools import starmap

import librosa as lr
import numpy as np
import dask as d
import dask.array as da
from distributed import Client
import h5py as h

import dask.diagnostics as dg
import time
from bokeh.io import output_notebook

In [None]:
%aimport rennet.datasets.fisher
import rennet.datasets.fisher as fe

%aimport rennet.utils.audio_utils
import rennet.utils.audio_utils as au

%aimport rennet.utils.np_utils
import rennet.utils.np_utils as nu

from rennet.utils.py_utils import recursive_glob, makedirs_with_existok

In [None]:
# dask progress bars and other diagnostics
# NOTE: `pr` and `rpr` are module-level on purpose: the export cells further
# down call `dg.visualize([pr, rpr])` and then `.clear()` on both of them.
pb = dg.ProgressBar()
pb.register()

# bokeh output goes to the notebook
# NOTE(review): presumably needed so `dg.visualize` plots render inline -- confirm
output_notebook()

pr = dg.Profiler()
pr.register()

rpr = dg.ResourceProfiler(dt=0.1)
rpr.register()

## Params 

For feature extraction and related tasks like output file naming, etc.

In [None]:
# The parameters that will be used for extracting logmelspectrogram
win_sec = 0.032  # stft window length, in seconds
hop_sec = 0.010  # stft hop length, in seconds
nmels = 64
window = 'hann'
sr = samplerate = 8000
nchannels = 1
chunking = 2**14  # main axis' size for hdf5 chunked storage
chunkovl = 2**10  # overlap between consecutive chunks of the same call, so adding contextual frames doesn't miss frames.

# Convert seconds to samples.
# NOTE: round() before int(), because int() alone truncates: a float product
# that lands just below a whole number (e.g. 0.29 * 100 == 28.999999999999996)
# would silently give a length that is one sample too short.
# For the values above the results are unchanged (256 and 80).
n_fft = win_len = int(round(win_sec * sr))
hop_len = int(round(hop_sec * sr))
chunkstep = chunking - chunkovl  # step between the starts of consecutive chunks
melfreq = lr.mel_frequencies(n_mels=nmels, fmax=sr//2)

print('win-len', win_len)
print('hop-len', hop_len)
print()
print('chunking', chunking)
print('chunkovl', chunkovl)
print('chunkstep', chunkstep)
print()
print('mel-frequencies ({})\n'.format(len(melfreq)), melfreq)

## Roots & Paths

### Sources 

Where the pre-split wav and label exports are

In [None]:
# Setting up roots for sources
# The splits are looked up at:
#   <rennet_data_root>/working/<provider>/<dataset>/<export_name>

working_data_dir = os.path.join(rennet_data_root, 'working')

provider = 'fisher'
dataset = 'fe_03_p1'
export_name = 'wav-8k-mono'


# Root path to where the splits are

splits_root = os.path.join(working_data_dir, provider, dataset, export_name)
# check if it exists; fail early, since every later cell builds paths from it
if not os.path.exists(splits_root):
    raise RuntimeError("SPLITS_ROOT not found at: \n {}".format(splits_root))
    
print('SPLITS_ROOT', splits_root, '', sep='\n')

# No errors means all okay

In [None]:
# Making glob queries for audio, label and calldata files
# Each `*_glob_str` is a `(directory, filename-pattern)` tuple; it is later
# unpacked into `recursive_glob(*query)`.

export_val_dir = os.path.join(splits_root, 'val')
val_audios_dir = os.path.join(export_val_dir, 'audios', 'data')
val_audios_glob_str = (val_audios_dir, "*.wav")
val_labels_dir = os.path.join(export_val_dir, 'labels', 'data')
val_labels_glob_str = (val_labels_dir, "*.txt")

export_trn_dir = os.path.join(splits_root, 'train')
trn_audios_dir = os.path.join(export_trn_dir, 'audios', 'data')
trn_audios_glob_str = (trn_audios_dir, "*.wav")
trn_labels_dir = os.path.join(export_trn_dir, 'labels', 'data')
trn_labels_glob_str = (trn_labels_dir, "*.txt")

export_tst_dir = os.path.join(splits_root, 'test')
tst_audios_dir = os.path.join(export_tst_dir, 'audios', 'data')
tst_audios_glob_str = (tst_audios_dir, "*.wav")
tst_labels_dir = os.path.join(export_tst_dir, 'labels', 'data')
tst_labels_glob_str = (tst_labels_dir, "*.txt")

# the calldata table is only searched for under the val split's labels dir
calldata_glob_str = (os.path.join(export_val_dir, 'labels'), "*calldata.tbl")

print("\nCHECK IF THE QUERIES MAKE SENSE\n")
print("Calldata Query:\n", calldata_glob_str)
print("\n")
print("Validation Audios Query:\n", val_audios_glob_str)
print()
print("Validation Labels Query:\n", val_labels_glob_str)
print("\n")
print("Train Audios Query:\n", trn_audios_glob_str)
print()
print("Train Labels Query:\n", trn_labels_glob_str)
print("\n")
print("Test Audios Query:\n", tst_audios_glob_str)
print()
print("Test Labels Query:\n", tst_labels_glob_str)

# You should look at the queries

In [None]:
# find the calldata.tbl file
# NOTE: We only use the calldata.tbl from the val split
# it is the same file for all the splits

# sorted, so that the file picked below is deterministic when several match
calldata_fps = sorted(recursive_glob(*calldata_glob_str))

if len(calldata_fps) < 1:
    raise RuntimeError("Calldata file was not found")

if len(calldata_fps) > 1:
    # Previously only a warning was issued here and the whole *list* of paths
    # was then handed to `AllCallData.from_file`. Now the first match is used.
    warnings.warn("More than one calldata file found")

calldata_fp = calldata_fps[0]  # always a single path from here on
print("CALLDATA filepath:", calldata_fp, sep='\n')

calldatas = fe.AllCallData.from_file(calldata_fp)

print("\n\nCALLDATA read for {} callids\n".format(len(calldatas.allcalldata)))
print("\nExample calldata:\n")
print(calldatas['00086'], sep='\n')

# No errors means all okay reading call data

In [None]:
# helpers

AudioLabelPair = namedtuple('AudioLabelPair', 'audio,label')

def fn_for_fp(fp):
    """Return the base name of ``fp`` up to (not including) its first ``.``.

    e.g. ``/some/dir/fe_03_00001.wav`` gives ``fe_03_00001``.
    """
    stem, _, _ = os.path.basename(fp).partition(".")
    return stem

# short alias: reads the metadata of an audio file, given its path
audio_metas = au.get_audio_metadata


def parse_label(fp):
    """Read the `ActiveSpeakers` labels from the label file at ``fp``.

    Duplicate warnings are turned off, and the notebook-level `calldatas`
    is passed along as `allcalldata`.
    """
    return fe.ActiveSpeakers.from_file(
        fp,
        warn_duplicates=False,
        allcalldata=calldatas,
    )

def find_validate_pair_split(audios_glob_str, labels_glob_str, split_name, n_expected_pairs=None):
    """Find, pair up and sanity-check the audio and label files of one split.

    Parameters
    ----------
    audios_glob_str, labels_glob_str : tuple
        Arguments for `recursive_glob`, unpacked as ``recursive_glob(*query)``.
    split_name : str
        Only used (upper-cased) in the printed feedback.
    n_expected_pairs : int, optional
        When given, exactly this many audio files and this many label files
        must be found. When ``None``, at least one of each must be found.

    Returns
    -------
    list of AudioLabelPair
        One pair per call, in sorted-filepath order: ``audio`` is the result
        of `audio_metas` and ``label`` the result of `parse_label`.

    Raises
    ------
    AssertionError
        If the counts are wrong, the number of audio and label files differ,
        or the filenames of any paired audio and label do not match.
    """
    audio_fps = sorted(recursive_glob(*audios_glob_str))
    label_fps = sorted(recursive_glob(*labels_glob_str))

    if n_expected_pairs is not None:
        assert len(audio_fps) == n_expected_pairs,\
            "{} audio files found, expected {}, check your query".format(len(audio_fps), n_expected_pairs)
        assert len(label_fps) == n_expected_pairs,\
            "{} label files found, expected {}, check your query".format(len(label_fps), n_expected_pairs)
    else:
        assert len(audio_fps) > 0, "No audio files found, check your query"
        assert len(label_fps) > 0, "No label files found, check your query"

    # zip() below stops at the shorter list, so without this check any
    # unmatched trailing files would be dropped silently when no expected
    # count was given.
    assert len(audio_fps) == len(label_fps),\
        "\nMISMATCH: audios : {} v/s {} labels".format(len(audio_fps), len(label_fps))

    # pair the i-th audio with the i-th label (both lists are sorted)
    pairs = [
        AudioLabelPair(audio_metas(audio_fp), parse_label(label_fp))
        for audio_fp, label_fp in zip(audio_fps, label_fps)
    ]

    # Assert that all pairs have same callid
    # NOTE: here, we check the filename, which has the callid
    assert all(fn_for_fp(a.filepath) == fn_for_fp(l.sourcefile) for a, l in pairs),\
        "\nMISMATCH: callids between audio and label, check filenames, ordering, or missing/substitutions"

    # print some stats for feedback
    print("{}:#############################################################\n\n".format(split_name.upper()),
          "Total Pairs: {}\n\n".format(len(pairs)),
          "For Example:\n\n{}\n\n...".format(
              "\n...\n\n".join("{}\n{}".format(a, l[:3]) 
                        for a, l in pairs[:2])))
    return pairs

In [None]:
# VAL: Find all audio and label files, single list per split
val_pairs = find_validate_pair_split(val_audios_glob_str, val_labels_glob_str, 'val', 99)

# No errors means all okay for VAL

In [None]:
# TRN: Find all audio and label files, single list per split
trn_pairs = find_validate_pair_split(trn_audios_glob_str, trn_labels_glob_str, 'trn', 5200)

# No errors means all okay for TRN

In [None]:
# TST: Find all audio and label files, single list per split
tst_pairs = find_validate_pair_split(tst_audios_glob_str, tst_labels_glob_str, 'tst', 551)

# No errors means all okay for TST

### Limit Training Set to first 1200 calls _from the original training split_

Look for more explanation in the analysis and export notebooks.

In [None]:
trn_pairs = trn_pairs[:1200]

### Sinks

Where the exported features will be saved

In [None]:
pickles_root = os.path.join(splits_root, 'pickles')  # Root where the directory with h5 will go
makedirs_with_existok(pickles_root, exist_ok=True)

# Pattern for naming the dir inside the pickles root
# NOTE: round() before int(), because int() alone truncates: a float product
# that lands just below a whole number (e.g. 0.29 * 100 == 28.999999999999996)
# would put a wrong millisecond value in the directory name.
# For the current params the name is unchanged (win32ms, hop10ms).
datestamp = time.strftime("%Y%m%d")
pattern = "{}-logmel{}-win{}ms-hop{}ms".format(datestamp, nmels,
                                               int(round(win_sec * 1000)),
                                               int(round(hop_sec * 1000)))

pickles_dir = os.path.join(pickles_root, pattern)

# NOTE: the directory is actually created by the last statement of this cell
print("Export Directory (was created)\n", pickles_dir, sep='\n')
print('\n\n')

fn_h5 = lambda splitname: "{}.h5".format(splitname)
val_h5 = os.path.join(pickles_dir, fn_h5("val"))
trn_h5 = os.path.join(pickles_dir, fn_h5("trn"))
tst_h5 = os.path.join(pickles_dir, fn_h5("tst"))

print("Export Filepaths\n", 
      val_h5, '',
      trn_h5, '',
      tst_h5, '',
      sep='\n')

# exist_ok=False here, unlike for `pickles_root` above
# NOTE(review): presumably so that an existing export from the same day is
# never written into again -- confirm against `makedirs_with_existok`
makedirs_with_existok(pickles_dir, exist_ok=False)

## Main Functions

These are the main functions that will be used in loading audio, extracting features, and inferring labels.

These will be used in pre-flight checks, and later `dask.delayed`.

In [None]:
# main Python / numpy functions

def readaudiodata(pair):
    """Load the samples of ``pair.audio`` that lie within the labelled span.

    The span starts at the labels' ``min_start`` (but not before sample 0)
    and ends at the labels' ``max_end`` (but not after the audio's
    ``nsamples``), both read with the labels set to the audio's samplerate.

    NOTE: This function is tested in more detail in the pre-flight checks
    below ... make any changes here if there's something wrong.
    """
    audio_meta, annots = pair.audio, pair.label

    # We only want audio samples for which annotations are available
    with annots.samplerate_as(audio_meta.samplerate):
        first = int(max(0, annots.min_start))  # first start-time
        last = int(min(audio_meta.nsamples, annots.max_end))  # last end-time

    # sr=None: load at the file's own samplerate
    samples, _ = lr.load(audio_meta.filepath, sr=None)
    return samples[first:last]


def extractfeat(audiodata):
    """Compute the log-mel-spectrogram of ``audiodata``.

    All extraction parameters come from the notebook-level params
    (`sr`, `n_fft`, `hop_len`, `window`, `nmels`).
    """
    featparams = dict(
        sr=sr,
        n_fft=n_fft,
        hop_len=hop_len,
        window=window,
        n_mels=nmels,
        # win_len=win_len,  # not necessary
    )
    return au.logmelspectrogram(y=audiodata, **featparams)

def readlabelsdata(pair):
    """Read the labels of ``pair`` at the time-stamps of its feature vectors.

    The number of samples is computed the same way as in `expected_featlen`;
    the time-stamps for it come from `fe.samples_for_labelsat`.
    """
    audio_meta, annots = pair.audio, pair.label
    audio_sr = audio_meta.samplerate

    with annots.samplerate_as(audio_sr):
        last = min(audio_meta.nsamples, annots.max_end)
        nsamples = last - annots.min_start  # number of samples of this call for feat-ext

    # time-stamps for stft based feature vectors
    frame_ends = fe.samples_for_labelsat(nsamples=nsamples, hop_len=hop_len, win_len=win_len)

    # shift min_start to 0 because the extracted samples start at min_start
    with annots.min_start_as(0, samplerate=audio_sr):
        return annots.labels_at(frame_ends, samplerate=audio_sr)

def strided_feat(feat):
    """Return ``feat`` as chunks of `chunking`, starting every `chunkstep`.

    Uses `nu.strided_view` to make the chunks, w/o copying.
    """
    chunked = nu.strided_view(feat, win_shape=chunking, step_shape=chunkstep)
    return chunked


# labels are chunked by the very same function
strided_label = strided_feat
    
def expected_featlen(pair):
    """Return the number of feature vectors expected for ``pair``.

    That is ``1 + (nsamples - win_len) // hop_len``, where ``nsamples`` is
    the number of samples from the labels' ``min_start`` up to the smaller
    of the audio's ``nsamples`` and the labels' ``max_end``.
    """
    audio_meta, annots = pair.audio, pair.label

    with annots.samplerate_as(audio_meta.samplerate):
        last = min(audio_meta.nsamples, annots.max_end)
        nsamples = last - annots.min_start

    nhops = (nsamples - win_len) // hop_len
    return nhops + 1

def expected_stridedfeat_shape(pair):
    """Return the expected ``(nchunks, chunking, nmels)`` for ``pair``.

    The number of chunks is ``(featlen - chunkovl) // chunkstep``.
    """
    nframes = expected_featlen(pair)
    nchunks = (nframes - chunkovl) // chunkstep
    return nchunks, chunking, nmels

def expected_stridedlabel_shape(pair):
    """Return the expected strided shape of the labels for ``pair``.

    Same as for the features, except that the last axis is of size 2.
    """
    nchunks, chunklen = expected_stridedfeat_shape(pair)[:-1]
    return (nchunks, chunklen, 2)  # only 2 active-speakers for all labels

In [None]:
# dask.delayed versions of the above ... notice minimum number of args for each

d_readaudiodata = d.delayed(readaudiodata, name='audio')  # args=(pair,)

d_extractfeat = d.delayed(extractfeat, name='feat')  # args=(audiodata,)
# NOTE: Yes, we can split the multiple steps involved in extractfeat
# and delay each of them ... but ... let's not
# Most amount of time is going to be spent in stft, so then
# it will make a lot more sense to do **that** the dasky way ... 
# and that needs another round of making decisions after checks.
# Some other day ...

d_stridefeat = d.delayed(strided_feat, name='stridef')  # args=(feat,)

d_readlabelsdata = d.delayed(readlabelsdata, name='label')  # args=(pair,)

d_stridelabel = d.delayed(strided_label, name='stridel')  # args=(label,)

In [None]:
def steps(pair):
    """Build the lazy dask arrays of features and labels for one call.

    Nothing is read or computed here, the work is only wired up with the
    `dask.delayed` versions of the main functions.

    Parameters
    ----------
    pair : AudioLabelPair

    Returns
    -------
    tuple
        ``((apath, feat), (lpath, label))`` where the paths are the hdf5
        dataset paths ``audios/<groupid>/<callid>`` and
        ``labels/<groupid>/<callid>``, and ``feat`` / ``label`` are dask
        arrays with the strided chunks concatenated along the first axis.
    """
    # extract and stride features
    audiodata = d_readaudiodata(pair)
    feat = d_extractfeat(audiodata)
    feat = d_stridefeat(feat)  # make into chunks

    # extract and stride labels
    label = d_readlabelsdata(pair)
    label = d_stridelabel(label)  # make into chunks

    # for feat. This took some debugging, phew!
    # The delayed result has no shape or dtype of its own, so dask has to be
    # told the expected ones up front.
    xfeatshape = expected_stridedfeat_shape(pair)
    feat = da.from_delayed(feat, xfeatshape, np.float64)
    feat = da.concatenate(feat)  # join the chunks along the first axis

    # for labels
    # NOTE: builtin `int` instead of `np.int`; `np.int` was only ever an alias
    # of the builtin, was deprecated in NumPy 1.20 and removed in NumPy 1.24.
    xlabelshape = expected_stridedlabel_shape(pair)
    label = da.from_delayed(label, xlabelshape, int)
    label = da.concatenate(label)

    # dset paths in hdf5
    callid = pair.label.callid
    groupid = fe.groupid_for_callid(callid)
    topath = "{}/{}".format(groupid, callid)

    apath = "audios/{}".format(topath)  # e.g. audios/000/00001
    lpath = "labels/{}".format(topath)  # e.g. labels/000/00001

    return (apath, feat), (lpath, label)

In [None]:
def do_job_for_split(pairs, tofile):
    """Extract and store the features and labels of all ``pairs`` in ``tofile``.

    Every pair contributes two hdf5 datasets (see `steps`), and all of them
    are written with a single `da.to_hdf5` call.
    """
    dsets = {}
    for pair in pairs:
        # steps() gives ((path, array), (path, array)), which
        # dict.update accepts as-is
        dsets.update(steps(pair))

    storage_options = dict(
        compression='lzf',  # compress
        fletcher32=True,  # create checksum
    )
    da.to_hdf5(tofile, dsets, **storage_options)

## Pre-Flight Checks

### Audios

In [None]:
# Detailed analysis : Do it for a few files, because it might take a lot of time


# First, the helpers
def validate_audio_detailed(pair):
    """Load the audio of ``pair`` and check it against the params and labels.

    Checks the samplerate, the number of channels, that the audio is at
    least as long as the labels' ``max_end``, and that the loaded samples
    lie in [-1, 1] with a mean close to zero.

    Raises
    ------
    AssertionError
        On the first check that fails.
    """
    assert nchannels <= 2, "nchannels set by you is > 2, and librosa may do something weird"

    a, asr = lr.load(pair.audio.filepath, sr=None)
    ashape = a.shape
    amin, amax = a.min(axis=-1), a.max(axis=-1)
    amean = a.mean(axis=-1)
    del a  # only the summaries above are needed from here on

    # check if the samplerates match
    assert asr == samplerate, "samplerate mismatch {} v/s {}".format(asr, samplerate)
    assert asr == pair.audio.samplerate, "samplerate mismatch {} v/s {}".format(asr, pair.audio.samplerate)

    # check if nchannels match
    # HACK: don't rely on this, cuz librosa, I think, forces things to be at most stereo
    # NOTE: the message must use `ashape`; `a` has been deleted above, so
    # `a.shape` would raise a NameError right when this check fails
    assert len(ashape) == nchannels, "nchannels mismatch {} v/s {}".format(ashape, nchannels)
    assert len(ashape) == pair.audio.nchannels, "nchannels mismatch {} v/s {}".format(ashape, pair.audio.nchannels)

    # check that the audio is at least as long as the labels, i.e. nsamples >= labels.max_end
    with pair.label.samplerate_as(asr):
        me = pair.label.max_end

    assert ashape[-1] >= me, "nsamples mismatch {} v/s {}".format(ashape, me)

    # assert that the read audio was normalized
    assert np.all(amin >= -1), "amin not >= -1 at {}".format(amin)
    assert np.all(amax <=  1), "amax not <= +1 at {}".format(amax)
    assert np.allclose(amean, 0, atol=1e-3), "amean not close to zero at {}".format(amean)
    
    
def validate_audio_detailed_pairs(pairs):
    """Run `validate_audio_detailed` on every pair, reporting failures.

    An AssertionError is printed (with the pair's position and filename)
    and not re-raised, so the remaining pairs still get checked.
    """
    for position, pair in enumerate(pairs):
        try:
            validate_audio_detailed(pair)
        except AssertionError as err:
            report = [
                'AssertionErrors with pair at {}'.format(position),
                fn_for_fp(pair.audio.filepath),
                err,
                '',  # blank line between reports
            ]
            print(*report, sep='\n')

In [None]:
validate_audio_detailed_pairs(val_pairs)

# No errors means all okay with the audio files ... hopefully

In [None]:
# The quick analysis basically assumes that our impl of the audio_metadata reader is reliable
# Yes, we should validate everything, and ... hopefully ... it will be all quick!

def validate_audio_quick(pair):
    """Check the audio of ``pair`` using only its metadata (nothing is loaded).

    Checks the samplerate, the number of channels, and that the audio is at
    least as long as the labels' ``max_end``.

    Raises
    ------
    AssertionError
        On the first check that fails.
    """
    meta = pair.audio
    asr = meta.samplerate

    # the shape the samples would have, had they been loaded
    if meta.nchannels > 1:
        ashape = (meta.nchannels, meta.nsamples)
    else:
        ashape = (meta.nsamples,)

    # samplerate has to be the one set in the params
    assert asr == samplerate, "samplerate mismatch {} v/s {}".format(asr, samplerate)

    # number of channels has to be the one set in the params
    # HACK: don't rely on this, cuz librosa, I think, forces things to be at most stereo
    assert len(ashape) == nchannels, "nchannels mismatch {} v/s {}".format(ashape, nchannels)

    # audio has to be at least as long as the labels, i.e. nsamples >= labels.max_end
    with pair.label.samplerate_as(asr):
        labels_end = pair.label.max_end

    assert ashape[-1] >= labels_end, "nsamples mismatch {} v/s {}".format(ashape, labels_end)
    
    
def validate_audio_quick_pairs(pairs):
    """Run `validate_audio_quick` on every pair, reporting failures.

    Returns
    -------
    list of int
        Positions (in ``pairs``) of the pairs that failed a check.
    """
    failed_at = []
    for position, pair in enumerate(pairs):
        try:
            validate_audio_quick(pair)
        except AssertionError as err:
            print('AssertionErrors with pair at {}'.format(position),
                  fn_for_fp(pair.audio.filepath),
                  err,
                  '',  # blank line between reports
                  sep='\n')
            failed_at.append(position)

    return failed_at
    

In [None]:
validate_audio_quick_pairs(val_pairs)

# No errors means all okay with the audio files ... hopefully

In [None]:
trn_with_err = validate_audio_quick_pairs(trn_pairs)
print(trn_with_err)
# No errors means all okay with the audio files ... hopefully

In [None]:
validate_audio_quick_pairs(tst_pairs)

# No errors means all okay with the audio files ... hopefully

#### Some errors?

In [None]:
print('Files with errors:', 
      *[(i, fn_for_fp(trn_pairs[p].audio.filepath)) 
        for i, p in enumerate(trn_with_err)], 
      sep='\n')
print()
validate_audio_detailed_pairs((trn_pairs[i] for i in trn_with_err))

In [None]:
i = 0
el = trn_pairs[trn_with_err[i]].label
am = trn_pairs[trn_with_err[i]].audio
a, asr = lr.load(am.filepath, sr=None)

In [None]:
with el.min_start_as(0, samplerate=asr): print(el[-10:])

In [None]:
a.shape

In [None]:
am

In [None]:
with el.samplerate_as(1): print(el[-10:])

In [None]:
am.seconds

##### Report

The audio is shorter than the labels!!

It is possible that the labels are wrong and the final channels were chopped early.

It is also possible that our exporting procedure did this when merging the two channels from 
the original sph files. But there is no time to rectify that, even if there is a way.

Actually, our exporting procedure would have raised an error if the `sph2pipe` passed it along
with the missing samples ... but ... oh well ...

We will have to keep it in mind when slicing the audio data before feature extraction,
and hence, can only rely on the `nsamples` from the metadata (they are correct!), and not
from the `max_end` of the labels.

Fixes have been added to the main-functions above.

Files with errors:
0. 'fe_03_00396.wav'
1. 'fe_03_00401.wav'
2. 'fe_03_00557.wav'

### Features

We want to make sure that the features we will be calculating have the right shape.

The values ... well ... I hope you are using reliable ones, because we can't do that deterministically.

Finally, the validations will have to extract the features, and hence, will be slow.
You will have to settle for smaller set of pairs.

Do check for the pairs that caused issues earlier though!

In [None]:
def validate_featshape_detailed(pair):
    """Extract the features of ``pair`` and check their shapes.

    Both the plain and the strided (chunked) shape are compared with the
    expected ones.

    Raises
    ------
    AssertionError
        If either shape is not the expected one.
    """
    feat = extractfeat(readaudiodata(pair))
    featshape = feat.shape
    strfeatshape = strided_feat(feat).shape
    del feat  # only the shapes are needed from here on

    # expectations
    xfeatshape = (expected_featlen(pair), nmels)
    xstrfeatshape = expected_stridedfeat_shape(pair)

    # assert that they are the expected shapes
    assert featshape == xfeatshape, "Mismatch in featshape: {} v/s {}".format(featshape, xfeatshape)
    assert strfeatshape == xstrfeatshape, "Mismatch in strfeatshape: {} v/s {}".format(strfeatshape, xstrfeatshape)
    
    
def validate_featshape_detailed_pairs(pairs):
    """Run `validate_featshape_detailed` on every pair, reporting failures.

    An AssertionError is printed (with the pair's position and filename)
    and not re-raised, so the remaining pairs still get checked.
    """
    for position, pair in enumerate(pairs):
        try:
            validate_featshape_detailed(pair)
        except AssertionError as err:
            header = 'AssertionErrors with pair at {}'.format(position)
            filename = fn_for_fp(pair.audio.filepath)
            print(header, filename, err, '', sep='\n')

In [None]:
# This might be a long running one ... choose few

validate_featshape_detailed_pairs(val_pairs[:5])

# No errors or printouts means all okay with the shape of the features ... hopefully

In [None]:
# Do check for the earlier problem ones!

validate_featshape_detailed_pairs([trn_pairs[i] for i in trn_with_err])

# No errors or printouts means all okay with the shape of the features ... hopefully

### Labels

Validate the label shapes.

In [None]:
def validate_labelshape(pair):
    """Read the labels of ``pair`` and check their shapes.

    Both the plain and the strided (chunked) shape are compared with the
    expected ones.

    Raises
    ------
    AssertionError
        If either shape is not the expected one.
    """
    labels = readlabelsdata(pair)

    got_plain = labels.shape
    got_strided = strided_label(labels).shape

    want_plain = (expected_featlen(pair), 2)
    want_strided = expected_stridedlabel_shape(pair)

    assert got_plain == want_plain, "Mismatch in labelshape: {} v/s {}".format(got_plain, want_plain)
    assert got_strided == want_strided, "Mismatch in strlabelshape: {} v/s {}".format(got_strided, want_strided)
    
def validate_labelshape_pairs(pairs):
    """Run `validate_labelshape` on every pair, reporting failures.

    An AssertionError is printed (with the pair's position and filename)
    and not re-raised, so the remaining pairs still get checked.
    """
    for position, pair in enumerate(pairs):
        try:
            validate_labelshape(pair)
        except AssertionError as err:
            lines = (
                'AssertionErrors with pair at {}'.format(position),
                fn_for_fp(pair.audio.filepath),
                err,
                '',  # blank line between reports
            )
            print(*lines, sep='\n')

In [None]:
# This should be quick
validate_labelshape_pairs(val_pairs)

# No errors or printouts means all okay with the shape of the labels ... hopefully

In [None]:
# This should be quick
validate_labelshape_pairs(trn_pairs)

# No errors or printouts means all okay with the shape of the labels ... hopefully

In [None]:
# This should be quick
validate_labelshape_pairs(tst_pairs)

# No errors or printouts means all okay with the shape of the labels ... hopefully

### Dask Steps

In [None]:
def validate_dasksteps_detailed(pair):
    """Check the dask arrays built by `steps` for ``pair``, then their values.

    The hdf5 paths, dask chunks and shapes are compared with the expected
    ones first (nothing is computed for that). After that the dask arrays
    are computed and compared with the results of calling the plain main
    functions directly, so this can take a while.

    Raises
    ------
    AssertionError
        On the first check that fails.
    """
    xsfs = expected_stridedfeat_shape(pair)
    xsls = expected_stridedlabel_shape(pair)
    
    # expected dask chunks: xs?s[0] chunks of size xs?s[1] along the first
    # axis, and the remaining axis as a single chunk
    xdatachunks = ((xsfs[1], ) * xsfs[0], xsfs[2:])
    xlabelchunks = ((xsls[1], ) * xsls[0], xsls[2:])
    
    # expected shapes after the strided chunks are concatenated along the first axis
    xdatashape = (xsfs[0] * xsfs[1], ) + xsfs[2:]
    xlabelshape = (xsls[0] * xsls[1], ) + xsls[2:]
    
    callid = pair.label.callid
    groupid = fe.groupid_for_callid(callid)
    xap = "{}/{}/{}".format('audios', groupid, callid)
    xlp = "{}/{}/{}".format('labels', groupid, callid)
    
    (ap, a), (lp, l) = steps(pair)
    
    # assert paths in h5
    assert ap == xap, "Mismatch in audios path: {} v/s {}".format(ap, xap)
    assert lp == xlp, "Mismatch in labels path: {} v/s {}".format(lp, xlp)
    
    # assert chunking
    assert a.chunks == xdatachunks, "Mismatch in audio chunks:\n{} v/s\n{}".format(a.chunks, xdatachunks)
    assert l.chunks == xlabelchunks, "Mismatch in label chunks:\n{} v/s\n{}".format(l.chunks, xlabelchunks)
    
    # assert shape
    assert a.shape == xdatashape, "Mismatch in audio shape: {} v/s {}".format(a.shape, xdatashape)
    assert l.shape == xlabelshape, "Mismatch in label shape: {} v/s {}".format(l.shape, xlabelshape)
    
    # assert label values
    # (labels are compared exactly; freed right after, before the features are computed)
    xlabels = np.concatenate(strided_label(readlabelsdata(pair)))
    labels = l.compute()
    assert np.all(labels == xlabels), "Mismatch in labels data"
    del xlabels
    del labels
    
    # assert audio values
    # (features are floats, hence compared with np.allclose)
    xdata = np.concatenate(strided_feat(extractfeat(readaudiodata(pair))))
    data = a.compute()
    assert np.allclose(data, xdata), "Mismatch in audio data"
    del xdata
    del data
    
def validate_dasksteps_detailed_pairs(pairs):
    """Run `validate_dasksteps_detailed` on every pair, reporting failures.

    An AssertionError is printed (with the pair's position and filename)
    and not re-raised, so the remaining pairs still get checked.
    """
    for position, pair in enumerate(pairs):
        try:
            validate_dasksteps_detailed(pair)
        except AssertionError as err:
            print('AssertionErrors with pair at {}'.format(position),
                  fn_for_fp(pair.audio.filepath),
                  err,
                  '',  # blank line between reports
                  sep='\n')

In [None]:
# This might be a long running one ... choose few
validate_dasksteps_detailed_pairs(val_pairs[:5])

# No errors means all okay with the dask steps results

In [None]:
# This might be a long running one ... choose few
validate_dasksteps_detailed_pairs([trn_pairs[i] for i in trn_with_err])

# No errors means all okay with the dask steps results

## HDF5 Export

If all the pre-flight checks passed, it's time we finally export the HDF5 files.

We may later check on the results ... however ... taking too long may be a bad sign already.

In [None]:
pr.clear()
rpr.clear()

In [None]:
%%time
print(val_h5)
do_job_for_split(val_pairs, tofile=val_h5)
dg.visualize([pr, rpr])
pr.clear()
rpr.clear()

In [None]:
%%time
print(trn_h5)
do_job_for_split(trn_pairs, tofile=trn_h5)
dg.visualize([pr, rpr])
pr.clear()
rpr.clear()

In [None]:
%%time
print(tst_h5)
do_job_for_split(tst_pairs, tofile=tst_h5)
dg.visualize([pr, rpr])
pr.clear()
rpr.clear()

## Check HDF5, Add Label Infos and Viterbi priors

In the `rennet.datasets.fisher` module, `H5ChunkingsReader` was implemented to be able to work with the hdf5 structure that has been exported. 
The class reads the chunks of each call as per the requirements imposed by `rennet.utils.h5_utils.BaseH5ChunkingsReader`.
It also has some convenient `classmethods` to be able to choose which `callids` or `groupids` to work with. 
Other datasets with different structures may require a considerably different reader.

Then, `UnnormedFramewiseInputsProvider` was implemented as well, which allows us to `flow` (read) the data through a generator without adding contextual frames to the acoustic features, and without applying any normalization.
Other, rather esoteric yet very useful `InputsProvider` have also been implemented that do those things.

The goal of these classes is to be able to `flow` this data into the `keras.model.fit_generator(...)` method later during training, without exploding the memory or being mind-numbingly slow (up to 25x compared to a naive solution).
They are quite complicated, especially looking at the class hierarchies, and there have definitely been some compromises made.
But ... this is what we will be using.

Nevertheless, we will now add some more meta-information about the speakers, etc. to these created hdf5 files, so that this information is available without the need for another feature extraction step (if the same acoustic features are to be used), e.g. for labels based on which gender is active.


```
fisher.CallData(callid='00396', topicid='ENG28', signalgrade=4.0, convgrade=3.0, channelspeakers=
		[fisher.Speaker(pin='2215', gender='f', dialect='o'), 
		 fisher.Speaker(pin='4722', gender='f', dialect='a')])
         
```

Lastly, for Viterbi smoothing of the (expectedly) noisy soft-max posteriors, the code below calculates the required priors from `InputsProviders`, even though we could have also extracted them from the label files (using the available `ActiveSpeakers.calc_raw_viterbi_priors(...)` method).
This is closer to the real data.

The initial, transition and prior occurrences _(not normalized probabilities)_ will be calculated and added to the root level group `viterbi` in the respective hdf5 files of each split. 
Later, I believe, we will only ever be using the priors from the validation and training split, but it doesn't hurt to calculate from the testing set, does it?

In [None]:
def raw_viterbi_priors_from_ip(inputs_provider):
    """Accumulate the raw (not normalized) Viterbi counts from the labels.

    The labels are read chunk by chunk from ``inputs_provider.flow(...)``.

    Returns
    -------
    tuple
        ``(init, tran, prior)`` where
        - ``init`` is the sum of the first label frame of every new labelpath,
        - ``tran`` is the sum of `nu.confusion_matrix_forcategorical` between
          each frame and the next one, within each chunk,
        - ``prior`` is the sum of all label frames.
        All three are ``None`` if nothing was yielded.

    NOTE(review): if the yielded chunks overlap like the stored ones do, the
    frames in the overlap are counted more than once -- confirm.
    """
    init = tran = prior = None
    lastpath = None  # labelpath of the chunk seen in the previous iteration

    chunks = inputs_provider.flow(
        indefinitely=False,
        only_labels=True,
        with_chunking=True,
    )
    for xy, (_, chunk) in chunks:
        true = xy[1].astype(int)

        firstframe = true[0, ...]
        framesum = true.sum(axis=0)
        transitions = nu.confusion_matrix_forcategorical(true[:-1], true[1:])

        if lastpath is None:  # first callid
            init, prior, tran = firstframe, framesum, transitions
        else:
            if chunk.labelpath != lastpath:  # next callid
                init += firstframe
            prior += framesum
            tran += transitions

        lastpath = chunk.labelpath

    return init, tran, prior


In [None]:
def checkh5_addinfo(pair, h5):
    """Check the exported data of one call, and add its call infos as attributes.

    The call's data is read back from the hdf5 file through
    `fe.UnnormedFramewiseInputsProvider`, and its total length and chunk
    lengths are compared with the expected ones. The speaker and call
    infos from ``pair.label.calldata`` are then set as hdf5 attributes on
    the call's labels dataset.

    Parameters
    ----------
    pair : AudioLabelPair
    h5 : str
        Path to the exported hdf5 file of the split ``pair`` belongs to.

    Returns
    -------
    tuple
        The raw Viterbi ``(init, tran, prior)`` of this call, as returned by
        `raw_viterbi_priors_from_ip`.

    Raises
    ------
    AssertionError
        If a length or a chunking is not the expected one.
    """
    xsfs = expected_stridedfeat_shape(pair)
    xsls = expected_stridedlabel_shape(pair)

    # expected shape of a single chunk
    xdatachunks = (xsfs[1], ) + xsfs[2:]
    xlabelchunks = (xsls[1], ) + xsls[2:]

    # expected shape of the whole dataset
    xdatashape = (xsfs[0] * xsfs[1], ) + xsfs[2:]
    xlabelshape = (xsls[0] * xsls[1], ) + xsls[2:]

    l = pair.label
    dp = fe.UnnormedFramewiseInputsProvider.for_callids(
        h5,
        l.callid,
    )

    assert dp.totlen == xdatashape[0], "Mismatch in audio len: {} v/s {}".format(dp.totlen, xdatashape)
    assert dp.totlen == xlabelshape[0], "Mismatch in label len: {} v/s {}".format(dp.totlen, xlabelshape)

    for c in dp.chunkings:
        lend = c.dataslice[0].stop - c.dataslice[0].start
        assert lend == xdatachunks[0], "Mismatch in audio chunking: {} v/s {}".format(lend, xdatachunks)

        lenl = c.labelslice[0].stop - c.labelslice[0].start
        assert lenl == xlabelchunks[0], "Mismatch in label chunking: {} v/s {}".format(lenl, xlabelchunks)

    # Add infos
    lp = dp.chunkings[0].labelpath
    calldata = l.calldata
    speakers = calldata.channelspeakers

    with h.File(h5, 'a') as f:
        attrs = f[lp].attrs

        # NOTE: `np.bytes_` instead of `np.string_`; they are the same type,
        # but the `np.string_` alias was removed in NumPy 2.0
        attrs['speaker_pins'] = np.array([np.bytes_(s.pin) for s in speakers])
        attrs['speaker_genders'] = np.array([np.bytes_(s.gender) for s in speakers])
        attrs['speaker_dialects'] = np.array([np.bytes_(s.dialect) for s in speakers])

        attrs['topicid'] = calldata.topicid
        attrs['signalgrade'] = calldata.signalgrade
        attrs['convgrade'] = calldata.convgrade

        # FIXME: attaching dimension labels and scales is disabled (unresolved)
#         ap = dp.chunkings[0].datapath
#
#         f[lp].dims[1].label = np.bytes_('active_speaker_channel')
#         f.create_dataset("labels/active_speaker_channels", data=[0, 1])
#         f[lp].dims[1].attach_scale(f["labels/active_speaker_channels"])

#         f[ap].dims[1].label = np.bytes_('mel_frequencies')
#         f.create_dataset("audios/mel_frequencies", data=melfreq)
#         f[ap].dims[1].attach_scale(f["audios/mel_frequencies"])

        f.flush()

    # The priors are read from the file only after the append-mode handle
    # above has been closed, so the file is not held open for writing while
    # the labels are being read back.
    return raw_viterbi_priors_from_ip(dp)
        
    
def checkh5_addinfo_pairs(pairs, h5):
    """Run `checkh5_addinfo` on all ``pairs`` and store the summed Viterbi priors.

    The raw ``init``, ``tran`` and ``prior`` of all the pairs are summed and
    written to ``/viterbi/init``, ``/viterbi/tran`` and ``/viterbi/priors``
    of the hdf5 file at ``h5``. Datasets already present there (e.g. from an
    earlier run of this function) are replaced.

    Raises
    ------
    AssertionError
        Re-raised from `checkh5_addinfo`, after printing which pair failed.
    RuntimeError
        If ``pairs`` is empty, since there is nothing to store then.
    """
    init = None
    tran = None
    prior = None
    for i, pair in enumerate(pairs):
        try:
            _init, _tran, _prior = checkh5_addinfo(pair, h5)
        except AssertionError:
            print('AssertionErrors with pair at {}'.format(i))
            print(fn_for_fp(pair.audio.filepath))
            raise

        if init is None:
            # copies, so that the in-place sums below never touch the
            # arrays returned for the first pair
            init = _init.copy()
            tran = _tran.copy()
            prior = _prior.copy()
        else:
            init += _init
            tran += _tran
            prior += _prior

    if init is None:
        # nothing was accumulated; fail before touching the file
        raise RuntimeError("No pairs given, no Viterbi priors to add to: {}".format(h5))

    # Add Viterbi Priors
    with h.File(h5, 'a') as f:
        # require_group (instead of create_group) so that running this again
        # on the same file does not fail on the already existing group
        g = f.require_group("viterbi")
        for name, counts in (('init', init), ('tran', tran), ('priors', prior)):
            if name in g:
                del g[name]  # replace the result of an earlier run
            g.create_dataset(name, data=counts)  # e.g. /viterbi/init
        f.flush()
    

In [None]:
checkh5_addinfo_pairs(val_pairs, val_h5)

In [None]:
checkh5_addinfo_pairs(trn_pairs, trn_h5)

In [None]:
checkh5_addinfo_pairs(tst_pairs, tst_h5)