<h1>What I have</h1>

**From signal**
(everywhere can be used std_normalization):

- Original signal
- Denoised signal
- Noise (difference between signals)
- Mean of noiseless signal
- Mean of original signal

**From other data**:

- Metrics of signal
- Frequency domain of original signal
- Spectrogram of signal, where channels are (denoised, original)

<h1>What I should use</h1>

- Denoised signal, using wavelet transform (800_000)
    - **\[Res(ODE)-Conv\]+Pool x 2-4** -> **B-LSTM x 2-4** -> **Dense**<br>Put dropout and batch normalization in between.
- Mean of noiseless signal (800_000)
    - **\[Res(ODE)-Conv\]+Pool x 2-4** -> **B-LSTM x 2-4** -> **Dense**<br>Put dropout and batch normalization in between.
- Metrics of signal (19)
    - **Dense x 2-4**
    - **Gradient boosting**
- Spectrogram (129 x 3_571 x 2)
    - **\[Res(ODE)-Conv2D\]+Pool x 4-8** -> **Dense**<br>Put dropout and batch normalization in between.
- Frequency domain of a signal (1000)
    - **\[Res(ODE)-Conv\]+Pool x 2-4** -> **B-LSTM x 2-4** -> **Dense**<br>Put dropout and batch normalization in between.
    
<h1>Final Model</h1>

Input: stacked into vector (predicted probabilities of all models, \[output of the last layer\] x 5)
- **Dense x 2-4**
- **Gradient boosting**
Output: final prediction

Optimize threshhold for matthews correlation.

In [1]:
import pandas as pd
import numpy as np
import pywt as pw
from scipy import fftpack, signal, stats
import patsy
from statsmodels.robust import mad

import pyarrow.parquet as pq
from plotting import plot_phases, plot_single_func, plot_phases_func, plot_values, plot_values_loglog
import matplotlib.pyplot as plt
%matplotlib inline

from tqdm import tnrange, tqdm_notebook, tqdm
import multiprocessing as mp
import gc
import h5py

import warnings
warnings.filterwarnings('ignore')

In [2]:
NUM_OF_MEASURES = 340
COLS_STR = [str(i) for i in range(3 * NUM_OF_MEASURES)]
COLS_INT = [i for i in range(3 * NUM_OF_MEASURES)]

TR_PQ = '../data/parquet/train.parquet'
TR_META = '../data/meta/metadata_train.csv'

TS_PQ = '../data/parquet/test.parquet'
TS_META = '../data/meta/metadata_test.csv'

TR_H5 = '../data/hdf5/train.hdf5'

TR_PQ = '/home/vrudenko/Drive_data/PowerLine_Fault_Detection/data/parquet/train.parquet'
TR_META = '/home/vrudenko/Drive_data/PowerLine_Fault_Detection/data/meta/metadata_train.csv'

TS_PQ = '/home/vrudenko/Drive_data/PowerLine_Fault_Detection/data/parquet/test.parquet'
TS_META = '/home/vrudenko/Drive_data/PowerLine_Fault_Detection/data/meta/metadata_test.csv'

train = pq.read_pandas(TR_PQ).to_pandas().values.T

In [3]:
train_meta = pd.read_csv(TR_META)
test_meta = pd.read_csv(TS_META)

# Functions

In [4]:
WAVELET_TYPE = 'db6'
WAVELET_LEVEL = 4

In [5]:
def denoise_phase(phase):
    wavelet = pw.Wavelet(WAVELET_TYPE)
    wc = pw.wavedec(phase, wavelet, level=WAVELET_LEVEL)
    sigma = mad(wc[-1])
    threshold = sigma * np.sqrt(2 * np.log(len(phase)))

    wc_r = wc[:]
    wc_r[1:] = (pw.threshold(x, threshold) for x in wc[1:])
    return pw.waverec(wc_r, wavelet)

def std_normalize_phase(phase):
    return (phase - np.mean(phase)) / np.std(phase)

def denoise_normalize_phase(phase):
    return std_normalize_phase(denoise_phase(phase))

def minmax_normalize(phase):
    return (phase - np.min(phase)) / (np.max(phase) - np.min(phase))

def mean(phases):
    return np.mean(phases, axis=0)

In [6]:
def get_phases(df, measurement_id):
    p1 = df[str(measurement_id * 3)].values
    p2 = df[str(measurement_id * 3 + 1)].values
    p3 = df[str(measurement_id * 3 + 2)].values

    return p1, p2, p3

