In [1]:
from pathlib import Path

import numpy as np
import pandas as pd

In [2]:
# Data locations under the user's home directory: inputs are read from
# DATA_RAW and DATA_PROCESSED, and the feature tables are written to DATA_PROCESSED.
DATA = Path.home() / 'work' / 'data'
DATA_RAW = DATA / 'raw'
DATA_PROCESSED = DATA / 'processed'

In [3]:
# Time-series inputs for the train and test splits (one feature row per
# process_id is derived from each of them by join_features).
train_truncated = pd.read_parquet(DATA_PROCESSED / 'train_ts_truncated.parquet')
test = pd.read_parquet(DATA_PROCESSED / 'test_ts_truncated.parquet')

In [4]:
# Targets for the train and test splits; in this notebook only their indexes
# are used, to check alignment with the computed feature tables.
train_target = pd.read_parquet(DATA_PROCESSED / 'train_target.parquet')
test_target = pd.read_parquet(DATA_PROCESSED / 'test_target.parquet')

In [5]:
# Time series read from the raw folder; its feature index is later checked
# against submission_format's index.
final_test = pd.read_parquet(DATA_RAW / 'test_values.parquet')

In [6]:
# Indexed by the first CSV column; join_features concatenates each row's
# values into a single 'recipe' string and joins it onto the feature index.
recipe_metadata = pd.read_csv(DATA_RAW / 'recipe_metadata.csv', index_col=0)

In [7]:
# Only the index of this frame is used in this notebook: the final test
# feature table is expected to have the same index.
submission_format = pd.read_csv(DATA_RAW / 'submission_format.csv', index_col=0)

# Preprocessing

In [8]:
# Columns removed from every time-series frame before feature extraction.
to_drop = ['tank_lsh_acid', 'tank_lsh_pre_rinse', 'target_time_period']

In [9]:
# Drop the unused columns from all three time-series frames.
# errors='ignore' makes this cell safe to re-run: each frame is reassigned to
# its own reduced copy, so a second execution would otherwise raise KeyError
# because the columns are already gone.
# NOTE(review): the flip side is that a misspelled name in `to_drop` is
# silently ignored -- keep the list in sync with the data schema.
train_truncated = train_truncated.drop(columns=to_drop, errors='ignore')
test = test.drop(columns=to_drop, errors='ignore')
final_test = final_test.drop(columns=to_drop, errors='ignore')

# Implementation

In [10]:
# Per-process columns copied into the feature table (first value per process_id).
metadata = ['object_id', 'pipeline']
# Phases for which presence flags are emitted, in this column order.
phase_order = ['pre_rinse', 'caustic', 'intermediate_rinse', 'acid']

In [11]:
def phase_count(timeseries):
    """Flag which of the phases in ``phase_order`` occur in each process.

    Despite the name, the result holds booleans (presence), not counts.

    Parameters
    ----------
    timeseries : DataFrame with 'process_id' and string-valued 'phase' columns.

    Returns
    -------
    DataFrame indexed by process_id with one boolean column
    ``'phase:<name>'`` per entry of ``phase_order``, in that order.
    Phases not listed in ``phase_order`` are dropped.
    """
    phases_per_process = timeseries.groupby('process_id')['phase'].unique()
    # Join each process's phases into 'a|b|c' so str.get_dummies can one-hot them.
    flags = phases_per_process.apply('|'.join).str.get_dummies().astype(bool)
    # fill_value=False: a phase that never occurs in this frame must read as
    # "absent" rather than NaN, otherwise the column loses its boolean dtype
    # and train/test tables can disagree on dtypes.
    flags = flags.reindex(columns=phase_order, fill_value=False)
    flags.columns = 'phase:' + flags.columns
    return flags

In [12]:
def phase_duration(timeseries):
    """Elapsed seconds between the first and last timestamp of each phase.

    Parameters
    ----------
    timeseries : DataFrame with 'process_id', 'phase' and datetime 'timestamp'.

    Returns
    -------
    DataFrame indexed by process_id with one ``'<phase>:duration'`` column per
    phase; NaN where a process has no rows for that phase.
    """
    bounds = timeseries.groupby(['process_id', 'phase'])['timestamp'].agg(['min', 'max'])
    bounds['duration'] = (bounds['max'] - bounds['min']).dt.total_seconds()
    wide = bounds[['duration']].unstack(-1)
    # Column keys are ('duration', phase); flip them so the phase leads the name.
    wide.columns = [':'.join(reversed(key)) for key in wide.columns]
    return wide

