# ETL HDF5

## Motivation

Reading .parquet is slow, and it's painful to join the metadata to the measurement data.

I wanted a way to save the joined data so it's easy to read back.

Hopefully this will help some of you having similar problems - also, welcome to HDF5!

## Description

Below are some utility classes to extract, transform, and load the data into HDF5 format.

I've also provided some generators to load it back out in chunks.

## Design decision

Because faults are highly correlated between phases, I've decided to save each row as a trio of signal measurements together with the matching trio of targets, e.g.

X.shape = [N, 3, 8e5]
y.shape = [N, 3]

The plan is to perform classification together, predicting one of 2^3=8 classes.

I've included a utility class, BinaryEncoder, to translate targets between 3-tuple form and decimal (3-tuple <-> decimal).

### Notes

I've set the compression level in HdfEtl to 1 so as not to exceed the kernel disk space, but you should set it to 0 to get the fastest read times. Also uncomment the lines that load the test data in start().

In [None]:
# IPython shell escape: list the contents of the ../input directory.
! ls ../input

In [None]:
import os, gc
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import matplotlib.pyplot as plt
from sklearn.metrics import matthews_corrcoef
from scipy import signal
import seaborn as sns
from tqdm import tqdm_notebook as tqdm
from numba import jit, int32
import tables
from scipy import signal
from scipy.signal import butter
import pywt
from statsmodels.robust import mad

In [None]:
class MeasurementDataType(tables.IsDescription):
    # Row layout shared by the train / validation / test tables.
    # One row holds one observation: three signals of 800000 int8 samples each...
    msr = tables.Int8Col(shape=(3, 800_000), pos=1)
    # ...plus one boolean target per signal.
    target = tables.BoolCol(shape=(3,), pos=2)