def triple_denoise(df, measurement_id):
    return np.asarray(
        [denoise_phase(phase) for phase in get_phases(df, measurement_id)])

def original_mean(df, measurement_id):
    return mean(get_phases(df, measurement_id))

def denoised_mean(df, measurement_id):
    return mean(triple_denoise(df, measurement_id))

def denoise_normalize(df, measurement_id):
    return np.asarray([
        std_normalize_phase(phase)
        for phase in triple_denoise(df, measurement_id)
    ])

def original_normalize(df, measurement_id):
    return np.asarray([
        std_normalize_phase(phase) for phase in get_phases(df, measurement_id)
    ])

In [7]:
def entropy(phase):
    _, count = np.unique(phase, return_counts=True)
    count = count / count.sum()
    
    return stats.entropy(count)

def decomposition_energy(phase):
    wavelet = pw.Wavelet(WAVELET_TYPE)
    wc = pw.wavedec(phase, wavelet)
    
    return np.log10(np.sqrt(np.sum(np.array(wc[-WAVELET_LEVEL]) ** 2)) / len(wc[-WAVELET_LEVEL]))

In [8]:
def metrics(phase, asdict=False):
    f, Pxx = signal.welch(phase)
    ix_mx = np.argmax(Pxx)
    ix_mn = np.argmin(Pxx)

    mean = np.mean(phase)
    std = np.std(phase)
    per_le = np.percentile(phase, [0, 1, 25, 50, 75, 99, 100])
    
    
    d = {
        'mean_signal': mean,
        'std_signal': std,
        'std_top_signal': mean + std,
        'std_bot_signal': mean - std,
        'kurtosis_signal': stats.kurtosis(phase),
        'skewness_signal': stats.skew(phase),
        'percentile_signal': per_le.tolist(),
        'rel_percentile_signal': (per_le - mean).tolist(),
        'range_signal': per_le[-1] - per_le[0],
        
        'entropy_signal': entropy(phase),
        'dec_ener_signal': decomposition_energy(phase),
        
        'mean_amp': np.mean(Pxx),
        'std_amp': np.std(Pxx),
        'median_amp': np.median(Pxx),
        'kurtosis_amp': stats.kurtosis(Pxx),
        'skewness_amp': stats.skew(Pxx),
        

        'max_signal': np.max(phase),
        'min_signal': np.min(phase),
        
        'max_amp': Pxx[ix_mx],
        'min_amp': Pxx[ix_mn],
        
        'max_freq': f[ix_mx],
        'min_freq': f[ix_mn],
        
        'strong_amp': np.sum(Pxx > 2.5),
        'weak_amp': np.sum(Pxx < 0.4),
    }

    if asdict:
        return d
    else:
        flat_list = []
        for sublist in list(d.values()):
            if isinstance(sublist, list):
                for item in sublist:
                    flat_list.append(item)
            else:
                flat_list.append(sublist)
        return np.asarray(flat_list)

In [9]:
def onehot_phase(phase):
    return [0 if phase == 0 else 1, 0 if phase == 1 else 1]

In [10]:
def feature_matrix(phase, func, n_dims=128):
    data = []
    chunk = int(800000 / n_dims)
    for part in range(n_dims):
        data.append(func(phase[part * chunk:(part + 1) * chunk]))
    return np.asarray(data)

# Frequency domain

In [11]:
def get_freq(val, n=2002, d=(0.02 / 800000.)):
    sig_fft = fftpack.fft(val, n=n)
    sample_freq = fftpack.fftfreq(n=n, d=d)
    pos_mask = np.where(sample_freq >= 0)

    freqs = sample_freq[pos_mask][1:]
    power = np.abs(sig_fft)[pos_mask][1:]

    return freqs, power

def get_freq_dom(values, denoised=None, n=1000, d=(0.02 / 800000.)):
    size = n * 2 + 2
    
#     if denoised is None:
#         denoised = denoise_phase(values)
    
    _, p1 = get_freq(values, size, d)
#     _, p2 = get_freq(denoised, size, d)
    
#     return np.reshape(np.asarray([p1, p2]).T, (n, 2))
    return p1

# Spectrogram

In [12]:
def get_spectrogram(values, denoised=None, fs=1 / (2e-2 / 800000), uselog=True):
#     if denoised is None:
#         denoised = denoise_phase(values)

#     _, _, Sx1 = signal.spectrogram(denoised, fs)
    _, _, ret = signal.spectrogram(values, fs)

