In [5]:
import tensorflow as tf
from tensorflow.keras.models import load_model
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType,MapType, StringType,ArrayType, FloatType, TimestampType, IntegerType
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
ModuleMetadata
from scipy import signal
from scipy.interpolate import interp1d
from cerebralcortex import Kernel
from scipy.stats import skew,kurtosis,mode
# CerebralCortex kernel for the mperf study. The configuration directory can be
# overridden with the CC3_CONFIG_DIR environment variable; by default it is the
# directory this notebook was originally run against.
import os
CC = Kernel(os.environ.get("CC3_CONFIG_DIR", "/home/jupyter/cc3_conf/"), study_name='mperf')

  self.fs = pa.hdfs.connect(self.hdfs_ip, self.hdfs_port)


Resamples the mperf accelerometer data to 20 Hz, computes the acceleration magnitude, and applies the activity recognition models.

In [6]:
import tempfile
import tensorflow

def interpolate_acl(a, window_size=10, fs_now=25, fs_new=20):
    """Linearly resample one window of samples to a new sampling rate.

    Axis 0 of ``a`` is treated as time sampled on ``window_size * fs_now`` evenly
    spaced points from 0 to ``window_size`` (inclusive). The result is evaluated
    on ``window_size * fs_new`` evenly spaced points over the same interval.

    Args:
        a: array-like whose axis 0 has exactly ``window_size * fs_now`` entries
            (interp1d raises ValueError otherwise).
        window_size: length of the window, in the same units as the grids (seconds).
        fs_now: sampling rate of ``a``.
        fs_new: target sampling rate.

    Returns:
        numpy array with axis 0 of length ``window_size * fs_new``; other axes
        are unchanged.
    """
    n_source = window_size * fs_now
    n_target = window_size * fs_new
    source_grid = np.linspace(0, window_size, n_source)
    target_grid = np.linspace(0, window_size, n_target)
    # 'extrapolate' presumably guards against target points landing marginally
    # outside the source grid through floating-point rounding.
    resampler = interp1d(source_grid, a, axis=0, fill_value='extrapolate')
    return resampler(target_grid)

class ModelWrapperPickable:
    """Picklable wrapper around a Keras model.

    The wrapper is captured in a Spark pandas UDF closure, so it must survive
    pickling. On pickle the wrapped model is saved to an HDF5 file and its raw
    bytes are stored under the ``'model_str'`` key; on unpickle those bytes are
    written back to a file and loaded with ``load_model``.

    Attributes:
        model: the wrapped model (any object accepted by
            ``tensorflow.keras.models.save_model``).
    """

    def __init__(self, model):
        self.model = model

    def __getstate__(self):
        """Return ``{'model_str': <HDF5 bytes of self.model>}``."""
        import os
        # Save into a private temporary directory and reopen the file by path.
        # The previous approach read back through a still-open NamedTemporaryFile
        # handle after save_model reopened it by name, which is not portable
        # (reopening an open NamedTemporaryFile by name fails on Windows).
        with tempfile.TemporaryDirectory() as tmp_dir:
            model_path = os.path.join(tmp_dir, 'model.hdf5')
            tensorflow.keras.models.save_model(self.model, model_path, overwrite=True)
            with open(model_path, 'rb') as fh:
                model_bytes = fh.read()
        return {'model_str': model_bytes}

    def __setstate__(self, state):
        """Rebuild ``self.model`` from the bytes produced by ``__getstate__``."""
        import os
        with tempfile.TemporaryDirectory() as tmp_dir:
            model_path = os.path.join(tmp_dir, 'model.hdf5')
            # Close the file before loading so the bytes are fully on disk.
            with open(model_path, 'wb') as fh:
                fh.write(state['model_str'])
            self.model = tensorflow.keras.models.load_model(model_path)

            