In [13]:
def total_turbidity(timeseries):
    """Sum the 'turbidity' column over every (process, phase) pair.

    Returns a DataFrame indexed by process_id with one
    ``'<phase>:total_turbidity'`` column per phase; NaN where a process has no
    rows for that phase.
    """
    wide = timeseries.groupby(['process_id', 'phase'])['turbidity'].sum().unstack()
    renamed = [phase + ':total_turbidity' for phase in wide.columns]
    # Keep the columns' axis name, as the original Series-based rename did.
    wide.columns = pd.Index(renamed, name=wide.columns.name)
    return wide

In [14]:
def get_first_and_last(data, groupby_cols, float_cols, N):
    """Mean of the first N and of the last N rows of every group.

    Parameters
    ----------
    data : DataFrame containing ``groupby_cols`` and ``float_cols``.
    groupby_cols, float_cols : lists of column names (they are concatenated
        with ``+``, so both must be lists).
    N : number of leading / trailing rows averaged per group.

    Returns
    -------
    DataFrame indexed by the group keys with two-level columns
    ``(column, 'first<N>')`` followed by ``(column, 'last<N>')``.
    """
    subset = data[groupby_cols + float_cols]
    edges = (
        ('first%d' % N, subset.groupby(groupby_cols).head(N)),
        ('last%d' % N, subset.groupby(groupby_cols).tail(N)),
    )

    parts = []
    for label, rows in edges:
        # head/tail return the selected rows ungrouped, so group again to average.
        means = rows.groupby(groupby_cols).mean()
        means.columns = pd.MultiIndex.from_tuples([(name, label) for name in means.columns])
        parts.append(means)
    return pd.concat(parts, axis=1)

In [15]:
def apply_funcs(data, groupby_cols=['process_id']):
    """Aggregate every float column of ``data`` per group.

    The statistics are computed in three separate passes (standard
    aggregations, quantiles, first/last means) and concatenated, which the
    original author found faster than a single ``agg`` call; the price is that
    the two-level column index has to be rebuilt by hand for each part.

    Parameters
    ----------
    data : DataFrame containing ``groupby_cols`` plus float-typed value columns.
    groupby_cols : list of column names to group by. The default list is never
        mutated here.

    Returns
    -------
    DataFrame indexed by the group keys with two-level columns
    ``(column, statistic)``, where statistic is one of sum, median, mean, std,
    max, min, q20, q80, first10, last10.
    """
    groupby_cols = list(groupby_cols)
    float_cols = data.select_dtypes('float').columns.tolist()
    grouped = data.groupby(groupby_cols)[float_cols]

    # agg standard functions
    standard = grouped.agg(['sum', 'median', 'mean', 'std', 'max', 'min'])

    # apply quantile; unstack moves the quantile level into the columns,
    # giving (column, q) keys that are relabelled as (column, 'q<percent>').
    qs = [.2, .8]
    quantiles = grouped.quantile(qs).unstack(-1)
    # Iterate the MultiIndex directly: Index.get_values() no longer exists in
    # pandas >= 1.0. round() (not bare int()) so that e.g. 0.29 -> 'q29'
    # instead of being truncated to 'q28' by float error.
    quantiles.columns = pd.MultiIndex.from_tuples(
        [(name, 'q%d' % int(round(100 * q))) for name, q in quantiles.columns])

    # calculate average of first and last values
    first_and_last = get_first_and_last(data, groupby_cols, float_cols, 10)  # last param somewhat optimized

    return pd.concat([standard, quantiles, first_and_last], axis=1)

In [16]:
def groupby_and_apply(timeseries, prefix=''):
    """Run ``apply_funcs`` per process and per (process, phase), side by side.

    Column keys are flattened to ``prefix + 'column:stat'`` for the per-process
    part and ``prefix + 'column:stat:phase'`` for the per-phase part. The
    result is indexed by process_id.
    """
    def flat_names(frame):
        # Collapse each multi-level column key into one ':'-separated name.
        return [prefix + ':'.join(key) for key in frame.columns]

    per_process = apply_funcs(timeseries)
    per_process.columns = flat_names(per_process)

    # unstack turns the phase index level into a third column level
    per_phase = apply_funcs(timeseries, ['process_id', 'phase']).unstack(-1)
    per_phase.columns = flat_names(per_phase)

    return pd.concat([per_process, per_phase], axis=1)

