# Notebook implementing the van Hees algorithm on some SPHERE wearable data

Notes (todos):

- [x] Implemented van Hees algorithm
- [x] Use house plate
- [ ] Use wearable plate
- [ ] Remove duplicates

In [1]:
from hyperstream import HyperStream, StreamId, TimeInterval, TimeIntervals, StreamInstanceCollection

import os
from datetime import timedelta, datetime
from dateutil.parser import parse
import pandas as pd
import matplotlib.pyplot as plt 
% matplotlib inline 
import numpy as np
import logging

In [2]:
# Step up to the parent directory only the first time this cell runs; the
# sentinel flag keeps a re-run from climbing one level further.
if "changed_directory" not in globals():
    os.chdir("..")
    changed_directory = True

In [3]:
# Single HyperStream instance for the whole notebook, created with loglevel=logging.WARN.
hs = HyperStream(loglevel=logging.WARN)

# One-letter shortcuts for the channels: S = sphere, M = memory, T = tools, D = mongo.
S, M, T, D = (
    getattr(hs.channel_manager, channel_name)
    for channel_name in ("sphere", "memory", "tools", "mongo")
)

# Meta-data key selecting house "1", and the plate registered under the id "H".
house_1 = (("house", "1"),)
house_plate = hs.plate_manager.plates["H"]

## Here are the functions that will be used to calculate the arm angles and subsequently calculate inactivity periods

In [4]:
# Next, estimated arm angles were averaged per 5 second epoch, 
# and used to assess change in arm angle between successive 5 second epochs. 
# Periods of time during which there was no change larger than 5° over at least 
# 5 minutes were classified as bouts of sustained inactivity, or potential sleep periods. 

def arm_angle_average(data):
    """Return the mean arm angle, in degrees, over one window of samples.

    For every sample the angle is ``atan(z / sqrt(x**2 + y**2))`` converted to
    degrees; the per-sample angles are then averaged.

    :param data: iterable of ``(timestamp, value)`` pairs, where ``value`` is
        indexable and holds the x, y and z readings at positions 0, 1 and 2.
    :return: the mean angle in degrees, or ``None`` when no angle can be
        computed for the window (no samples at all, only samples with a
        missing component, or only all-zero vectors).
    """
    readings = [d[1] for d in data]
    if not readings:
        return None

    # Plain numpy is sufficient: the timestamps never influence the mean, so
    # there is no need to build a timestamp-indexed DataFrame per window.
    xyz = np.array([[r[0], r[1], r[2]] for r in readings], dtype=float)

    # Discard samples with any missing component (None becomes NaN above).
    xyz = xyz[~np.isnan(xyz).any(axis=1)]
    x, y, z = xyz[:, 0], xyz[:, 1], xyz[:, 2]

    # When x == y == 0 the denominator is zero: z / 0 -> +/-inf still yields
    # the correct +/-90 degrees, while 0 / 0 -> NaN marks a zero vector, which
    # has no direction and is excluded from the average below.
    with np.errstate(divide='ignore', invalid='ignore'):
        angles = 180 / np.pi * np.arctan(z / np.sqrt(np.square(x) + np.square(y)))
    angles = angles[~np.isnan(angles)]

    # Report "no value" rather than NaN, consistent with the empty-input case,
    # so an unusable window does not pass a NaN angle on to later stages.
    if angles.size == 0:
        return None
    return angles.mean()


def inactivity_detector(data, threshold=5):
    """Decide whether a window of arm angles shows sustained inactivity.

    The window counts as inactive when no change between successive angles is
    larger than ``threshold`` degrees.

    :param data: iterable of items exposing the angle as a ``value`` attribute.
    :param threshold: largest change between successive angles, in degrees,
        that is still regarded as inactivity (default 5).
    :return: ``None`` when the window holds fewer than two angles, so that no
        change can be measured; otherwise a boolean that is true when every
        successive change is at most ``threshold``. A NaN angle makes the
        result false, because the absence of movement cannot be confirmed.
    """
    angles = np.array([d.value for d in data], dtype=float)

    # With zero or one angle np.diff is empty, and taking a maximum of an
    # empty sequence raises ValueError; treat the window as undecidable.
    if angles.size < 2:
        return None

    # np.max propagates NaN deterministically, whereas the builtin max() over
    # an array containing NaN gives a result that depends on element order.
    largest_change = np.max(np.abs(np.diff(angles)))

    # "No change larger than the threshold" includes a change equal to it.
    return largest_change <= threshold
    

## Defining some tools

In [5]:
# Each tool is looked up by name through the channel manager and configured
# with a dictionary of parameters.

# SPHERE data source, restricted to the wearable modality
sphere_tool = hs.channel_manager.get_tool("sphere", {"modality": "wearable"})

# Picks the "wearable-xl1" (acceleration) entry out of the wearable stream
component_xl1 = hs.channel_manager.get_tool("component", {"key": "wearable-xl1"})

# Sliding window for the 5 second epochs
sliding_window_5 = hs.channel_manager.get_tool(
    "sliding_window", {"lower": -5, "increment": 5})