def compute_magnitude_and_activity(data,
                                   Fs = 25,
                                   window_size = 10,
                                   stream_name = 'org.md2k.feature.motionsensehrv.decoded.rightwrist.all',
                                   new_Fs = 20,
                                   filename1 = './models/activity_estimator_both_magnitude.hdf5',
                                   filename2 = './models/activity_estimator_wisdm_magnitude.hdf5'):
    """Window accelerometer magnitude, compute features and predict activity.

    Pipeline:
      1. Per sample, compute magnitude = sqrt(aclx^2 + acly^2 + aclz^2).
      2. Group samples into tumbling ``window_size``-second windows per
         (user, version) and keep only windows with exactly
         ``window_size * Fs`` samples.
      3. Per (user, version, day), sort each window by time and resample it
         from ``Fs`` to ``new_Fs``. Run both Keras models on the resampled
         magnitude, then compute its mean / std / skew / kurtosis.
      4. Smooth predictions: within each ``6 * window_size``-second bucket,
         every label is replaced by the bucket's most frequent label. Buckets
         with fewer than 3 windows are dropped.

    Args:
        data: input stream; must support ``select`` on the columns localtime,
            timestamp, aclx, acly, aclz, user, version (presumably a
            CerebralCortex DataStream / Spark DataFrame).
        Fs: sampling rate of the input in Hz.
        window_size: window length in seconds.
        stream_name: input stream name; the output stream is named
            ``stream_name + '.activity'``.
        new_Fs: rate in Hz each window is resampled to before prediction.
            NOTE(review): must match the input length the models were trained on.
        filename1: model whose argmax maps onto the 10-label ``final_activity_list``.
        filename2: model whose argmax maps onto the 15-label ``activity_list_wisdm``.

    Returns:
        DataStream with one row per kept window plus the stream metadata.
    """
    # Load both models on the driver and wrap them so Spark can pickle them
    # into the pandas UDF closure below.
    model_wrapper = ModelWrapperPickable(load_model(filename1))
    model_wrapper_wisdm = ModelWrapperPickable(load_model(filename2))

    samples_per_window = window_size*Fs
    resampled_length = window_size*new_Fs

    # Per-sample magnitude, paired with its epoch time so every window can be
    # re-sorted later (collect_list gives no ordering guarantee).
    data = data.select('localtime','timestamp','aclx','acly','aclz','user','version')
    data = data.withColumn('magnitude',F.sqrt(F.pow(F.col('aclx'),2)+F.pow(F.col('acly'),2)+F.pow(F.col('aclz'),2))).drop('aclx','acly','aclz')
    data = data.withColumn('time',F.col('timestamp').cast('double'))
    data = data.withColumn('magnitude_time',F.array('time','magnitude')).drop('time','magnitude')

    # Tumbling windows per user/version. Explicit aliases avoid depending on
    # Spark's auto-generated "collect_list(...)" column names.
    groupbycols = ['user','version',F.window('timestamp',windowDuration=str(window_size)+' seconds', startTime='0 seconds')]
    data_windowed = data.groupBy(groupbycols).agg(
        F.collect_list('magnitude_time').alias('magnitude_time'),
        # Earliest local time in the window; element 0 of a collect_list
        # depended on the (unspecified) order rows were collected in.
        F.min('localtime').alias('localtime'))
    # Keep only complete windows: the reshape in the UDF assumes exactly
    # samples_per_window samples.
    data_windowed = data_windowed.filter(F.size('magnitude_time')==samples_per_window)
    data_windowed = (data_windowed
                     .withColumn('timestamp',F.col('window').start)
                     .withColumn('start',F.col('window').start)
                     .withColumn('end',F.col('window').end)
                     .drop('window'))
    data_windowed = data_windowed.withColumn('day',F.date_format('localtime',"yyyyMMdd"))

    # argmax index -> label for each model (order must match the models' outputs).
    final_activity_list = ['Brushing','Cycling','Sports','Eating','Driving',
                           'Exercise','Sitting','Stairs','Standing','Walking']
    activity_list_wisdm = ['Sitting','Stairs','Jogging','Typing','Standing','Walking','Brushing','Eating','Drinking','Kicking',
                           'Playing Catch','Dribbling','Writing','Clapping','Folding Clothes']

    # UDF output schema: every windowed column except the raw (time, magnitude)
    # pairs, followed by the features the UDF adds.
    schema = StructType([field for field in data_windowed.schema if field.name != 'magnitude_time']
                        + [StructField("magnitude", ArrayType(DoubleType())),
                           StructField("prediction", StringType()),
                           StructField("prediction_wisdm", StringType()),
                           StructField("mean", DoubleType()),
                           StructField("std", DoubleType()),
                           StructField("skew", DoubleType()),
                           StructField("kurtosis", DoubleType())])
    columns = [field.name for field in schema.fields]

    # Smoothing bucket; a Timedelta avoids the deprecated 'S' offset alias.
    smoothing_freq = pd.Timedelta(seconds=6*window_size)

    def smooth_predictions(df):
        """Replace both prediction columns with their per-bucket most frequent label.

        Buckets with fewer than 3 windows yield an empty frame (they are dropped).
        """
        if df.shape[0]<3:
            return pd.DataFrame([],columns=columns)
        # groupby.apply hands us a slice of the caller's frame; don't mutate it.
        df = df.copy()
        for name in ['prediction','prediction_wisdm']:
            # Series.mode() returns the modes sorted, so iloc[0] is the smallest
            # label on ties, matching the old scipy.stats.mode behaviour.
            # scipy >= 1.11 no longer accepts string arrays in stats.mode.
            df[name] = df[name].mode().iloc[0]
        return df

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def interpolate_acl_magnitude(df):
        """Resample each window, predict activities and compute summary statistics.

        Called once per (user, version, day) group; each row of ``df`` is one window.
        """
        # list of [time, magnitude] pairs -> (samples_per_window, 2) array
        df['magnitude_time'] = df['magnitude_time'].apply(lambda b:np.array([np.array(a) for a in b]).reshape(samples_per_window,2))
        # sort by time, then keep only the magnitude as a column vector
        df['magnitude_time'] = df['magnitude_time'].apply(lambda a:a[a[:,0].argsort()][:,1].reshape(samples_per_window,1))
        # Pass this function's own rates: interpolate_acl's defaults would
        # silently ignore Fs / window_size / new_Fs.
        df['magnitude'] = df['magnitude_time'].apply(
            lambda a:interpolate_acl(a,window_size=window_size,fs_now=Fs,fs_new=new_Fs).reshape(1,resampled_length,1))
        X = np.concatenate(list(df['magnitude']))
        df['prediction'] = [final_activity_list[i] for i in model_wrapper.model.predict(X).argmax(axis=1).reshape(-1)]
        df['prediction_wisdm'] = [activity_list_wisdm[i] for i in model_wrapper_wisdm.model.predict(X).argmax(axis=1).reshape(-1)]
        df['magnitude'] = df['magnitude'].apply(lambda a:a.reshape(-1))
        df['mean'] = df['magnitude'].apply(lambda a:np.mean(a))
        df['std'] = df['magnitude'].apply(lambda a:np.std(a))
        df['skew'] = df['magnitude'].apply(lambda a:skew(a))
        df['kurtosis'] = df['magnitude'].apply(lambda a:kurtosis(a))
        df = df[columns]
        # NOTE(review): smooth_predictions relies on the 'timestamp' grouping
        # column still being present in each group; newer pandas versions
        # deprecate passing grouping columns to apply — confirm on upgrade.
        return df.groupby(pd.Grouper(key='timestamp',freq=smoothing_freq),as_index=False).apply(smooth_predictions)

    data_interpolated = data_windowed.groupBy(['user','version','day']).apply(interpolate_acl_magnitude)

    output_schema = data_interpolated.schema
    stream_metadata = Metadata()
    stream_metadata.set_name(stream_name+'.activity').set_description("Activity Computed")
    for field in output_schema.fields:
        stream_metadata.add_dataDescriptor(
            DataDescriptor().set_name(str(field.name)).set_type(str(field.dataType))
        )
    stream_metadata.add_module(
        ModuleMetadata().set_name("activity datastream") \
        .set_attribute("url", "https://md2k.org").set_author(
            "Md Azim Ullah", "mullah@memphis.edu"))
    stream_metadata.is_valid()
    data_interpolated.printSchema()
    ds = DataStream(data=data_interpolated,metadata=stream_metadata)
    return ds