class HdfEtl():
    """Extract measurements from .parquet, reshape them into one row per
    three-signal observation, and load them into an HDF5 file.

    Each stored row holds ``N_PHASE`` consecutive signals (consecutive
    ``signal_id`` columns of the parquet file) and their ``N_PHASE`` targets.
    """

    N_PHASE = 3
    PQ_CHUNK = 3*300
    COMPRESSION = 1 # slower read with higher compression

    trn_h5_key = 'train'
    val_h5_key = 'validation'
    tst_h5_key = 'test'

    def __init__(self, path):
        """Remember the input directory and derive the four input file names.

        Args:
            path: directory containing the metadata .csv and .parquet files.
        """
        self.path = path
        self.meta_trn_f = f'{path}/metadata_train.csv'
        self.msr_trn_f  = f'{path}/train.parquet'
        self.meta_tst_f = f'{path}/metadata_test.csv'
        self.msr_tst_f  = f'{path}/test.parquet'

    def start(self, out_f='data.h5', filt=None, val_size = 0.2):
        """Load in the .parquet files in chunks, transform, and save as HDF5.

        Args:
            out_f: name of the HDF5 file to (re)create.
            filt: optional callable applied to each chunk of measurements,
                an array of shape (n_obs, N_PHASE, n_msr); None stores the
                raw measurements.
            val_size: fraction of training observations randomly routed to
                the validation table.
        """
        self.h5_f = out_f
        self.init_h5()
        self.etl(self.meta_trn_f, self.msr_trn_f, self.trn_h5_key,
                 self.PQ_CHUNK, filt, val_size)
        #self.etl(self.meta_tst_f, self.msr_tst_f, self.tst_h5_key,
        #         self.PQ_CHUNK, filt)

    def etl(self, meta_f, pq_f, h5_key, chunk_size, filt=None, val_size=0.0):
        """Stream one parquet file into the HDF5 table ``h5_key``.

        Args:
            meta_f: metadata .csv (anything ``pd.read_csv`` accepts) with a
                'signal_id' column and, optionally, a 'target' column.
            pq_f: parquet file whose columns are named by signal id.
            h5_key: name of the destination table.
            chunk_size: number of parquet columns (signals) read per chunk;
                should be a multiple of N_PHASE so observations stay whole.
            filt: optional callable applied to each (n_obs, N_PHASE, n_msr)
                chunk before it is written.
            val_size: when > 0, each observation goes to the validation table
                with this probability instead of to ``h5_key``.
        """
        meta_df = pd.read_csv(meta_f)
        sig_start = meta_df['signal_id'].min()
        sig_end   = meta_df['signal_id'].max()

        # Look targets up by signal_id rather than by row position, so the
        # lookup stays correct even when signal ids do not start at 0.
        has_target = 'target' in meta_df.columns
        if has_target:
            target_by_signal = meta_df.set_index('signal_id')['target']

        # load chunk of parquet file
        for msr_df in self.read_parquet(pq_f, sig_start, sig_end, chunk_size):
            n_msr = msr_df.shape[0]                   # number of measurements ~8e5
            start_col = int(msr_df.columns[0])        # first signal_id
            n_obs = msr_df.shape[1] // self.N_PHASE   # observations in this chunk (tuple of 3 signals)
            n_sig = n_obs * self.N_PHASE              # signals that form whole observations

            # (n_msr, n_sig) -> (n_sig, n_msr) -> (n_obs, N_PHASE, n_msr):
            # observation i is made of columns 3i, 3i+1, 3i+2.
            msr = (np.asarray(msr_df.iloc[:, :n_sig]).T
                   .reshape(n_obs, self.N_PHASE, n_msr)
                   .astype('int8', copy=False))

            # filter measurements if provided
            fmsr = msr if filt is None else filt(msr)

            targets = np.zeros(shape=(n_obs, self.N_PHASE))
            if has_target:
                # Observation i owns signal ids start_col + 3i .. start_col + 3i + 2.
                sig_ids = np.arange(start_col, start_col + n_sig)
                targets[:] = np.asarray(target_by_signal.loc[sig_ids]).reshape(
                    n_obs, self.N_PHASE)

            if val_size > 0:
                val_idxs = np.random.random(targets.shape[0]) < val_size
                self.write_h5(h5_key, fmsr[~val_idxs,:,:], targets[~val_idxs,:])
                self.write_h5(self.val_h5_key, fmsr[val_idxs,:,:], targets[val_idxs,:])
            else:
                self.write_h5(h5_key, fmsr, targets)
            gc.collect()

    def init_h5(self):
        """Create a fresh HDF5 file with empty train, validation and test tables."""
        if os.path.exists(self.h5_f):
            os.remove(self.h5_f)

        filt = tables.Filters(complevel=self.COMPRESSION)

        # The context manager closes the file even if table creation fails.
        with tables.open_file(self.h5_f, 'w', title='vsb-power-line-fault-detection') as h5f:
            h5f.create_table('/', self.trn_h5_key, MeasurementDataType, 'Train Data',
                            expectedrows=int(8712*0.8/3), filters=filt)
            h5f.create_table('/', self.val_h5_key, MeasurementDataType, 'Validation Data',
                            expectedrows=int(8712*0.2/3), filters=filt)
            h5f.create_table('/', self.tst_h5_key, MeasurementDataType, 'Test Data',
                            expectedrows=int(20337/3), filters=filt)
            print(f'Initialised: {h5f}')

    def write_h5(self, key, msr, targets):
        """Append observations to the table named ``key``.

        Args:
            key: table name under the file root.
            msr: measurements, one entry per observation.
            targets: targets, one entry per observation (same order as msr).
        """
        # NOTE(review): the 'msr' column is declared as Int8Col, so float
        # output from a filter is cast to int8 here - confirm this loss of
        # precision is intended.
        print(f'\nWriting {targets.shape[0]} rows to {self.h5_f} {key}')
        with tables.open_file(self.h5_f, 'r+') as h5f:
            table = h5f.get_node('/', key)
            row = table.row
            for obs_msr, obs_targets in zip(msr, targets):
                row['msr'] = obs_msr
                row['target'] = obs_targets
                row.append()
            table.flush()

    def read_parquet(self, fn, from_col, to_col, chunk_size):
        """Yield DataFrames holding up to ``chunk_size`` consecutive columns.

        Columns are addressed by name, str(from_col) .. str(to_col) inclusive;
        the last chunk may be shorter.
        """
        print(f'Reading: "{fn}"...')
        for first in tqdm(range(from_col, to_col+1, chunk_size)):
            stop = min(first + chunk_size, to_col + 1)
            cols = [str(j) for j in range(first, stop)]
            yield pq.read_pandas(fn, columns=cols).to_pandas()