# Runs arm_angle_average over the contents of each sliding window
sliding_apply_arm = hs.channel_manager.get_tool(
    "sliding_apply", {"func": arm_angle_average})

# Sliding window for the 300 second periods
sliding_window_300 = hs.channel_manager.get_tool(
    "sliding_window", {"lower": -300, "increment": 300})

# Runs inactivity_detector over the contents of each sliding window
sliding_apply_inact = hs.channel_manager.get_tool(
    "sliding_apply", {"func": inactivity_detector})

## Defining some streams

In [6]:
# Wearable data for house 1, held in the SPHERE channel
wearable = S.get_or_create_stream(
    StreamId("wearable", meta_data=house_1))

# Acceleration data extracted from the wearable stream (memory channel)
wearable_xl1 = M.get_or_create_stream(
    StreamId("wearable_xl1", meta_data=house_1))

# The two window streams carry no meta data:
# 5 second windows ...
window_5 = M.get_or_create_stream(StreamId("window_5"))

# ... and 300 second windows
window_300 = M.get_or_create_stream(StreamId("window_300"))

# Arm angle averaged over each 5 second window
arm_angle = M.get_or_create_stream(
    StreamId("arm_angle", meta_data=house_1))

# Inactivity detections over each 300 second window
inactivity = M.get_or_create_stream(
    StreamId("inactivity", meta_data=house_1))


## Set a time interval for processing - using a small chunk of data for now

In [7]:
# A 20 minute slice (1200 seconds) beginning at 22:35 UTC on 2017-04-25
start = parse("2017-04-25T22:35:00Z")
end = start + timedelta(minutes=20)

ti = TimeInterval(start, end)

## Use the SPHERE tool to pull the wearable data down, and then the component tool to select out the acceleration data

In [8]:
# Pull the wearable data for the interval into the `wearable` stream.
sphere_tool.execute(
    interval=ti,
    source=None,
    splitting_stream=None,
    input_plate_value=None,
    output_plate=house_plate,
    sinks=[wearable],
)

print("Raw wearable data")
for _stamp, record in wearable.window(ti).head(5):
    print(record)
print("")

# Select the "wearable-xl1" component of every record into `wearable_xl1`.
component_xl1.execute(
    interval=ti,
    alignment_stream=None,
    sources=[wearable],
    sink=wearable_xl1,
)

print("Acceleration data")
for _stamp, acceleration in wearable_xl1.window(ti).head(5):
    print(acceleration)
print("")

2017-07-11 17:35:03,112 [WARNI]  KeyError: Modality monitoring missing from config
100%|██████████| 9287/9287 [00:02<00:00, 3782.46it/s]
2017-07-11 17:35:31,695 [WARNI]  Stream wearable: [house=1] not available for time interval (2017-04-25 22:35:00+00:00, 2017-04-25 22:55:00+00:00]. Perhaps upstream calculations haven't been performed


Raw wearable data


2017-07-11 17:35:36,043 [WARNI]  Stream wearable: [house=1] not available for time interval (2017-04-25 22:35:00+00:00, 2017-04-25 22:55:00+00:00]. Perhaps upstream calculations haven't been performed


{'wearable-rss': -75.0, 'uid': u'a0:e6:f8:00:00:c1', 'wearable-xl1': array([-0.672, -0.48 ,  0.48 ]), 'aid': u'fd00::212:4b00:0:4', 'wearable-mag-xl1': -0.044812060377644847}
{'wearable-rss': -63.0, 'uid': u'a0:e6:f8:00:00:c1', 'wearable-xl1': array([-0.672, -0.48 ,  0.448]), 'aid': u'fd00::212:4b00:0:5', 'wearable-mag-xl1': -0.060485231622195323}
{'aid': u'fd00::212:4b00:0:6', 'wearable-rss': -85.0, 'uid': u'a0:e6:f8:00:00:c1'}
{'uid': u'a0:e6:f8:00:00:c0', 'wearable-mag-xl1': -0.02308239856168004, 'wearable-xl1': array([-0.768,  0.512,  0.32 ])}
{'uid': u'a0:e6:f8:00:00:c0', 'wearable-mag-xl1': -0.039466814732567301, 'wearable-xl1': array([-0.768,  0.48 ,  0.32 ])}

Acceleration data
[-0.672 -0.48   0.48 ]
[-0.672 -0.48   0.448]
[-0.768  0.512  0.32 ]
[-0.768  0.48   0.32 ]
[-0.768  0.544  0.32 ]



## Purge the arm angle and inactivity streams (in case the function has changed since we last pulled the data)

In [9]:
# Reset both derived streams: swap in an empty instance collection and clear
# the record of calculated intervals.
for stale_stream in (arm_angle, inactivity):
    M.data[stale_stream.stream_id] = StreamInstanceCollection()
    stale_stream.calculated_intervals = None

## Compute the arm angle and inactivity predictions

In [10]:
# Stage 1: generate the 5 second windows, then apply the arm angle average
# to the acceleration data inside each of them.
sliding_window_5.execute(
    interval=ti, sources=None, alignment_stream=None, sink=window_5)
