### Using dask for data processing

This notebook demonstrates how to use dask for our data processing.

In [None]:
import dask

In [None]:
### Load the data

In [None]:
from data_processing import load_data, get_files, parse_filename
from feature_extraction import extract_fft_features, extract_time_domain_features
import pandas as pd
import dask
from dask import do
from os.path import split
from dask import delayed, compute

In [None]:
# keep only the first two entries returned by get_files()
files =get_files()[:2]

In [None]:
def parse_filename(filename, split_file=False):
    """Parse a .mat file name into its integer components.

    Parameters:
        filename(str): a name made of underscore-separated integers with a
            '.mat' extension
        split_file(bool): when True, the directory part of ``filename`` is
            dropped before parsing
    Returns:
        parts(list): the integers found between the underscores, in order
    """
    # optionally reduce a full path to its final component
    base_name = split(filename)[1] if split_file else filename

    # drop the extension, then convert every underscore-separated field
    stem = base_name.replace('.mat', '')
    return list(map(int, stem.split('_')))

def map_functions(data, functions):
    """Apply every function in a list to the same data.

    Parameters:
        data: the input handed to each function
        functions(list): callables that take ``data`` as their only argument
    Returns:
        results(list): the return values, in the order of ``functions``
    """
    results = []
    for fun in functions:
        results.append(fun(data))
    return results


def process_data(file_name, functions=None):
    """Extract the features of a single file.

    Parameters:
        file_name(str): the file to load and process
        functions(list): feature extractors applied to the loaded time series;
            defaults to the time-domain and FFT extractors
    Returns:
        res(pd.DataFrame): a one row data frame holding the identifying columns
            followed by the extracted features, indexed by
            (Patient, TraceNumber, Condition)
    """
    extractors = functions
    if extractors is None:
        extractors = [extract_time_domain_features, extract_fft_features]

    # the signal comes from the file contents, the identifiers from its name
    time_series = load_data(file_name, True)[0]
    patient, number, condition = parse_filename(file_name, True)

    # identifying columns that lead the feature row
    id_columns = pd.DataFrame({'Patient': patient,
                               'TraceNumber': number,
                               'Condition': condition},
                              index=[0])

    # the same identifiers also form the row index of the result
    row_index = pd.MultiIndex.from_tuples(
        [(patient, number, condition)],
        names=['Patient', 'TraceNumber', 'Condition'])

    # identifiers first, then one frame per extractor, joined side by side
    frames = [id_columns]
    frames += map_functions(time_series, extractors)
    res = pd.concat(frames, axis=1)
    res.index = row_index
    return res
    
def process_multiple_data(files):
    """Extract features from many files using dask.

    Parameters:
        files(iterable): file names, each one handed to process_data; any
            iterable is accepted, including generators
    Returns:
        pd.DataFrame: the per-file results of process_data concatenated
            row-wise in input order; an empty data frame when ``files`` is empty
    """
    # materialize once so one-shot iterables work and the emptiness check is cheap
    file_list = list(files)
    if not file_list:
        # pd.concat raises on an empty sequence; give callers an empty frame instead
        return pd.DataFrame()

    # set up the compute graph: one lazy task per file, nothing runs yet
    tasks = [delayed(process_data)(file_) for file_ in file_list]

    # compute the graph; one result comes back per task, in the order given
    frames = compute(*tasks)

    return pd.concat(list(frames))

# run the dask pipeline over the selected files and keep the concatenated result
features = process_multiple_data(files)

In [None]:
def map_functions(data, functions):
    """Calls each function on data in turn (slowly) and collects the results in a list"""
    return list(map(lambda fun: fun(data), functions))

import numpy as np
# quick check of map_functions: apply two numpy functions to the scalar 1
functions = [np.sin, np.cos]
map_functions(1,functions)

### Try the new code


In [15]:
from os import listdir
import pandas as pd
from os.path import join, split
from data_processing import Processor,get_data_files,load_data,interpolate_zeros,replace_outliers_with_zeros

In [2]:
# build a Processor (imported from data_processing) with its default arguments
processor = Processor()

In [3]:
# NOTE(review): hardcoded, user-specific absolute path; this will not exist on
# another machine — consider a configurable data directory instead
test_path =['/Users/crivera5/Desktop/test']

In [5]:
# NOTE(review): in the recorded run this call raised
# "LinAlgError: Array must not contain infs or NaNs", so `res` was not assigned here
res = processor.process_data(test_path)

LinAlgError: Array must not contain infs or NaNs

Traceback
---------
  File "/Users/crivera5/.virtual_envs/Kaggle/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task
    result = _execute_task(task, data)
  File "/Users/crivera5/.virtual_envs/Kaggle/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "data_processing.py", line 224, in process_file
    feature_df_list = [fun(df) for fun in self.list_of_functions]
  File "feature_extraction.py", line 356, in extract_time_domain_features
    extract_correlations(time_series),
  File "feature_extraction.py", line 35, in extract_correlations
    eigs = pd.Series(la.eigvals(corr))
  File "/Users/crivera5/.virtual_envs/Kaggle/lib/python2.7/site-packages/numpy/linalg/linalg.py", line 903, in eigvals
    _assertFinite(a)
  File "/Users/crivera5/.virtual_envs/Kaggle/lib/python2.7/site-packages/numpy/linalg/linalg.py", line 217, in _assertFinite
    raise LinAlgError("Array must not contain infs or NaNs")


In [44]:
# load one file from the test directory for debugging; os.listdir returns entries
# in arbitrary order, so "first" is not a stable choice across machines
base = test_path[0]
# keep only the first element of what load_data returns
data=load_data(join(base,listdir(base)[0]))[0]

In [53]:
import numpy as np
def interpolate_zeros(df):
    """Replaces zero values using linear interpolation

    Zeros are treated as missing samples: they are first turned into NaN and
    then filled by linear interpolation down each column.

    Parameters:
        df(pd.DataFrame): a data frame with numeric values
    Returns:
        df(pd.DataFrame): a new data frame with the zeros interpolated; the
            input frame is left unmodified
    """
    # mask() builds a new frame with NaN wherever the condition holds, so the
    # caller's data is never touched and no per-cell Python function is needed
    without_zeros = df.mask(df == 0.0)

    # fill the gaps left by the removed zeros
    return without_zeros.interpolate(method='linear')

In [57]:
def replace_zeros(x):
    """Return NaN when x equals zero, otherwise return x unchanged."""
    return np.nan if x == 0.0 else x
# work on a copy so the loaded `data` frame stays untouched
df = data.copy()
# apply replace_zeros to every cell of the frame
df = df.applymap(replace_zeros)

In [55]:
# NOTE(review): hidden state — the only visible assignment to `res` is the
# processor.process_data call, which raised; confirm where this value comes from
res

Unnamed: 0,1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0
00:00:00,,,,,,,,,,,,,,,,
00:00:00.002500,,,,,,,,,,,,,,,,
00:00:00.005000,,,,,,,,,,,,,,,,
00:00:00.007500,,,,,,,,,,,,,,,,
00:00:00.010000,,,,,,,,,,,,,,,,
00:00:00.012500,,,,,,,,,,,,,,,,
00:00:00.015000,,,,,,,,,,,,,,,,
00:00:00.017500,,,,,,,,,,,,,,,,
00:00:00.020000,,,,,,,,,,,,,,,,
00:00:00.022500,,,,,,,,,,,,,,,,
