In [4]:
import pandas as pd
import numpy as np
import os
import pickle
import gzip
import timeit
import warnings
from matplotlib import pyplot as plt
from functools import partial
from pandas.tseries.frequencies import to_offset

# Silence all warnings. NOTE(review): this also hides pandas deprecation and
# SettingWithCopy warnings, which can mask real bugs in the cells below.
warnings.filterwarnings("ignore")

data_folder = './samplecsvdata/'
# sorted so that fs[0] refers to the same file on every run
# (os.listdir returns entries in arbitrary order)
fs = sorted(os.listdir(data_folder))

In [5]:
# line_profiler provides the %lprun magic used in the bottleneck-profiling cell
%load_ext line_profiler

The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


## This version of roll_up_ten removes all but two resample calls and uses separate agg calls to apply different functions to each group of columns:

In [64]:
# load sleep model
# NOTE: pickle.load can execute arbitrary code; only load model files from a trusted source.
sleep_model_path = './SleepModel10Min.sav'
with open(sleep_model_path, 'rb') as model_file:  # closes the handle after loading
    sleep_clfr = pickle.load(model_file)

# load norm data: roll_up_ten subtracts column `m` and divides by column `s`
sleep_norms = pd.read_csv('sleep_norms.csv', index_col=0)

# define sleep feature columns (the inputs passed to sleep_clfr.predict)
feature_columns = ['count', 'valid_count', 'movement_raw_mean', 'movement_raw_min',
   'movement_raw_max', 'movement_raw_std', 'heart_rate_raw_mean',
   'heart_rate_raw_min', 'heart_rate_raw_max', 'heart_rate_raw_std', 'oxygen_raw_mean',
   'oxygen_raw_min', 'oxygen_raw_max', 'oxygen_raw_std']

