In [1]:
# from cerebralcortex.util.helper_methods import get_study_names
# sn = get_study_names("/home/jupyter/cc3_conf/")
# print(sn)
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
import numpy as np
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor,ModuleMetadata
from cerebralcortex import Kernel
import pandas as pd
# Study whose streams this notebook reads and writes.
study_name = 'rice'
# CerebralCortex kernel bound to that study; the path is presumably the CC3
# configuration directory on the Jupyter host — TODO confirm.
CC = Kernel("/home/jupyter/cc3_conf/", study_name=study_name)

# Stream names of the three chest accelerometer channels, one stream per axis.
aclx_id = 'accelerometer_x--org.md2k.autosense--autosense_chest--chest'
acly_id = 'accelerometer_y--org.md2k.autosense--autosense_chest--chest'
aclz_id = 'accelerometer_z--org.md2k.autosense--autosense_chest--chest'

def adc_to_acceleration(vals):
    """Convert one raw accelerometer ADC reading to a Python float.

    The reading is scaled by 3/4095, shifted by -1.5 and divided by 0.3.
    NOTE(review): presumably a 12-bit ADC with a 3 V reference, a 1.5 V
    zero-g offset and 0.3 V/g sensitivity, giving a result in g — confirm
    against the sensor documentation.

    Args:
        vals: A single numeric ADC reading.

    Returns:
        float: The converted value as a builtin ``float``.
    """
    # Builtin float() replaces np.float, which was only an alias of it and
    # has been removed from recent NumPy releases.
    return float(((3*vals/4095) - 1.5) / 0.3)

# Spark UDF wrapping the ADC conversion; its result column is typed FloatType.
transform_udf = F.udf(adc_to_acceleration, FloatType())

# 10-second windows that start every 5 seconds, so consecutive windows overlap.
win = F.window("timestamp", windowDuration='10 seconds', startTime='0 seconds',slideDuration='5 seconds')
groupbycols = ['user','version',win]


def _windowed_std(stream_id, value_col, out_col):
    """Compute the per-window standard deviation of one accelerometer stream.

    Args:
        stream_id: Name of the stream to load through ``CC.get_stream``.
        value_col: Column holding the raw reading; it is converted with
            ``transform_udf`` before being aggregated.
        out_col: Name given to the resulting standard-deviation column.

    Returns:
        The frame grouped by ``groupbycols`` with the ``'std'`` aggregate of
        the converted column stored under ``out_col``.
    """
    stream = CC.get_stream(stream_id).dropna()
    converted = stream._data.withColumn(value_col, transform_udf(value_col))
    aggregated = converted.groupBy(groupbycols).agg({value_col: 'std'})
    # The aggregate column comes back as 'stddev(<col>)'; give it a plain name.
    return aggregated.withColumnRenamed('stddev({})'.format(value_col), out_col)


# Only the x frame keeps 'version'; y and z drop it so the joined result
# carries a single 'version' column.
aclx = _windowed_std(aclx_id, 'x', 'x')
acly = _windowed_std(acly_id, 'y', 'y').drop('version')
# NOTE(review): the z-axis stream is read through a column named 'y', exactly
# as the original code did. Confirm whether the z stream really exposes its
# values as 'y' or whether this should be 'z'.
aclz = _windowed_std(aclz_id, 'y', 'z').drop('version')

# One row per (user, window) with the std of all three axes.
aclxyz_std = aclx.join(acly,['user','window']).join(aclz,['user','window'])


# Describe and persist the 10-second per-axis standard-deviation stream.
stream_name = 'org.md2k.autosense.accel.std.10seconds'
stream_metadata = Metadata()
# The chain's return value is not re-bound, so this relies on the builder
# methods updating ``stream_metadata`` in place.
(
    stream_metadata
    .set_name(stream_name)
    .set_description("Chest Accelerometer standard deviation across all channels - Autosense")
    .add_dataDescriptor(
        DataDescriptor().set_name("x").set_type("double")
        .set_attribute("description", "accelerometer x axis std"))
    .add_dataDescriptor(
        DataDescriptor().set_name("y").set_type("double")
        .set_attribute("description", "accelerometer y axis std"))
    .add_dataDescriptor(
        DataDescriptor().set_name("z").set_type("double")
        .set_attribute("description", "accelerometer z axis std"))
    .add_dataDescriptor(
        # The window text now says 10 seconds, matching the stream name and
        # the windowing used to build the data (it previously said 1 minute).
        DataDescriptor().set_name("window").set_type("struct")
        .set_attribute("description", "window start and end time in UTC")
        .set_attribute('start', 'start of 10 second window')
        .set_attribute('end', 'end of 10 second window'))
    .add_module(
        # NOTE(review): the module name and attribute key/value look like
        # template placeholders — confirm before relying on them.
        ModuleMetadata().set_name("fourtytwo/mullah/cc3/Untitled.ipynb")
        .set_attribute("attribute_key", "attribute_value")
        .set_author("Md Azim Ullah", "mullah@memphis.edu"))
)
stream_metadata.is_valid()
data = aclxyz_std
ds = DataStream(data=data,metadata=stream_metadata)
# ds.printSchema()
CC.save_stream(ds,overwrite=True)