#     ret = np.concatenate((
#         np.reshape(Sx1, (Sx1.shape[0], Sx1.shape[1], -1)),
#         np.reshape(Sx2, (Sx2.shape[0], Sx2.shape[1], -1)),
#     ),
#                          axis=-1)

    if uselog:
        return np.log10(ret)
    else:
        return ret

# Paperspace parallel

In [13]:
def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs):
    """
    Like numpy.apply_along_axis(), but takes advantage of multiple
    cores.
    """        
    # Effective axis where apply_along_axis() will be applied by each
    # worker (any non-zero axis number would work, so as to allow the use
    # of `np.array_split()`, which is only done on axis 0):
    effective_axis = 1 if axis == 0 else axis
    if effective_axis != axis:
        arr = arr.swapaxes(axis, effective_axis)

    # Chunks for the mapping (only a few chunks):
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
              for sub_arr in np.array_split(arr, mp.cpu_count())]

    pool = mp.Pool(mp.cpu_count())
    individual_results = pool.map(unpacking_apply_along_axis, chunks)
    # Freeing the workers:
    pool.close()
    pool.join()

    return np.concatenate(individual_results)

def unpacking_apply_along_axis(tp):
    func1d, axis, arr, args, kwargs = tp
    """
    Like numpy.apply_along_axis(), but and with arguments in a tuple
    instead.

    This function is useful with multiprocessing.Pool().map(): (1)
    map() only handles functions that take a single argument, and (2)
    this function can generally be imported from a module, as required
    by map().
    """
    return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)

In [14]:
def get_chunks(data, data_shape, file, dataset, chunk_size=256):
    # computing number of chunks and extra chunks
    chunks = (data_shape - data_shape % mp.cpu_count()) / chunk_size
    extra = chunks % mp.cpu_count()
    chunks -= extra

    # computing ranges of data for each cpu
    split = []
    extend = 0
    for i in range(mp.cpu_count()):
        sta = chunk_size * chunks / mp.cpu_count() * i
        end = chunk_size * chunks / mp.cpu_count() * (i + 1)
        if i >= mp.cpu_count() - np.ceil(extra):
            sta = sta + extend * chunk_size
            end = data_shape if i == mp.cpu_count() - 1 else end + (extend + 1) * chunk_size
            extend += 1
        split.append((int(sta), int(end)))
    
    # splitting data
    data = np.asarray(data)
    masked = []
    for st, en in split:
        mask = ((data >= st).astype(int) + (data < en).astype(int)) == 2
        masked.append(tuple([data[mask].tolist(), file, dataset]))
    
    return masked

def unpacking_h5py_reading(tp):
    ind, file, dataset = tp
    with h5py.File(file, mode='r') as f:
        ret = f[dataset][ind]
    return ret

def parallel_read_h5py(indicies, shape, path, dataset, chunk_size=256):
    chunks = get_chunks(indicies, shape, path, dataset, chunk_size)
    pool = mp.Pool(mp.cpu_count())
    individual_results = pool.map(unpacking_h5py_reading, chunks)
    pool.close()
    pool.join()

    return np.concatenate(individual_results)

In [15]:
def parallel_proc_h5(name, data, dims=128, batch=512):
    steps = int(np.ceil(data.shape[0] / batch))

    dir_file = '../data/hdf5/' + name + '.hdf5'
    print('Creating hdf5 file in directory: {}'.format(dir_file))
    hd_file = h5py.File(dir_file, mode='a')

    print('Creating datasets: ', end='')
    denoised_ds = hd_file.create_dataset(
        'denoised', shape=data.shape, dtype=np.float32, chunks=(256, 800000))
    print('denoised, ', end='')

    metrics_ds = hd_file.create_dataset(
        'stats',
        shape=(data.shape[0], 36, dims),
        dtype=np.float32,
        chunks=(256, 36, dims))
    print('stats, ', end='')

    spectrogram_ds = hd_file.create_dataset(
        'spectr',
        shape=(data.shape[0], 129, 27, dims),
        dtype=np.float32,
        chunks=(256, 129, 27, dims))
    print('spectr, ', end='')

    freq_dom_ds = hd_file.create_dataset(
        'freq',
        shape=(data.shape[0], 1000, dims),
        dtype=np.float32,
        chunks=(256, 1000, dims))
    print('freq.')

    t = tnrange(steps)
    for i in t:
        start = i * batch
        finish = (i + 1) * batch if i + 1 != steps else data.shape[0]

        t.set_description('Denoising')
        den_data = parallel_apply_along_axis(denoise_normalize_phase, 1,
                                             data[start:finish])
        denoised_ds[start:finish] = den_data

        t.set_description('Metrics')
        metrics_ds[start:finish] = parallel_apply_along_axis(
            feature_matrix, 1, den_data, func=metrics, n_dims=dims).transpose(
                (0, 2, 1))

        t.set_description('Spectrogram')
        spectrogram_ds[start:finish] = parallel_apply_along_axis(
            feature_matrix, 1, den_data, func=get_spectrogram,
            n_dims=dims).transpose((0, 2, 3, 1))

        t.set_description('Frequency')
        freq_dom_ds[start:finish] = parallel_apply_along_axis(
            feature_matrix, 1, den_data, func=get_freq_dom,
            n_dims=dims).transpose((0, 2, 1))
        
        del den_data