sliding_apply_arm.execute(
    interval=ti,
    sources=[window_5, wearable_xl1],
    alignment_stream=None,
    sink=arm_angle,
)

print("Arm angle")
for _stamp, angle in arm_angle.window(ti).head(5):
    print(angle)
print("")

# Stage 2: generate the 300 second windows, then apply the inactivity
# detector to the arm angles inside each of them.
sliding_window_300.execute(
    interval=ti, sources=None, alignment_stream=None, sink=window_300)
sliding_apply_inact.execute(
    interval=ti,
    sources=[window_300, arm_angle],
    alignment_stream=None,
    sink=inactivity,
)

print("Inactivity predictions")
for window_end, is_inactive in inactivity.window(ti).head(5):
    print("{}    {}".format(window_end, is_inactive))
print("")

Arm angle
26.0561148631
25.82200807
26.0375390115
24.664492562
26.14274809

Inactivity predictions
2017-04-25 22:40:00+00:00    False
2017-04-25 22:45:00+00:00    False
2017-04-25 22:50:00+00:00    True
2017-04-25 22:55:00+00:00    True



# Put this all together as a workflow

In [12]:
# Workflow object that the nodes and factors below are attached to; it is
# created with both `online` and `monitor` switched off.
w = hs.create_workflow(
    workflow_id="sleep_detector",
    owner="WP5",
    name="Sleep detector (van Hees)",
    description="Sleep detection using the van Hees algorithm. See https://doi.org/10.1371/journal.pone.0142533.g001",
    monitor=False,
    online=False
)

## First create the nodes: these correspond to the streams above. We'll place these in a dictionary for ease of use

In [13]:
from collections import namedtuple

# Description of one workflow node: the channel its stream lives in, the
# stream name, and the ids of the plates it is defined over.
# The `verbose` argument is deliberately not passed: False was its default,
# and the parameter was removed in Python 3.7, where passing it raises
# TypeError.
NodeDef = namedtuple('NodeDef', ['channel', 'stream_name', 'plate_ids'])

# Same streams as defined earlier in the notebook; the two window streams are
# defined over no plate, all others over the "H" plate.
nodes = (
    NodeDef(S, "wearable",     ["H"]),
    NodeDef(M, "wearable_xl1", ["H"]),
    NodeDef(M, "window_5",     []),
    NodeDef(M, "window_300",   []),
    NodeDef(M, "arm_angle",    ["H"]),
    NodeDef(M, "inactivity",   ["H"])
)

class NodeCollection(object):
    """Empty attribute container used to hold the workflow nodes by name."""

N = NodeCollection()

# Create one workflow node per definition and expose it as an attribute of N
# named after its stream.
for channel, stream_name, plate_ids in nodes:
    created_node = w.create_node(
        channel=channel,
        stream_name=stream_name,
        plate_ids=plate_ids)
    setattr(N, stream_name, created_node)

## Next create the factors: these use the tools defined above

In [14]:
# One factor per tool, wiring source nodes to a sink node.
# The SPHERE tool has no source node and writes to the wearable node.
w.create_multi_output_factor(tool=sphere_tool, source=None, sink=N.wearable, splitting_node=None)
# Component tool: wearable node -> acceleration node.
w.create_factor(tool=component_xl1, sources=[N.wearable], sink=N.wearable_xl1)
# The two sliding window factors take no source nodes.
w.create_factor(tool=sliding_window_5, sources=[], sink=N.window_5)
w.create_factor(tool=sliding_window_300, sources=[], sink=N.window_300)
# Arm angle from the 5 second windows and the acceleration node.
w.create_factor(tool=sliding_apply_arm, sources=[N.window_5, N.wearable_xl1], sink=N.arm_angle)
# Inactivity from the 300 second windows and the arm angle node. This call is
# the last expression of the cell, so the notebook echoes the factor it returns.
w.create_factor(tool=sliding_apply_inact, sources=[N.window_300, N.arm_angle], sink=N.inactivity)

Factor(alignment_node=None, plates=[Plate(meta_data_id=u'house', plate_id=u'H')], sink=Node(node_id='inactivity', plates=[Plate(meta_data_id=u'house', plate_id=u'H')]), sources=[Node(node_id='window_300', plates=[]), Node(node_id='arm_angle', plates=[Plate(meta_data_id=u'house', plate_id=u'H')])], tool=SlidingApply(func=<function inactivity_detector at 0x102afc7d0>))

## Finally execute the workflow and examine the results

In [15]:
# Run the whole workflow over the chosen interval.
w.execute(ti)

print("Inactivity predictions")
# The node's streams are looked up by plate value; take the one for house 1.
house_1_inactivity = N.inactivity.streams[house_1]
for window_end, is_inactive in house_1_inactivity.window(ti).head(5):
    print("{}    {}".format(window_end, is_inactive))
print("")

Inactivity predictions
2017-04-25 22:40:00+00:00    False
2017-04-25 22:45:00+00:00    False
2017-04-25 22:50:00+00:00    True
2017-04-25 22:55:00+00:00    True