True

In [2]:
def adc_to_acceleration(x,y,z):
    """Flag a window as active from its three per-axis values.

    Despite its name, this function does no ADC conversion: it takes the
    Euclidean norm of ``(x, y, z)`` and compares it with a fixed threshold.
    The name is kept because the UDF registration that follows refers to it.

    Args:
        x: Value for the x axis.
        y: Value for the y axis.
        z: Value for the z axis.

    Returns:
        int: 1 when the norm is greater than 0.21, otherwise 0.
    """
    squared = [np.square(axis) for axis in (x, y, z)]
    magnitude = np.sqrt(np.sum(squared))
    return 1 if magnitude>.21 else 0

# Register the thresholding function as a Spark UDF yielding an integer column.
transform_udf = F.udf(adc_to_acceleration, IntegerType())

# Load the 10-second per-axis std stream and replace x/y/z with one flag.
stream_id = 'org.md2k.autosense.accel.std.10seconds'
aclx = CC.get_stream(stream_id)
activity_flag = transform_udf('x','y','z')
acl_activity = aclx.withColumn('activity', activity_flag).drop('x', 'y', 'z')

# Describe the derived stream.
# NOTE(review): only 'activity' is described here, while the 60-second cell
# also describes 'window' — confirm whether 'window' should be listed too.
stream_name = 'org.md2k.autosense.accel.activity.10seconds'
stream_metadata = Metadata()
(
    stream_metadata
    .set_name(stream_name)
    .set_description("Chest Accelerometer Activity in 10 second windows")
    .add_dataDescriptor(
        DataDescriptor().set_name("activity").set_type("integer")
        .set_attribute("description", "accelerometer activity")
        .set_attribute('0','Stationery')
        .set_attribute('1','Non Stationery'))
    .add_module(
        ModuleMetadata().set_name("fourtytwo/mullah/cc3/10_seconds_activity.ipynb")
        .set_attribute("attribute_key", "attribute_value")
        .set_author("Md Azim Ullah", "mullah@memphis.edu"))
)

# Wrap the underlying frame together with its metadata and persist it.
data = acl_activity._data
ds = DataStream(data=data,metadata=stream_metadata)
CC.save_stream(ds,overwrite=True)

True

In [3]:
# Row count of the saved 10-second activity stream.
CC.get_stream('org.md2k.autosense.accel.activity.10seconds').count()

19910560

In [6]:
import pandas as pd
# Source: the 10-second activity flags.
stream_id = 'org.md2k.autosense.accel.activity.10seconds'

# 60-second windows that start every 5 seconds.
win = F.window("timestamp", windowDuration='60 seconds', startTime='0 seconds',slideDuration='5 seconds')
groupbycols = ['user','version',win]

# Use each 10-second window's start as the row timestamp so the rows can be
# windowed again.
window_start = F.col('window').start
aclx = CC.get_stream(stream_id).withColumn('timestamp', window_start).drop('window')

# Mean of the 0/1 flags within each 60-second window.
aclx = (
    aclx._data
    .groupBy(groupbycols)
    .agg({'activity':'avg'})
    .withColumnRenamed('avg(activity)','activity')
)

# A window is flagged 1 when the mean of its flags is above 0.5.
mostly_active = F.col('activity')>.5
acl_activity = (
    aclx
    .withColumn('activity', F.when(mostly_active, 1).otherwise(0))
    .withColumn('activity', F.col('activity').cast('integer'))
)

# Describe the derived stream.
stream_name = 'org.md2k.autosense.accel.activity.60seconds'
stream_metadata = Metadata()
(
    stream_metadata
    .set_name(stream_name)
    .set_description("Chest Accelerometer Activity in 60 second windows")
    .add_dataDescriptor(
        DataDescriptor().set_name("activity").set_type("integer")
        .set_attribute("description", "accelerometer activity")
        .set_attribute('0','Stationery')
        .set_attribute('1','Non Stationery'))
    .add_dataDescriptor(
        DataDescriptor().set_name("window").set_type("struct")
        .set_attribute("description", "window start and end time in UTC")
        .set_attribute('start', 'start of 1 minute window')
        .set_attribute('end','end of 1 minute window'))
    .add_module(
        ModuleMetadata().set_name("fourtytwo/mullah/cc3/60_seconds_activity.ipynb")
        .set_attribute("attribute_key", "attribute_value")
        .set_author("Md Azim Ullah", "mullah@memphis.edu"))
)

# Wrap the frame together with its metadata and persist it.
data = acl_activity
ds = DataStream(data=data,metadata=stream_metadata)
CC.save_stream(ds,overwrite=True)

True

In [7]:
# Reload the 60-second activity stream by name.
data  = CC.get_stream('org.md2k.autosense.accel.activity.60seconds')
# acl_activity.select('activity').distinct()

In [8]:
# Number of rows in the stream currently bound to `data`.
data.count()

20534162

In [10]:
# Show which study the kernel is bound to.
CC.study_name

'rice'