def roll_up_ten(df):
    """Roll raw sensor readings up into 10-minute bins and predict sleep states.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw readings with a ``timestamp`` column in epoch seconds plus the
        ``dsn``, ``base_state``, ``ble_rssi``, ``notification_mask`` and sensor
        columns referenced below.

    Returns
    -------
    pandas.DataFrame
        One row per 10-minute bin (UTC DatetimeIndex) with ``dsn``, ``count``,
        ``valid_count``, ``<col>_{mean,min,max,std}`` statistics, boolean
        ``base_state_N`` flags and a ``sleep_states`` prediction. Bins whose
        ``dsn`` is NaN (no readings) are dropped.

    Uses the notebook-level ``feature_columns``, ``sleep_norms`` and
    ``sleep_clfr`` defined in the model-loading cell.
    """
    freq = "10min"  # same bins as "10T"; the "T" alias is deprecated in recent pandas

    # columns aggregated over every cleaned reading
    agg_cols = [
        'movement_raw',
        'skin_temperature',
        'red_led_current',
        'ir_led_current',
        'ble_rssi',
        'battery_level',
    ]
    # columns aggregated over valid readings only. Defined before first use:
    # it was previously assigned after the resample that reads it, which made it
    # an unassigned local there (UnboundLocalError on every call).
    valid_agg_cols = [
        'heart_rate_avg',
        'heart_rate_raw',
        'oxygen_avg',
        'oxygen_raw'
    ]

    df = df.copy().drop_duplicates('timestamp')
    # clean data
    df.index = pd.to_datetime(df.timestamp, utc=True, unit='s')
    df = df[(df.base_state >= 4) & (df.ble_rssi != 0) & (df.heart_rate_raw > 0)]
    # .copy() so the enlargement below writes into an independent frame, not a slice of df
    df_valid = df[(df.notification_mask == 0) & (df.heart_rate_raw > 0)].copy()

    # make at least 1 row so the dataframes can be merged later
    if len(df_valid) == 0 and len(df) > 0:
        df_valid.loc[df.index[0]] = np.nan  # np.NaN was removed in NumPy 2.0

    # Resample
    group = df.resample(freq)
    group_valid = df_valid[valid_agg_cols].resample(freq)

    # dsn, count
    df_dsn = group.agg({'dsn': ['first', 'count']})
    df_dsn.columns = ['dsn', 'count']

    # valid count: readings per bin with notification_mask == 0
    # NOTE(review): the original comment said "this has an issue"; the problem
    # is not identified in the code — verify.
    def count_valid_reads(array_like):
        return np.sum(array_like == 0)

    df_valid_count = group.agg({'notification_mask': count_valid_reads})
    df_valid_count.columns = ['valid_count']

    # base state: each function's name becomes its output column name
    def base_state_4(array_like): return 4 in array_like.values
    def base_state_6(array_like): return 6 in array_like.values
    def base_state_7(array_like): return 7 in array_like.values
    def base_state_8(array_like): return 8 in array_like.values
    def base_state_9(array_like): return 9 in array_like.values
    def base_state_10(array_like): return 10 in array_like.values
    def base_state_12(array_like): return 12 in array_like.values

    df_states = group.agg({'base_state': [base_state_4, base_state_6, base_state_7, base_state_8,
                                          base_state_9, base_state_10, base_state_12]})
    df_states.columns = [col[1] for col in df_states.columns.values]

    # aggregate over columns that don't require valid data
    df_agg = group.agg({col: ['mean', 'min', 'max', 'std'] for col in agg_cols})
    df_agg.columns = ['_'.join(col).strip() for col in df_agg.columns.values]

    # aggregate over columns that do require valid data
    df_agg_valid = group_valid.agg(['mean', 'min', 'max', 'std'])
    df_agg_valid.columns = ['_'.join(col).strip() for col in df_agg_valid.columns.values]

    df_rolled = pd.concat([df_dsn, df_valid_count, df_agg, df_agg_valid, df_states], axis=1)

    # add sleep
    if len(df_rolled) == 0:
        df_rolled['sleep_states'] = np.nan
    else:
        # ffill/bfill replace the deprecated fillna(method=...) with identical semantics
        features = df_rolled[feature_columns].apply(pd.to_numeric).ffill().bfill()
        # NOTE(review): this aligns sleep_norms' index to the feature columns; if that
        # index is not in feature_columns order pandas may reorder columns — confirm.
        features = (features - sleep_norms.m) / sleep_norms.s
        df_rolled['sleep_states'] = sleep_clfr.predict(features.values)

    return df_rolled.dropna(subset=['dsn'])

In [None]:
# column names come from the header of column_names.txt; the sample CSVs have no header row
col_names = pd.read_csv('column_names.txt').columns
first_path = data_folder + fs[0]
df = pd.read_csv(first_path, names=col_names)
df_rolled = roll_up_ten(df)
df_rolled.head()

## This version of roll_up_ten also removes all but two .resample calls, but combines the aggregations into a single .agg call plus one .apply call on the valid data

In [59]:
# Model loading is disabled in this cell; sleep_clfr and sleep_norms come from
# the earlier model-loading cell.
sleep_model_path = './SleepModel10Min.sav'
# sleep_clfr = pickle.load(open(sleep_model_path, 'rb'))

# load norm data
# sleep_norms = pd.read_csv('sleep_norms.csv', index_col=0)

# Sleep-model inputs: the two row counts, then mean/min/max/std for each raw signal.
feature_columns = ['count', 'valid_count'] + [
    '_'.join((signal, stat))
    for signal in ('movement_raw', 'heart_rate_raw', 'oxygen_raw')
    for stat in ('mean', 'min', 'max', 'std')
]