# CC.save_stream(ds,overwrite=True)

In [None]:
# Compute activity for the right-wrist MotionSense HRV stream and save it.
stream_name = 'org.md2k.feature.motionsensehrv.decoded.rightwrist.all'
data = CC.get_stream(stream_name)
# Pass stream_name explicitly so the output stream ('<stream_name>.activity')
# is named after the stream actually loaded, not the function's default.
data_activity = compute_magnitude_and_activity(data, stream_name=stream_name)
CC.save_stream(data_activity,overwrite=True)



root
 |-- user: string (nullable = true)
 |-- version: integer (nullable = false)
 |-- localtime: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- day: string (nullable = true)
 |-- magnitude: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- prediction: string (nullable = true)
 |-- prediction_wisdm: string (nullable = true)
 |-- mean: double (nullable = true)
 |-- std: double (nullable = true)
 |-- skew: double (nullable = true)
 |-- kurtosis: double (nullable = true)



In [4]:
# Display the metadata (data descriptors and module info) attached to the activity stream.
# NOTE(review): depends on `data_activity` from the pipeline cell; execution counts in this export are out of order.
data_activity.metadata

{
    "annotations": [],
    "data_descriptor": [
        {
            "attributes": {},
            "name": "user",
            "type": "StringType"
        },
        {
            "attributes": {},
            "name": "version",
            "type": "IntegerType"
        },
        {
            "attributes": {},
            "name": "localtime",
            "type": "TimestampType"
        },
        {
            "attributes": {},
            "name": "timestamp",
            "type": "TimestampType"
        },
        {
            "attributes": {},
            "name": "start",
            "type": "TimestampType"
        },
        {
            "attributes": {},
            "name": "end",
            "type": "TimestampType"
        },
        {
            "attributes": {},
            "name": "day",
            "type": "StringType"
        },
        {
            "attributes": {},
            "name": "magnitude",
            "type": "ArrayType(DoubleType,true)"
        },
        