In [17]:
def float_features_fast(timeseries, add_diff=True):
    """Aggregate float columns, optionally also their smoothed differences.

    With ``add_diff`` the float columns are smoothed per process with a
    10-row rolling mean and differenced over the same lag; both the signed
    ('diff:' prefix) and absolute ('absdiff:' prefix) differences are then
    aggregated the same way as the raw values. Rows containing any NaN are
    dropped before the difference aggregations.
    """
    raw_features = groupby_and_apply(timeseries)
    if not add_diff:
        return raw_features

    float_cols = timeseries.select_dtypes('float').columns.tolist()
    smoothing_window = 10

    def smoothed_change(series):
        return series.rolling(smoothing_window).mean().diff(smoothing_window)

    diff = timeseries.groupby('process_id')[float_cols].transform(smoothed_change)

    keys = timeseries[['process_id', 'phase']]
    signed = pd.concat([keys, diff], axis=1).dropna()
    magnitude = pd.concat([keys, diff.abs()], axis=1).dropna()

    diff_features = groupby_and_apply(signed, prefix='diff:')
    absdiff_features = groupby_and_apply(magnitude, prefix='absdiff:')

    return pd.concat([raw_features, diff_features, absdiff_features], axis=1)

In [18]:
def boolean_features_fast(timeseries):
    """Summarise every boolean column per process and per (process, phase).

    For each boolean column (cast to 0/1) the result contains, both per phase
    and over the whole process: the number of rows where it is on ('sum'), the
    fraction of rows where it is on ('mean'), and the mean over the first and
    last 10 rows ('first10' / 'last10'). Column names are ':'-joined keys,
    with the phase appended for the per-phase variants.
    """
    bool_columns = timeseries.select_dtypes('bool').columns.tolist()
    data = timeseries[['process_id', 'phase'] + bool_columns].copy()
    data[bool_columns] = data[bool_columns].astype(int)

    def flatten(frame):
        # Replace multi-level column keys by a single ':'-separated name.
        frame.columns = [':'.join(key) for key in frame.columns]
        return frame

    stats = ['sum', 'mean']
    by_phase = ['process_id', 'phase']

    # on-count and on-fraction for each phase...
    per_phase = flatten(data.groupby(by_phase)[bool_columns].agg(stats).unstack(-1))
    # ... and over the whole process
    overall = flatten(data.groupby('process_id')[bool_columns].agg(stats))

    # first and last values, again per phase and overall
    edges_per_phase = flatten(get_first_and_last(data, by_phase, bool_columns, 10).unstack(-1))
    edges_overall = flatten(get_first_and_last(data, ['process_id'], bool_columns, 10))

    return pd.concat([per_phase, overall, edges_per_phase, edges_overall], axis=1)

In [19]:
def harmonic_features(timeseries, n_peaks=3):
    """Largest values of the turbidity power spectrum for each process.

    Parameters
    ----------
    timeseries : DataFrame with 'process_id' and 'turbidity' columns.
    n_peaks : how many of the largest power values to keep (default 3).

    Returns
    -------
    DataFrame indexed by process_id with columns
    ``'harmonic:power_spectrum_max<k>'`` for k = n_peaks .. 1, where max1 is
    the largest value. A process with fewer than ``n_peaks`` samples gets NaN
    in the highest-numbered columns, so max1 is always its largest value.

    Raises
    ------
    ValueError : if ``n_peaks`` is smaller than 1.

    NOTE(review): the full two-sided FFT is used and the zero-frequency bin is
    not excluded. For real-valued input the spectrum is symmetric, so every
    non-DC peak appears twice among the top values. Switching to
    ``np.fft.rfft`` would avoid that but changes the feature values, so it is
    left as is here.
    """
    if n_peaks < 1:
        raise ValueError('n_peaks must be at least 1')

    def top_powers(values):
        spectrum = np.abs(np.fft.fft(values)) ** 2  # Discrete Fourier Transform -> power
        top = np.sort(spectrum)[-n_peaks:]
        # Left-pad with NaN so the largest value always sits in the last slot
        # (max1), even when the series is shorter than n_peaks.
        padded = np.full(n_peaks, np.nan)
        if top.size:
            padded[n_peaks - top.size:] = top
        return padded

    process_ids, rows = [], []
    for process_id, turbidity in timeseries.groupby('process_id')['turbidity']:
        process_ids.append(process_id)
        rows.append(top_powers(turbidity))

    columns = ['harmonic:power_spectrum_max{}'.format(i) for i in range(n_peaks, 0, -1)]
    values = np.vstack(rows) if rows else np.empty((0, n_peaks))
    return pd.DataFrame(values, index=pd.Index(process_ids, name='process_id'), columns=columns)