def roll_up_ten(df):
    """Roll raw readings up into 10-minute bins using one .agg pass for the raw columns.

    Alternative to the separate-.agg implementation of roll_up_ten earlier in the
    notebook; whichever cell runs last defines the roll_up_ten the export loop uses.
    It is intended to produce the same columns in the same order: dsn, count,
    valid_count, <agg col>_{mean,min,max,std}, <valid agg col>_{mean,min,max,std},
    base_state_N flags, sleep_states.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw readings with a ``timestamp`` column in epoch seconds plus the
        sensor/status columns referenced below.

    Returns
    -------
    pandas.DataFrame
        One row per 10-minute bin (UTC DatetimeIndex); bins whose ``dsn`` is NaN
        are dropped.

    Uses the notebook-level ``feature_columns``, ``sleep_norms`` and ``sleep_clfr``
    (loaded in the first model-loading cell).
    """
    freq = "10min"  # same bins as the deprecated "10T" alias

    df = df.copy().drop_duplicates('timestamp')
    # clean data
    df.index = pd.to_datetime(df.timestamp, utc=True, unit='s')
    df = df[(df.base_state >= 4) & (df.ble_rssi != 0) & (df.heart_rate_raw > 0)]
    # .copy() so the enlargement below writes into an independent frame
    df_valid = df[(df.notification_mask == 0) & (df.heart_rate_raw > 0)].copy()

    # make at least 1 row so the dataframes can be merged later
    if len(df_valid) == 0 and len(df) > 0:
        df_valid.loc[df.index[0]] = np.nan  # np.NaN was removed in NumPy 2.0

    # valid count: readings per bin with notification_mask == 0
    # NOTE(review): the original comment said "this has an issue"; unidentified — verify.
    def count_valid_reads(array_like):
        return np.sum(array_like == 0)

    # base state presence flags; each function's name becomes its output column name
    def base_state_4(array_like): return 4 in array_like.values
    def base_state_6(array_like): return 6 in array_like.values
    def base_state_7(array_like): return 7 in array_like.values
    def base_state_8(array_like): return 8 in array_like.values
    def base_state_9(array_like): return 9 in array_like.values
    def base_state_10(array_like): return 10 in array_like.values
    def base_state_12(array_like): return 12 in array_like.values

    state_funcs = [base_state_4, base_state_6, base_state_7, base_state_8,
                   base_state_9, base_state_10, base_state_12]

    # aggregate over columns that don't require valid data
    agg_cols = [
        'movement_raw',
        'skin_temperature',
        'red_led_current',
        'ir_led_current',
        'ble_rssi',
        'battery_level',
    ]

    # aggregate over columns that do require valid data
    valid_agg_cols = [
        'heart_rate_avg',
        'heart_rate_raw',
        'oxygen_avg',
        'oxygen_raw'
    ]

    def _flat_name(col, func):
        """Map a (source column, aggregation name) pair to its output column name."""
        if col == 'dsn':
            return 'dsn' if func == 'first' else 'count'
        if col == 'notification_mask':
            return 'valid_count'
        if col == 'base_state':
            return func
        return '_'.join((col, func)).strip()

    # Resample
    group = df.resample(freq)
    group_valid = df_valid[valid_agg_cols].resample(freq)

    functions_to_apply = {'dsn': ['first', 'count'], 'notification_mask': count_valid_reads,
                          'base_state': state_funcs}
    functions_to_apply.update({col: ['mean', 'min', 'max', 'std'] for col in agg_cols})
    # this dict-of-lists spec yields (column, function name) MultiIndex columns
    df_agg = group.agg(functions_to_apply)
    df_agg.columns = [_flat_name(col, func) for col, func in df_agg.columns]

    # valid data
    df_agg_valid = group_valid.agg(['mean', 'min', 'max', 'std'])
    df_agg_valid.columns = ['_'.join(col).strip() for col in df_agg_valid.columns.values]

    # Combine the aggregates (previously the raw readings `df` were concatenated here,
    # mixing per-reading rows with 10-minute summaries).
    df_rolled = pd.concat([df_agg, df_agg_valid], axis=1)
    # move the base-state flags last to match the other roll_up_ten's column order
    state_cols = [f.__name__ for f in state_funcs if f.__name__ in df_rolled.columns]
    df_rolled = df_rolled[[c for c in df_rolled.columns if c not in state_cols] + state_cols]

    # add sleep
    if len(df_rolled) == 0:
        df_rolled['sleep_states'] = np.nan
    else:
        features = df_rolled[feature_columns].apply(pd.to_numeric).ffill().bfill()
        features = (features - sleep_norms.m) / sleep_norms.s
        df_rolled['sleep_states'] = sleep_clfr.predict(features.values)

    return df_rolled.dropna(subset=['dsn'])

