In [1]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
import seaborn as sns
from tsfresh import extract_features, extract_relevant_features, select_features
from tsfresh.utilities.dataframe_functions import impute
from sklearn.cross_validation import train_test_split
from sklearn.metrics import classification_report, confusion_matrix



In [2]:
import os
GPDB_HOST = os.environ['GPDB_HOST']

In [3]:
%load_ext sql
%sql postgresql://airflow_user:airflow@{GPDB_HOST}/airflow_test

'Connected: airflow_user@airflow_test'

## Create tsfresh features 

In [4]:
%%sql
drop function if exists tsfresh_features(
    text[],
    timestamp[],
    float[],
    float[],
    float[]
);

create or replace function tsfresh_features(
    trajectory_id text[],
    ttime timestamp[],
    distance_miles float[],
    interval_hour float[],
    speed float[]
)
returns setof ts_features
as
$$
    import pandas as pd
    import numpy as np
    from tsfresh import extract_features
    from tsfresh.utilities.dataframe_functions import impute
    from tsfresh.feature_extraction import ComprehensiveFCParameters, MinimalFCParameters
    
    df = pd.DataFrame({'id': trajectory_id,
                       'time': ttime,
                       'distance_miles': distance_miles,
                       'interval_hour': interval_hour,
                        'speed': speed})
    
    extraction_settings = MinimalFCParameters()

    X = extract_features(df, column_id='id', column_sort='time', 
                                      default_fc_parameters=extraction_settings,
                                 impute_function=impute)
    
    X = X.reset_index()
    
    X = X.melt(id_vars=['id'])
    
    X = X.dropna(axis=0)
    
    return zip(X.id, X.variable, X.value)
    
$$ language plpythonu;

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
Done.


[]

In [7]:
%sql select distinct trajectory_id from geolife.geolife_trajectory_speed_walk;

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
259 rows affected.


trajectory_id
2007-04-12_11
2007-04-12_8
2007-04-13_12
2007-04-13_14
2007-04-14_4
2007-04-14_6
2007-04-14_8
2007-04-15_1
2007-04-15_3
2007-04-15_5


In [10]:
%%time
%%sql
drop table if exists exp.ts_features_walk;
create table exp.ts_features_walk
as
with a as 
(
    select trajectory_id, 
        array_agg(trajectory_id ORDER BY trajectory_id, time DESC) as id,
        array_agg(time ORDER BY trajectory_id, time DESC) as ttime,
        array_agg(distance_miles ORDER BY trajectory_id, time DESC) as dm,
        array_agg(interval_hour ORDER BY trajectory_id, time DESC) as ih ,
        array_agg(speed ORDER BY trajectory_id, time DESC) as s
    from geolife.geolife_trajectory_speed_walk
    where trajectory_id in ('2007-04-12_11','2007-04-12_8','2007-04-13_14')
    group by trajectory_id
)
select (tsfresh_features(id, ttime, dm, ih, s)).*
from a

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
72 rows affected.
CPU times: user 8 ms, sys: 19.6 ms, total: 27.6 ms
Wall time: 2.06 s


[]