Thanks to [Jack](https://www.kaggle.com/jackvial/dwt-signal-denoising) for this high-pass filter (HPF) and wavelet denoising (DN) code.

In [None]:
# from: https://www.kaggle.com/jackvial/dwt-signal-denoising
class Filterer:
    """High-pass filter and wavelet-denoise signals, one signal at a time."""

    def __init__(self):
        self.n_samples = 800000
        self.sample_duration = 0.02
        # samples per second: n_samples taken over sample_duration seconds
        self.SAMPLE_RATE = self.n_samples * (1 / self.sample_duration)

    def filter_xs(self, xs):
        """Filter a 3-D batch; returns a float array with the shape of xs."""
        result = np.zeros(xs.shape)
        for i in tqdm(range(xs.shape[0])):
            result[i,:,:] = self.filter_x(xs[i,:,:])
        return result

    def filter_x(self, x):
        """Filter every row of a 2-D array; returns a float array shaped like x."""
        result = np.zeros(x.shape)
        for i, sig in enumerate(x):
            result[i,:] = self.filter_sig(sig)
        return result

    def filter_sig(self, sig):
        """High-pass filter one signal at 10 kHz, then denoise it with 'db4'."""
        high_passed = self.high_pass_filter(sig, self.SAMPLE_RATE, 10000)
        return self.denoise_signal(high_passed, 'db4', 1)

    def maddest(self, d, axis=None):
        """Mean Absolute Deviation of d around its mean, along ``axis``.

        ``keepdims=True`` keeps the reduced axis so the mean broadcasts
        against d for any axis, not only for axis=None.
        """
        # NOTE(review): denoise_signal scales this by 1/0.6745, a constant
        # usually paired with the *median* absolute deviation - confirm the
        # mean-based estimate is intended.
        centre = np.mean(d, axis, keepdims=True)
        return np.mean(np.absolute(d - centre), axis)

    def high_pass_filter(self, x, sample_rate, low_cutoff):
        """Apply a 10th-order Butterworth high-pass filter to x.

        Args:
            x: signal to filter.
            sample_rate: sampling rate of x, in the same unit as low_cutoff.
            low_cutoff: cutoff frequency; normalised by the Nyquist frequency.
        """
        nyquist = 0.5 * sample_rate
        norm_low_cutoff = low_cutoff / nyquist
        sos = butter(10, Wn=[norm_low_cutoff], btype='highpass', output='sos')
        return signal.sosfilt(sos, x)

    def denoise_signal(self, x, wavelet, level):
        """Hard-threshold the wavelet detail coefficients of x and rebuild it.

        Args:
            x: signal to denoise.
            wavelet: wavelet name passed to pywt.
            level: which detail band (counted from the end of the
                decomposition) is used to estimate the noise scale.
        """
        coeff = pywt.wavedec(x, wavelet, mode="per")
        sigma = (1/0.6745) * self.maddest(coeff[-level])
        uthresh = sigma * np.sqrt(2*np.log(len(x)))
        # coeff[0] (approximation) is kept; only the detail bands are thresholded.
        coeff[1:] = [pywt.threshold(c, value=uthresh, mode='hard') for c in coeff[1:]]
        return pywt.waverec(coeff, wavelet, mode='per')


In [None]:
# Run the ETL: filter every chunk and write train/validation tables to filt.h5
filt = Filterer()
etl = HdfEtl('../input')
etl.start(out_f='filt.h5', filt=filt.filter_xs, val_size=0.2)

## Generators

In [None]:
class SignalDataset:
    """Convenience accessors over the '/train', '/validation' and '/test'
    nodes of an open HDF5 file, plus chunked generators over their columns.
    """

    def __init__(self, h5f):
        """Look up the three table nodes on the already-open file ``h5f``."""
        self.h5f = h5f
        self.trn_node = h5f.get_node('/train')
        self.val_node = h5f.get_node('/validation')
        self.test_node = h5f.get_node('/test')

    # --- column accessors -------------------------------------------------

    @property
    def trn_x(self):
        return self.trn_node.cols.msr

    @property
    def trn_y(self):
        return self.trn_node.cols.target

    @property
    def val_x(self):
        return self.val_node.cols.msr

    @property
    def val_y(self):
        return self.val_node.cols.target

    @property
    def test_x(self):
        return self.test_node.cols.msr

    @property
    def test_y(self):
        return self.test_node.cols.target

    # --- chunked readers --------------------------------------------------

    def gen_trn_x(self, chunk_size = 100):
        return self.generator(self.trn_x, chunk_size)

    def gen_trn_y(self, chunk_size = 100):
        return self.generator(self.trn_y, chunk_size)

    def gen_val_x(self, chunk_size = 100):
        return self.generator(self.val_x, chunk_size)

    def gen_val_y(self, chunk_size = 100):
        return self.generator(self.val_y, chunk_size)

    def gen_test_x(self, chunk_size = 100):
        return self.generator(self.test_x, chunk_size)

    def gen_test_y(self, chunk_size = 100):
        return self.generator(self.test_y, chunk_size)

    def generator(self, data, chunk_size):
        """Yield consecutive slices of ``data`` holding at most ``chunk_size``
        leading-axis entries; the final slice may be shorter."""
        total = data.shape[0]
        for lo in range(0, total, chunk_size):
            hi = min(lo + chunk_size, total)
            yield data[lo:hi]

In [None]:
# Open filt.h5 read-only and set up a chunked reader over the training measurements
h5f = tables.open_file('filt.h5', mode='r')
sds = SignalDataset(h5f)
gen = sds.gen_trn_x(chunk_size=100)

In [None]:
# Pull the first chunk of training measurements and take a look at it.
trn_xc = next(gen)
print(trn_xc.shape)
# Slicing a PyTables column yields a NumPy array, which has no
# DataFrame-style .head(); slice the leading rows instead.
trn_xc[:5]

## Feature Extraction and 3-tuple Target encoding

In [None]:
class FeatureExtractor:
    """Summarise each observation by the min, max and mean of its three signals."""

    def to_df(self, gen):
        """Build one feature row per observation from a generator of chunks.

        Args:
            gen: iterable of arrays indexed as xs[observation, phase, sample].

        Returns:
            DataFrame with columns min0..2, max0..2, mean0..2 and a unique
            0..N-1 index; empty (but with those columns) when gen yields
            nothing.
        """
        columns = [
            'min0', 'min1', 'min2',
            'max0', 'max1', 'max2',
            'mean0', 'mean1', 'mean2']

        # Collect plain dicts and build the frame once: concatenating onto a
        # growing DataFrame copies it on every chunk and repeats index labels.
        rows = []
        for xs in gen:
            rows.extend(self.extract_features(xs[i,:,:]) for i in range(xs.shape[0]))
        return pd.DataFrame(rows, columns=columns)

    def extract_features(self, x):
        """Return {'min0': ..., ..., 'mean2': ...} for the first three rows of x."""
        return {f'{stat}{phase}': getattr(x[phase,:], stat)()
                for stat in ('min', 'max', 'mean')
                for phase in range(3)}

In [None]:
class BinaryEncoder:
    """Translate 3-tuples of binary targets to/from a decimal class 0..7 and
    score the distance between two classes."""

    # cost_matrix[d1, d2] = number of bits that differ between the 3-bit
    # codes d1 and d2. Computed rather than typed in, so it always agrees
    # with cost_binary (e.g. codes 1 = 001 and 6 = 110 differ in 3 bits).
    cost_matrix = np.array(
        [[bin(a ^ b).count('1') for b in range(8)] for a in range(8)])

    def cost_decimal(self, d1, d2):
        """Number of differing bits between two decimal classes in 0..7."""
        return self.cost_matrix[d1, d2]

    def cost_binary(self, b1, b2):
        """Number of positions at which the binary tuples b1 and b2 differ."""
        return sum(np.logical_xor(b1, b2))

    def v_encode(self, vb):
        """Encode every 3-tuple in vb; returns a list of decimals."""
        return [self.encode(x) for x in vb]

    def encode(self, b):
        """Read (b[0], b[1], b[2]) as a binary number, b[0] most significant."""
        return 4*b[0] + 2*b[1] + b[2]

    def v_decode(self, vd):
        """Decode every decimal in vd; returns a list of length-3 arrays."""
        return [self.decode(x) for x in vd]

    def decode(self, d):
        """Return the length-3 bit array for decimal d in 0..7.

        Returns None when d is not one of 0..7.
        """
        for code in range(8):
            if d == code:
                return np.array([(code >> 2) & 1, (code >> 1) & 1, code & 1])
        return None

In [None]:
# Shapes of the stored train and validation measurement columns, one per line
print(sds.trn_x.shape, sds.val_x.shape, sep='\n')
#print(sds.test_x.shape)

In [None]:
# Turn the chunked measurement generators into per-observation feature tables
fe = FeatureExtractor()
trn_x = fe.to_df(sds.gen_trn_x())
val_x = fe.to_df(sds.gen_val_x())
#test_x = fe.to_df(sds.gen_test_x())
print(trn_x.shape, val_x.shape, sep='\n')
#print(test_x.shape)
trn_x.head()

In [None]:
# Encode the 3-tuple targets as decimals and compare a few against the raw tuples
be = BinaryEncoder()
trn_y = be.v_encode(sds.trn_y[:])
val_y = be.v_encode(sds.val_y[:])
#test_y = be.v_encode(sds.test_y[:])
print(len(trn_y), len(val_y), sep='\n')
#print(len(test_y))
print(sds.trn_y[:10], trn_y[:10], sep='\n')

In [None]:
# Close the HDF5 file handle now that the columns have been read.
h5f.close()

To be continued...