parallel_proc_h5('train', train)

# Plotting

In [None]:
def diff(df, measurement_id):
    phase = df[measurement_id]
    
    return phase[1:] - phase[:-1]

In [None]:
plot_phases(train, train_meta, 0, plot_range=[500000, 510000], url=True)

In [None]:
plot_phases_func(
    train, train_meta, 0, func=triple_denoise, name='denoise', url=True)

In [None]:
denoise_phase(train['0'].values)

In [None]:
plot_single_func(
    train, train_meta, 0, func=diff, name='diff', url=True)

In [None]:
plot_single_func(
    train, train_meta, 0, func=denoised_mean, name='wavelet_mean', url=True)

# Data generator

In [16]:
from keras.utils import Sequence
from imblearn.under_sampling import RandomUnderSampler
from sklearn.utils import shuffle
import os

Using TensorFlow backend.


In [22]:
class DataGenerator(Sequence):
    def __init__(self,
                 batch_size,
                 meta,
                 to_gen,
                 data_shape,
                 data=None,
                 file_path=None,
                 first=0):
        self.batch_size = batch_size
        self.meta = meta
        self.to_gen = [element.lower() for element in to_gen]
        self.data_shape = data_shape

        self.first = first

        print('{} will be generated.'.format(to_gen))
        if file_path is not None and os.path.exists(file_path):
            self.file_path = file_path
            self.cache = True
            print('Using cached data.')
        elif data is not None:
            self.data = data
            self.cache = False
            print(
                'Generating data on fly.\n*IT CAN BE EXTREMELY SLOW AND WILL SLOWDOWN TRAINING*'
            )
        else:
            raise ValueError(
                'data and file_path can not be None simultaneously. Set valid file_path or feed in valid data.'
            )

        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.ceil(len(self.dat_ind) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Get indexes of the batch
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        ind = self.dat_ind[start:end].tolist()

        # Generate data
        return self.__data_generation(ind)

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        # Undersampling
        under_sample = RandomUnderSampler(
            sampling_strategy='majority',
            random_state=np.random.randint(0, high=1024, size=1)[0])
        resampled = under_sample.fit_resample(
            self.meta['signal_id'].values.reshape((-1, 1)),
            self.meta['target'].values.reshape((-1, 1)),
        )
        ind, _ = shuffle(resampled[0].ravel(), resampled[1].ravel())
        # Resampling data indicies
        # In test data indicies starts from 8712, so we should fit them from 0 to *some value*
        # so that we could correctly get data from hdf5 file or pure dataset
        self.dat_ind = ind - self.first

    def __data_generation(self, ind):
        'Generates data containing batch_size samples'
        if self.cache:
            # Get data from file
            ind = np.asarray(ind)
            ind.sort()
            ind = ind.tolist()

            lists = self.__get_cached(ind)
            targets = self.meta['target'].values[ind]

            X = []
            state = np.random.randint(0, high=1024, size=1)[0]
            for l in lists:
                X_, y = shuffle(l, targets, random_state=state)
                X.append(X_)
        else:
            X = self.__generate_on_fly(ind)
            y = self.meta['target'].values[ind]

        return [X, y]

    def __get_cached(self, ind):
        'Get cached data from hdf5 file'
        X = []
        if 'denoised' in self.to_gen:
            X.append(
                parallel_read_h5py(ind, self.data_shape, self.file_path,
                                   'denoised'))
        if 'stats' in self.to_gen:
            X.append(
                parallel_read_h5py(ind, self.data_shape, self.file_path,
                                   'stats'))
        if 'spectr' in self.to_gen:
            X.append(
                parallel_read_h5py(ind, self.data_shape, self.file_path,
                                   'spectr'))
        if 'freq' in self.to_gen:
            X.append(
                parallel_read_h5py(ind, self.data_shape, self.file_path,
                                   'freq'))

        return X

    def __generate_on_fly(self, ind):
        'Generate data on fly'
        prep = self.data[ind]
        X = []
        if 'denoised' in self.to_gen:
            X.append(
                parallel_apply_along_axis(denoise_normalize_phase, 1, prep))
        if 'stats' in self.to_gen:
            X.append(
                parallel_apply_along_axis(
                    feature_matrix, 1, X[0], func=metrics).transpose((0, 2,
                                                                      1)))
        if 'spectr' in self.to_gen:
            X.append(
                parallel_apply_along_axis(
                    feature_matrix, 1, X[0], func=get_spectrogram).transpose(
                        (0, 2, 3, 1)))
        if 'freq' in self.to_gen:
            X.append(
                parallel_apply_along_axis(
                    feature_matrix, 1, X[0], func=get_freq_dom).transpose(
                        (0, 2, 1)))
        return X

# Coding

In [None]:
from keras.layers import (Bidirectional, LSTM, Dense, TimeDistributed, Conv1D,
                          Input, Add, BatchNormalization, ReLU, Dropout, Flatten, Activation)
from keras.models import Model
from keras.callbacks import (TensorBoard, ModelCheckpoint)
import keras.backend as K

In [None]:
def matthews_correlation(y_true, y_pred):
    y_pred_pos = K.round(K.clip(y_pred, 0, 1))
    y_pred_neg = 1 - y_pred_pos

    y_pos = K.round(K.clip(y_true, 0, 1))
    y_neg = 1 - y_pos

    tp = K.sum(y_pos * y_pred_pos)
    tn = K.sum(y_neg * y_pred_neg)

    fp = K.sum(y_neg * y_pred_pos)
    fn = K.sum(y_pos * y_pred_neg)

    numerator = (tp * tn - fp * fn)
    denominator = K.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))

    return numerator / (denominator + K.epsilon())