In [9]:
%%time
%%sql
drop table if exists exp.ts_features_walk;
create table exp.ts_features_walk
as
with a as 
(
    select trajectory_id, 
        array_agg(trajectory_id ORDER BY trajectory_id, time DESC) as id,
        array_agg(time ORDER BY trajectory_id, time DESC) as ttime,
        array_agg(distance_miles ORDER BY trajectory_id, time DESC) as dm,
        array_agg(interval_hour ORDER BY trajectory_id, time DESC) as ih ,
        array_agg(speed ORDER BY trajectory_id, time DESC) as s
    from geolife.geolife_trajectory_speed_walk
    where trajectory_id in ('2007-04-12_11',
                            '2007-04-12_8',
                            '2007-04-13_12',
                            '2007-04-13_14',
                            '2007-04-14_4',
                            '2007-04-14_6',
                            '2007-04-14_8',
                            '2007-04-15_1',
                            '2007-04-15_3',
                            '2007-04-15_5',
                            '2007-04-16_11',
                            '2007-04-16_2',
                            '2007-04-16_4',
                            '2007-04-16_6',
                            '2007-04-17_1',
                            '2007-04-17_5',
                            '2007-04-18_2',
                            '2007-04-18_4',
                            '2007-04-18_6',
                            '2007-04-18_8',
                            '2007-04-19_1',
                            '2007-04-19_10',
                            '2007-04-19_12',
                            '2007-04-19_3',
                            '2007-04-19_5',
                            '2007-04-19_7',
                            '2007-04-19_9',
                            '2007-04-20_1',
                            '2007-04-20_3',
                            '2007-04-20_5',
                            '2007-04-20_7',
                            '2007-04-21_11',
                            '2007-04-21_2',
                            '2007-04-21_4',
                            '2007-04-21_6',
                            '2007-04-21_8',
                            '2007-04-22_1',
                            '2007-04-23_2',
                            '2007-04-24_1',
                            '2007-04-24_3',
                            '2007-04-25_2',
                            '2007-04-27_2',
                            '2007-04-27_4',
                            '2007-04-28_1',
                            '2007-04-30_2',
                            '2007-04-30_4',
                            '2007-04-30_8')
    group by trajectory_id
)
select (tsfresh_features(id, ttime, dm, ih, s)).*
from a

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
(psycopg2.OperationalError) terminating connection due to administrator command
server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.
 [SQL: "create table exp.ts_features_walk\nas\nwith a as \n(\n    select trajectory_id, \n        array_agg(trajectory_id ORDER BY trajectory_id, time DESC) as id,\n        array_agg(time ORDER BY trajectory_id, time DESC) as ttime,\n        array_agg(distance_miles ORDER BY trajectory_id, time DESC) as dm,\n        array_agg(interval_hour ORDER BY trajectory_id, time DESC) as ih ,\n        array_agg(speed ORDER BY trajectory_id, time DESC) as s\n    from geolife.geolife_trajectory_speed_walk\n    where trajectory_id in ('2007-04-12_11',\n'2007-04-12_8',\n'2007-04-13_12',\n'2007-04-13_14',\n'2007-04-14_4',\n'2007-04-14_6',\n'2007-04-14_8',\n'2007-04-15_1',\n'2007-04-15_3',\n'2007-04-15_5',\n'2007-04-16_11',

## Pivot the ts_feature table from long form to wide form

In [26]:
%%sql
drop table if exists exp.ts_features_walk_pvt;
drop table if exists exp.ts_features_walk_pvt_dictionary;
select madlib.pivot('exp.ts_features_walk', --source_table
    'exp.ts_features_walk_pvt', --output_table
    'id', --index
    'feature_name', --pivot col
    'value') --pivot_val

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
Done.
1 rows affected.


pivot


In [30]:
%%sql 
drop table if exists exp.walk_features;
create table exp.walk_features
as
with l as (
    select trajectory_id as id, 
        label 
    from geolife.geolife_trajectory_speed_walk 
    group by 1, 2
)
select * 
from exp.ts_features_walk_pvt f 
inner join l using (id)

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
259 rows affected.


[]

## Test/train split

In [32]:
%%sql
DROP TABLE IF EXISTS exp.features_walk_test, exp.features_walk_train;
SELECT madlib.train_test_split(
                                'exp.walk_features',    -- Source table
                                'exp.features_walk',     -- Output table
                                0.8,       -- Sample proportion
                                0.2,       -- Sample proportion
                                NULL, -- Strata definition
                                NULL, -- Columns to output
                                FALSE,     -- Sample without replacement
                                TRUE);    -- Separate output tables

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
1 rows affected.


train_test_split


## Run random forest model

In [33]:
%%sql
DROP TABLE IF EXISTS exp.rf_walk_output, exp.rf_walk_output_group, exp.rf_walk_output_summary;
SELECT madlib.forest_train('exp.features_walk_train',         -- source table
                           'exp.rf_walk_output',    -- output model table
                           'id',              -- id column
                           'label',           -- response
                           '*',   -- features
                           NULL,              -- exclude columns
                           NULL,              -- grouping columns
                           20::integer,       -- number of trees
                           2::integer,        -- number of random features
                           TRUE::boolean,     -- variable importance
                           1::integer,        -- num_permutations
                           8::integer,        -- max depth
                           3::integer,        -- min split
                           1::integer,        -- min bucket
                           10::integer        -- number of splits per continuous variable
                           );

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
1 rows affected.


forest_train


## Model evaluation

In [34]:
%%sql
DROP TABLE IF EXISTS exp.rf_results;
SELECT madlib.forest_predict('exp.rf_walk_output',        -- tree model    
                             'exp.features_walk_test',             -- new data table
                             'exp.rf_walk_results') --,  -- output table
                             --'prob');               -- show probability

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
1 rows affected.


forest_predict


In [35]:
%%sql
select * from exp.rf_walk_results limit 5

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
5 rows affected.


id,estimated_label
2007-04-16_4,True
2007-04-17_5,True
2007-04-18_4,True
2007-04-18_6,False
2007-04-19_5,True


In [36]:
%%sql
drop table if exists exp.walk_result;
create table exp.walk_result
as
with t as (
select id,
    case when label = True then 1.0 else 0.0 end as obs
from exp.features_walk_test
)
select id,
    obs,
    case when estimated_label = True then 1.0 else 0.0 end as pred
from exp.rf_walk_results r inner join t using (id)

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
52 rows affected.


[]

In [38]:
%%sql
DROP TABLE IF EXISTS exp.walk_auc;
SELECT madlib.area_under_roc( 'exp.walk_result', 'exp.walk_auc', 'pred', 'obs');
SELECT * FROM exp.walk_auc;

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
1 rows affected.
1 rows affected.


area_under_roc
0.9827586206896552


In [40]:
%%sql
DROP TABLE IF EXISTS exp.walk_cm;
SELECT madlib.confusion_matrix( 'exp.walk_result', 'exp.walk_cm', 'pred', 'obs');
SELECT * FROM exp.walk_cm ORDER BY class;

 * postgresql://airflow_user:***@172.16.143.130/airflow_test
Done.
1 rows affected.
2 rows affected.


row_id,class,confusion_arr
1,0.0,"[Decimal('28'), Decimal('1')]"
2,1.0,"[Decimal('0'), Decimal('23')]"