In [20]:
def join_features(timeseries):
    """Build the per-process feature table from a time-series frame.

    Starts from the ``metadata`` columns (first value per process, as
    strings), joins the recipe string derived from ``recipe_metadata``, then
    joins the boolean, phase, duration, turbidity, float and harmonic feature
    groups, all on the process_id index.

    The input frame is not modified: the derived turbidity columns are added
    to a copy.

    Parameters
    ----------
    timeseries : DataFrame with 'process_id', 'phase', 'timestamp',
        'return_turbidity', 'return_flow' and the ``metadata`` columns.

    Returns
    -------
    DataFrame with one row per process_id.

    Raises
    ------
    ValueError : if any column is missing a value after the recipe join
        (in practice: a process_id that is absent from ``recipe_metadata``).
    """
    features = timeseries.groupby('process_id')[metadata].first().astype(str)
    recipe = recipe_metadata.astype(str).apply(''.join, axis=1).rename('recipe')
    features = features.join(recipe)
    # Report the columns that actually contain gaps. The metadata columns were
    # cast to str above and so can no longer be NaN; naming them would point
    # at the wrong culprit.
    incomplete = features.columns[features.isna().any()].tolist()
    if incomplete:
        raise ValueError('Missing values for %s' % str(incomplete))

    # assign() returns a new frame, so the caller's DataFrame keeps its
    # original columns.
    timeseries = timeseries.assign(
        log_return_turbidity=np.log(1 + timeseries['return_turbidity']),
        # negative flow is treated as zero flow
        turbidity=np.maximum(timeseries['return_flow'], 0) * timeseries['return_turbidity'],
    )
    timeseries = timeseries.assign(
        log_turbidity=np.log(1 + timeseries['turbidity'].clip(lower=0)),
    )

    print('Adding boolean features...')
    features = features.join(boolean_features_fast(timeseries))
    print('Adding phase count...')
    features = features.join(phase_count(timeseries))
    print('Adding phase duration...')
    features = features.join(phase_duration(timeseries))
    print('Adding total turbidity...')
    features = features.join(total_turbidity(timeseries))
    print('Adding float features (this may take a while)...')
    features = features.join(float_features_fast(timeseries, add_diff=False))
    print('Adding harmonic features...')
    features = features.join(harmonic_features(timeseries))

    return features

# Feature calculation

In [21]:
%%time
# One feature row per process_id of the training time series.
train_features = join_features(train_truncated)

Adding boolean features...
Adding phase count...
Adding phase duration...
Adding total turbidity...
Adding float features (this may take a while)...
Adding harmonic features...
CPU times: user 16.1 s, sys: 2.2 s, total: 18.3 s
Wall time: 18.3 s


In [22]:
%%time
# One feature row per process_id of the test time series.
test_features = join_features(test)

Adding boolean features...
Adding phase count...
Adding phase duration...
Adding total turbidity...
Adding float features (this may take a while)...
Adding harmonic features...
CPU times: user 7.15 s, sys: 827 ms, total: 7.98 s
Wall time: 7.97 s


In [23]:
%%time
# One feature row per process_id of the final (raw) test time series.
final_test_features = join_features(final_test)

Adding boolean features...
Adding phase count...
Adding phase duration...
Adding total turbidity...
Adding float features (this may take a while)...
Adding harmonic features...
CPU times: user 13.5 s, sys: 1.61 s, total: 15.2 s
Wall time: 15.2 s


## Sanity check

In [24]:
# Index.equals also handles a length mismatch (elementwise == would raise
# ValueError instead of failing the assertion).
assert train_features.index.equals(train_target.index), \
    'train_features and train_target are not aligned on the same index'

In [25]:
# Index.equals also handles a length mismatch (elementwise == would raise
# ValueError instead of failing the assertion).
assert test_features.index.equals(test_target.index), \
    'test_features and test_target are not aligned on the same index'

In [26]:
# Index.equals also handles a length mismatch (elementwise == would raise
# ValueError instead of failing the assertion).
assert final_test_features.index.equals(submission_format.index), \
    'final_test_features and submission_format are not aligned on the same index'

## Save

In [27]:
# Persist the training feature table in the processed-data folder.
train_features.to_parquet(DATA_PROCESSED / 'train_features.parquet')

In [28]:
# Persist the test feature table in the processed-data folder.
test_features.to_parquet(DATA_PROCESSED / 'test_features.parquet')

In [29]:
# Persist the final test feature table in the processed-data folder.
final_test_features.to_parquet(DATA_PROCESSED / 'final_test_features.parquet')