In [60]:
# find bottlenecks: line-by-line profile of roll_up_ten on the first sample file
# (%lprun comes from the line_profiler extension loaded near the top)
# column names are taken from the header of column_names.txt
col_names = pd.read_csv('column_names.txt').columns
df = pd.read_csv(data_folder + fs[0], names=col_names)
%lprun -f roll_up_ten roll_up_ten(df)

In [None]:
# inspect the column names of the most recently computed df_rolled
df_rolled.columns

In [None]:
%%prun
# function-level (cProfile) profile of a single roll_up_ten call
df_rolled = roll_up_ten(df)

In [None]:
# %%timeit
# Disabled benchmark: uncomment these lines to time reading and rolling up one sample file.
# df = pd.read_csv(data_folder + fs[0], names=col_names)
# df_rolled = roll_up_ten(df)

In [None]:
%%time

valid_agg_cols = [
    'heart_rate_avg',
    'heart_rate_raw', 
    'oxygen_avg', 
    'oxygen_raw' 
]

my_df = pd.read_csv('../mask9_2018/20181120/AC000W001055137.csv.gz', names=col_names) #bad data
# my_df = pd.read_csv('../mask9_2018/20181120/AC000W000652029.csv.gz', names=col_names) #good data
df = my_df.copy()
df = df.drop_duplicates('timestamp')

#clean data
df.index = pd.to_datetime(df.timestamp, utc=True, unit='s')
df = df[(df.base_state>=4) & (df.ble_rssi !=0) & (df.heart_rate_raw >0)]
df[valid_agg_cols][(df.notification_mask == 0) & (df.heart_rate_raw > 0)] = np.nan

if(len(df) == 0):
    print('oops') 


#dsn,count
group = df[['dsn']].resample("10T")
print(len(group))
%time df_dsn = group.apply(['first','count'])
df_dsn.columns = ['dsn','count']


#base state
def base_state_4(array_like): return 4 in array_like.values
def base_state_6(array_like): return 6 in array_like.values
def base_state_7(array_like): return 7 in array_like.values
def base_state_8(array_like): return 8 in array_like.values
def base_state_9(array_like): return 9 in array_like.values
def base_state_10(array_like): return 10 in array_like.values
def base_state_12(array_like): return 12 in array_like.values

group = df[['base_state']].resample("10T")
df_states = group.apply([base_state_4,base_state_6,base_state_7,base_state_8,base_state_9,base_state_10,base_state_12])

df_states.columns = [col[1] for col in df_states.columns.values]

#aggregate over columns that don't require valid data
agg_cols = [
    'movement_raw', 
    'skin_temperature',
    'red_led_current',
    'ir_led_current',
    'ble_rssi',
    'battery_level',
    'heart_rate_avg',
    'heart_rate_raw', 
    'oxygen_avg', 
    'oxygen_raw' 
]
group = df[agg_cols].resample("10T")
df_agg = group.apply(['mean','min','max','std'])
df_agg.columns = ['_'.join(col).strip() for col in df_agg.columns.values]


#add sleep
if(len(df_rolled)==0):
    df_rolled['sleep_states'] = np.nan
else:
    features = df_rolled[feature_columns].apply(pd.to_numeric).fillna(method='ffill').fillna(method='bfill')
    features = (features - sleep_norms.m)/sleep_norms.s
    df_rolled['sleep_states'] = sleep_clfr.predict(features.values)

In [None]:
# show every row and column of df_rolled (unbounded output; can be very long)
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    display(df_rolled)