In [None]:
def res_layer(input_data, filters, kernel_size, block_num=None):
    if block_num is None:
        raise ValueError('Block number is not defined')

    block = 'res_block' + str(block_num) + '_'
    inp_ = input_data

    out = Conv1D(
        filters, kernel_size, padding='same', name=block + 'conv1')(input_data)
    out = BatchNormalization(name=block + 'bn1')(out)
    out = ReLU(name=block + 'relu1')(out)

    #     out = Dropout(drop, name=block + 'drop')(out)

    out = Conv1D(
        filters, kernel_size, padding='same', name=block + 'conv2')(out)
    out = BatchNormalization(name=block + 'bn2')(out)

    out = Add(name=block + 'add')([out, inp_])
    out = ReLU(name=block + 'relu2')(out)

    return out

In [None]:
def den_branch(inp):
    # Conv1D -> B-LSTM -> Attention -> Dense
    pass

def stats_branch(inp):
    # B-LSTM -> Attention -> Dense
    pass

def spectr_branch(inp):
    # Conv2D > Dense
    pass

def freq_branch(inp):
    # Conv1D -> B-LSTM -> Attention -> Dense
    pass

In [25]:
denoised = Input(shape=(800000, 1), name='Denoised')
stats = Input(shape=(36, 128), name='Statistics')
spectr = Input(shape=(129, 27, 128, 2), name='Spectrogram')
freq = Input(shape=(1000, 128, 2), name='Frequencies')




model = Model(inputs=[denoised, stats, spectr, freq], outputs=[output_model])

NameError: name 'Input' is not defined

In [None]:
model = get_model(filters, kernel, True)
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy', matthews_correlation])

In [None]:
def callbacks(model_name, batch_size):
    m_tb = TensorBoard(
        log_dir='../logs/{}'.format(model_name),
        histogram_freq=1,
        write_graph=True,
        write_grads=True,
        batch_size=batch_size
    )
    m_ch = ModelCheckpoint(
        filepath='../weights/{}/{epoch:02d}-{val_loss:.2f}.hdf5'.format(model_name),
        save_best_only=False,
        save_weights_only=False,
        period=5
    )
    
    return [m_tb, m_ch]

In [None]:
model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=256,
    verbose=1,
    callbacks=callbacks('model_1', 256)
)