In [None]:
#good
my_df = pd.read_csv('../mask9_2018/20180101/AC000W000423061.csv.gz', names=col_names) #good data
# sort_values already returns a new frame; reset_index keeps the old index as an 'index' column
my_df = my_df.sort_values('timestamp').reset_index()
df = my_df.copy()
# clean data with the same filters roll_up_ten uses
df.index = pd.to_datetime(df.timestamp, utc=True, unit='s')
df = df[(df.base_state >= 4) & (df.ble_rssi != 0) & (df.heart_rate_raw > 0)]
df_valid = df[(df.notification_mask == 0) & (df.heart_rate_raw > 0)]
# Scatter heart_rate_raw and movement_raw against timestamp for the valid,
# cleaned and raw frames (in that order), then print each frame's row count.
stages = [df_valid, df, my_df]
for stage in stages:
    stage.plot(x=['timestamp', 'timestamp'], y=['heart_rate_raw', 'movement_raw'], kind='scatter')
    plt.show()
for stage in stages:
    print(len(stage))

In [None]:
#bad
# my_df = pd.read_csv('../mask9_2018/20181120/AC000W001055137.csv.gz', names=col_names) #bad data
my_df = pd.read_csv('../mask9_2018/20180101/AC000W001144880.csv.gz', names=col_names) #bad data

# sort_values already returns a new frame; reset_index keeps the old index as an 'index' column
my_df = my_df.sort_values('timestamp').reset_index()
df = my_df.copy()
# clean data with the same filters roll_up_ten uses
df.index = pd.to_datetime(df.timestamp, utc=True, unit='s')
df = df[(df.base_state >= 4) & (df.ble_rssi != 0) & (df.heart_rate_raw > 0)]
df_valid = df[(df.notification_mask == 0) & (df.heart_rate_raw > 0)]
# Scatter heart_rate_raw and movement_raw against timestamp for the valid,
# cleaned and raw frames (in that order), then print each frame's row count.
stages = [df_valid, df, my_df]
for stage in stages:
    stage.plot(x=['timestamp', 'timestamp'], y=['heart_rate_raw', 'movement_raw'], kind='scatter')
    plt.show()
for stage in stages:
    print(len(stage))

In [None]:
# peek at the first raw rows in timestamp order (my_df was already sorted by the cell above)
my_df.sort_values('timestamp').head(10)

In [None]:
!pip install --upgrade pandas

In [None]:
# display whichever df_rolled was assigned most recently (several cells assign it)
df_rolled

In [None]:
# inspect a single 10-minute bin; assumes df_rolled came from a 2018-11-20 file — TODO confirm
df_rolled.loc['2018-11-20 23:50:00+00:00']

In [None]:
# list the per-day log directories in sorted order
log_data_folder = '../mask9_2018/'
all_dirs = sorted(os.listdir(log_data_folder))
print(all_dirs)

In [None]:
# Roll up every raw log file and append the 10-minute rows (no header) to one
# gzip CSV per dsn.
# NOTE: files are opened in append mode, so re-running this cell duplicates rows in
# existing outputs; clear the output folder before a re-run.
log_data_folder = '../mask9_2018/'
rolled_folder = '../mask9_2018_rolled2/'
os.makedirs(rolled_folder, exist_ok=True)  # to_csv fails if the folder is missing

all_dirs = sorted(os.listdir(log_data_folder))
for d in all_dirs:
    print(d)
    tic = timeit.default_timer()
    all_files = sorted(os.listdir(os.path.join(log_data_folder, d)))
    for f in all_files:
        df = pd.read_csv(os.path.join(log_data_folder, d, f), names=col_names)
        df_rolled = roll_up_ten(df)
        if len(df_rolled) > 0:
            # .iloc for positional access: Series[0] on a DatetimeIndex is deprecated
            dsn = str(df_rolled['dsn'].iloc[0])
            dp = os.path.join(rolled_folder, dsn + '.csv.gz')
            df_rolled.to_csv(dp, mode='a', header=False, compression='gzip')
    print('  %d files in %.1fs' % (len(all_files), timeit.default_timer() - tic))