In [29]:
from pyspark import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import ceil, unix_timestamp
from pyspark.sql.functions import rank
from pyspark.sql.functions import collect_list, array
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.sql.types import *

from scipy.interpolate import interpolate
from future.utils import lmap
from functools import partial
from scipy.signal import butter, filtfilt
import numpy as np
import pywt
import pandas as pd

In [30]:
spark = SparkSession.builder.appName("some_testing2").master("local").getOrCreate()

In [31]:
df = spark.read.format('com.databricks.spark.csv').option("header", "True").option("delimiter", ",")\
                      .load('C:/Users/awagner/Desktop/For_Tom/'+'AllLabData.csv')

In [34]:
df = df.withColumn("X", df["X"].cast("double"))
df = df.withColumn("Y", df["Y"].cast("double"))
df = df.withColumn("Z", df["Z"].cast("double"))
df = df.withColumn("TSStart", df["TSStart"].cast("timestamp"))
df = df.withColumn("TSEnd", df["TSEnd"].cast("timestamp"))
df = df.withColumn("interval_start", ((ceil(unix_timestamp(df["TSStart"]).cast("long")))%10**4)) 
df = df.withColumn("interval_end", ((ceil(unix_timestamp(df["TSEnd"]).cast("long")))%10**4)) 

In [36]:
schema_for_parse = ArrayType(FloatType(), False)

find_milisec = udf(lambda raw: (raw)[(raw.find('.')+1):(raw.find('.')+3)])
merge_integers = udf(lambda raw1, raw2: int(str(raw1) + str(raw2)))
parse = udf(lambda s: eval(str(s)), schema_for_parse)

In [37]:
df = df.withColumn("temp", find_milisec('TS')) 
df = df.withColumn("interval", (((unix_timestamp(df["TS"]).cast("long"))))) 
df = df.withColumn("interval", merge_integers('interval', 'temp'))

In [38]:
def give_my_key(*args):
    key = 0
    for i in args:
        key += float(i)
    return key

give_my_key_udf = udf(give_my_key)
    

In [39]:
df = df.withColumn("key", give_my_key_udf("interval_start", "interval_end", 'SubjectId') ) 
df = df.withColumn("key", df["key"].cast("double"))


In [40]:
print(df.dtypes)

[('_c0', 'string'), ('SessionId', 'string'), ('DeviceID', 'string'), ('TS', 'string'), ('X', 'double'), ('Y', 'double'), ('Z', 'double'), ('AnnotationStrValue', 'string'), ('BradykinesiaGA', 'string'), ('DyskinesiaGA', 'string'), ('TremorGA', 'string'), ('TSStart', 'timestamp'), ('TSEnd', 'timestamp'), ('SubjectId', 'string'), ('IntelUsername', 'string'), ('interval_start', 'bigint'), ('interval_end', 'bigint'), ('temp', 'string'), ('interval', 'string'), ('key', 'double')]


In [44]:
df.select(['key', 'X', 'Y', 'Z']).show(25 , False)

+------+----------------------+------------------+-------------------+
|key   |X                     |Y                 |Z                  |
+------+----------------------+------------------+-------------------+
|1741.0|0.032816266377255404  |0.9565589334411009|0.18538697489378717|
|1741.0|0.05317390375614024   |0.944447047479542 |0.18932435971451844|
|1741.0|0.0449467726174725    |0.9766106438670463|0.14992291459228008|
|1741.0|0.037274439836516335  |0.972530163064418 |0.1302699554976262 |
|1741.0|0.02876682562714425   |0.9846718973546182|0.14208155157964325|
|1741.0|0.06386329194215588   |1.0370491835109428|0.1301143953870914 |
|1741.0|0.02292142675033916   |1.0574879639064827|0.15766355101803806|
|1741.0|-0.02008564460674698  |1.0532871058391622|0.08293830677224615|
|1741.0|0.030053131108873775  |0.9884257934881129|0.04776625405385437|
|1741.0|-0.005639048934114398 |0.9562325486916411|0.0753371654395789 |
|1741.0|0.013946962127489297  |0.9803962072112112|0.06745177037089355|
|1741.

In [45]:
rdd_test = df.select('key','X', 'Y', 'Z', 'interval').rdd.map\
                 (lambda raw: (raw[0],([raw[1]], [raw[2]],  [raw[3]],  [raw[4]])))

In [46]:
rdd_test = rdd_test.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3]))

In [47]:
df_raw= rdd_test.map(lambda row : (row[0] ,row[1][0], row[1][1], row[1][2], row[1][3])).\
                       toDF(['key', 'X', 'Y', 'Z', 'interval'])


In [48]:
df_raw.show(5)

+-------+--------------------+--------------------+--------------------+--------------------+
|    key|                   X|                   Y|                   Z|            interval|
+-------+--------------------+--------------------+--------------------+--------------------+
|12590.0|[0.28632198706050...|[0.82812405663445...|[-0.4823275881834...|[141596621950, 14...|
|13040.0|[0.92564838141756...|[-0.0091315938837...|[-0.3272873282107...|[141596643850, 14...|
|13450.0|[0.89605250785223...|[-0.3789722799210...|[0.09013530686785...|[141596664450, 14...|
|14670.0|[0.28530783043499...|[0.32870670526588...|[0.93734698900573...|[141596725550, 14...|
|20000.0|[0.02633892612881...|[-1.0006468761304...|[-0.3797875468897...|[141596991950, 14...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [49]:
sort_vec = udf(lambda X, Y: [x for _,x in sorted(zip(Y,X))])

df_raw = df_raw.withColumn('X', sort_vec('X', 'interval'))
df_raw = df_raw.withColumn('Y', sort_vec('Y', 'interval'))
df_raw = df_raw.withColumn('Z', sort_vec('Z', 'interval'))

In [50]:
df_raw.dtypes

[('key', 'double'),
 ('X', 'string'),
 ('Y', 'string'),
 ('Z', 'string'),
 ('interval', 'array<string>')]

In [51]:
df_raw = df_raw.withColumn("X",  parse("X"))
df_raw = df_raw.withColumn("Y",  parse("Y"))
df_raw = df_raw.withColumn("Z",  parse("Z"))


In [53]:
def slidind_window(axis, time_stamp, slide, window_size, freq):
    #axis = eval(axis)
    t = time_stamp[0]
    windows = []
    windows.append(axis[:(window_size*freq+1)])
    for time1 in range(len(time_stamp)):
        if float(time_stamp[time1]) >= float(t) + 100*slide:
            if time1+window_size*freq < len(time_stamp):
                windows.append(axis[time1:(time1+window_size*freq+1)])
                t =  time_stamp[time1]
    
    return (windows)

sliding_window_partial = partial(slidind_window, slide = 2.5, window_size = 5, freq = 50)

In [54]:
schema_for_sliding= ArrayType(ArrayType(FloatType(), False), False)
sliding_window_udf = udf(sliding_window_partial, schema_for_sliding)


In [55]:
df_raw = df_raw.withColumn('X', sliding_window_udf('X', 'interval'))
df_raw = df_raw.withColumn('Y', sliding_window_udf('Y', 'interval'))
df_raw = df_raw.withColumn('Z', sliding_window_udf('Z', 'interval'))

In [56]:
df_raw.dtypes

[('key', 'double'),
 ('X', 'array<array<float>>'),
 ('Y', 'array<array<float>>'),
 ('Z', 'array<array<float>>'),
 ('interval', 'array<string>')]

In [58]:
A = df_raw.take(2)

In [71]:
df_flat = df_raw.rdd.map(lambda raw:  (raw[0] , list(zip(raw[1], raw[2], raw[3]))))
df_flat = df_flat.flatMapValues(lambda raw :raw)
df_flat = df_flat.map(lambda raw: (raw[0],raw[1][0],raw[1][1],raw[1][2])).toDF(['key', 'X', 'Y', 'Z'])


In [22]:
df_flat.show(25)

+-------+--------------------+--------------------+--------------------+
|    key|                   X|                   Y|                   Z|
+-------+--------------------+--------------------+--------------------+
|12590.0|[0.28632199764251...|[0.82812404632568...|[-0.4823275804519...|
|12590.0|[0.14670479297637...|[0.82017868757247...|[-0.5255244970321...|
|12590.0|[0.19157606363296...|[0.78271335363388...|[-0.7888002395629...|
|12590.0|[0.26038557291030...|[0.56054204702377...|[-0.7489655017852...|
|12590.0|[0.30886235833168...|[0.54022121429443...|[-0.7411012649536...|
|12590.0|[0.28801217675209...|[0.59268671274185...|[-0.7687094807624...|
|12590.0|[0.26382794976234...|[0.62085336446762...|[-0.8120114207267...|
|13040.0|[0.92564839124679...|[-0.0091315936297...|[-0.3272873163223...|
|13040.0|[0.98133587837219...|[0.25678467750549...|[-0.4418890178203...|
|13040.0|[1.32663416862487...|[0.09829099476337...|[-0.4729822278022...|
|13040.0|[0.85930812358856...|[0.02770161628723...|

In [72]:
def project_gravity(x, y, z, num_samples_per_interval=None, round_up_or_down='down', return_only_vertical=False):
    """
    Projection of 3D time signal to 2D
    
    Input:
        x (1D numpy) - time signal X samples
        y (1D numpy) - time signal Y samples
        z (1D numpy) - time signal Z samples
        num_samples_per_interval (integer) - cut the signal to num_samples_per_interval sub intervals
                                     and preform the 2D projection on the sub intervals
        round_up_or_down (string, down or up) - length(x)/num_samples_per_interval should 
                                                be ceil or floor
        return_only_vertical (boolean) - If True return only vertical axis
    
    Output:
        v (1D numpy) - vertical projection
        h (1D numpy) - horizontal projection
                                  
    """
    if num_samples_per_interval is None:
        v, h = project_gravity_xyz(x, y, z)
        if return_only_vertical:
            return v
        else:
            return v, h

    # set number of intervals
    n = len(x)/num_samples_per_interval
    if round_up_or_down == 'down':
        n = np.floor(n).astype(int)
        n = np.max([1, n])
    elif round_up_or_down == 'up':
        n = np.ceil(n).astype(int)

    # set window size
    win_size = np.floor(len(x)/n).astype(int)

    # perform sliding windows
    idx_start = 0
    v = []
    h = []

    # TODO Chunk the samples below evenly. Do this by dividing len(x) each time rather than the current implementation
    for i in range(n):
        idx_start = i * win_size
        idx_end = (i + 1) * win_size
        if i == n-1:  # last iteration
            idx_end = -1
        x_i = x[idx_start:idx_end]
        y_i = y[idx_start:idx_end]
        z_i = z[idx_start:idx_end]
        ver_i, hor_i = project_gravity_xyz(x_i, y_i, z_i)
        v.append(ver_i)
        h.append(hor_i)
    if return_only_vertical:
        return np.hstack(v)
    return np.hstack(v), np.hstack(h)


def project_gravity_xyz(x, y, z):
    xyz = np.stack((x, y, z), axis=1)
    return project_gravity_core(xyz)


def project_gravity_core(xyz):
    """
    Projection of data set to 2 dim
    
    Input:
        xyz (3d numpy array) - 0 dimension is number os samples, 1 dimension is length of signals
                    and 2 dim is number of axis
                    
    Output:
        ver (1d numpy) - Vertical projection
        hor (1d numpy) - Horizontal projection
        
    """
    ver = []
    hor = []
    
    # mean for each axis
    G = [np.mean(xyz[:, 0]), np.mean(xyz[:, 1]), np.mean(xyz[:, 2])]
    G_norm = G/np.sqrt(sum(np.power(G, 2)) + 0.0000001)
    
    # The projection is here
    for i in range(len(xyz[:, 0])):
        ver.append(float(np.dot([xyz[i, :]], G)))
        hor.append(float(np.sqrt(np.dot(xyz[i, :]-ver[i]*G_norm, xyz[i, :]-ver[i]*G_norm))))
        
    ver = np.reshape(np.asarray(ver), len(ver))
    return Vectors.dense(ver), Vectors.dense(hor)


In [73]:
schema_for_proj = StructType([
    StructField("proj_ver", VectorUDT(), False),
    StructField("proj_hor", VectorUDT(), False)
])

proj_func = udf(project_gravity_xyz, schema_for_proj)


df_proj = df_flat['X', 'Y', 'Z', 'key'].withColumn('proj', proj_func("X", "Y", "Z"))
df_proj = df_proj.select('key',
                 'proj.proj_ver', 
                 'proj.proj_hor')
df_proj.show(2)


+-------+--------------------+--------------------+
|    key|            proj_ver|            proj_hor|
+-------+--------------------+--------------------+
|12590.0|[0.96981579045316...|[0.17319772894329...|
|12590.0|[0.95745235257911...|[0.19960889960922...|
+-------+--------------------+--------------------+
only showing top 2 rows



In [74]:
def denoise(data):
    """
    Denoise the data with wavelet and
    Input:
        data - time signal
    Output:
        result - signal after denoising
    """
    data = data - np.mean(data) + 0.1
    WC = pywt.wavedec(data, 'sym8')
    threshold = 0.01*np.sqrt(2*np.log2(256))
    NWC = lmap(lambda x: pywt.threshold(x, threshold, 'soft'), WC)
    result = pywt.waverec(NWC, 'sym8')
    return  Vectors.dense(result)

schema_for_denoise_func = ArrayType(FloatType(), False)
denoise_func = udf(denoise,  VectorUDT())



df_denoise = df_proj['proj_ver','proj_hor', 'key'].withColumn('denoised_ver',
                    denoise_func("proj_ver")).withColumn('denoised_hor',denoise_func("proj_hor"))
df_denoise = df_denoise.select('key', "denoised_ver", "denoised_hor") 
df_denoise.show(2)

+-------+--------------------+--------------------+
|    key|        denoised_ver|        denoised_hor|
+-------+--------------------+--------------------+
|12590.0|[0.09713335271840...|[0.10704233282524...|
|12590.0|[0.07484846475425...|[0.16028882851580...|
+-------+--------------------+--------------------+
only showing top 2 rows



In [75]:
def toDWT(sig, rel = False):

        x = np.arange(0, len(sig))
        f = interpolate.interp1d(x, sig)
        xnew = np.arange(0, len(sig)-1, float(len(sig)-1)/2**np.ceil(np.log2(len(sig))))
        ynew = f(xnew)
        x = pywt.wavedec(ynew - np.mean(ynew), pywt.Wavelet('db1'), mode='smooth')
                
        J = len(x)
        res = np.zeros(J)
        for j in range(J):
            res[j] = float(np.sqrt(np.sum(x[j]**2)))
        if rel is True:
            res = res/np.sum(res + 10**(-10))
            res = (np.log(float(1)/(1-res)))
        
        final_res = []
        for not_kill in np.asarray(res):
            final_res.append(float(not_kill))
        return final_res

schema = StructType([
    StructField("F1", FloatType(), False),
    StructField("F2", FloatType(), False),
    StructField("F3", FloatType(), False),
    StructField("F4", FloatType(), False),
    StructField("F5", FloatType(), False),
    StructField("F6", FloatType(), False),
    StructField("F7", FloatType(), False),
    StructField("F8", FloatType(), False),
    StructField("F9", FloatType(), False)
])

toDWT_relative = partial(toDWT, rel = True)
toDWT_cont = partial(toDWT, rel = False)

toDWT_relative_udf = udf(toDWT_relative,  schema)
toDWT_cont_udf = udf(toDWT_cont,  schema)

df_features = df_denoise["denoised_ver", "denoised_hor", 'key'].withColumn('rel_features_ver', 
                        toDWT_relative_udf("denoised_ver")).withColumn('cont_features_ver',
                                          toDWT_cont_udf("denoised_ver"))

df_features = df_features["rel_features_ver", "cont_features_ver", "denoised_hor", 'key'].withColumn('rel_features_hor', 
                        toDWT_relative_udf("denoised_hor")).withColumn('cont_features_hor',
                                          toDWT_cont_udf("denoised_hor"))


df_features = df_features.select('key', 'rel_features_ver', 'cont_features_ver',
                                 'rel_features_hor', 'cont_features_hor')
df_features.show(2)

+-------+--------------------+--------------------+--------------------+--------------------+
|    key|    rel_features_ver|   cont_features_ver|    rel_features_hor|   cont_features_hor|
+-------+--------------------+--------------------+--------------------+--------------------+
|12590.0|[0.0,0.05599426,0...|[3.469447E-17,0.0...|[0.0,0.08517329,0...|[5.551115E-17,0.1...|
|12590.0|[2.220446E-16,0.0...|[7.8062556E-17,0....|[0.0,0.050331336,...|[9.020562E-17,0.0...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [76]:
def to_array(col):
    def to_array_(v):
        return v
    return udf(to_array_, ArrayType(FloatType()))(col)

ready_for_model = (df_features
    .withColumn("rel_features_ver", to_array(col("rel_features_ver")))
    .withColumn("cont_features_ver", to_array(col("cont_features_ver")))
    .withColumn("rel_features_hor", to_array(col("rel_features_hor")))
    .withColumn("cont_features_hor", to_array(col("cont_features_hor")))          
    .select(["key"] + [col("rel_features_ver")[i] for i in range(9)] + 
            [col("cont_features_ver")[i] for i in range(9)] + 
            [col("rel_features_hor")[i] for i in range(9)] +
            [col("cont_features_hor")[i] for i in range(9)]))

In [28]:
ready_for_model.dtypes

[('key', 'double'),
 ('rel_features_ver[0]', 'float'),
 ('rel_features_ver[1]', 'float'),
 ('rel_features_ver[2]', 'float'),
 ('rel_features_ver[3]', 'float'),
 ('rel_features_ver[4]', 'float'),
 ('rel_features_ver[5]', 'float'),
 ('rel_features_ver[6]', 'float'),
 ('rel_features_ver[7]', 'float'),
 ('rel_features_ver[8]', 'float'),
 ('cont_features_ver[0]', 'float'),
 ('cont_features_ver[1]', 'float'),
 ('cont_features_ver[2]', 'float'),
 ('cont_features_ver[3]', 'float'),
 ('cont_features_ver[4]', 'float'),
 ('cont_features_ver[5]', 'float'),
 ('cont_features_ver[6]', 'float'),
 ('cont_features_ver[7]', 'float'),
 ('cont_features_ver[8]', 'float'),
 ('rel_features_hor[0]', 'float'),
 ('rel_features_hor[1]', 'float'),
 ('rel_features_hor[2]', 'float'),
 ('rel_features_hor[3]', 'float'),
 ('rel_features_hor[4]', 'float'),
 ('rel_features_hor[5]', 'float'),
 ('rel_features_hor[6]', 'float'),
 ('rel_features_hor[7]', 'float'),
 ('rel_features_hor[8]', 'float'),
 ('cont_features_hor[0]', 

In [33]:
df.dtypes

[('_c0', 'string'),
 ('SessionId', 'string'),
 ('DeviceID', 'string'),
 ('TS', 'string'),
 ('X', 'string'),
 ('Y', 'string'),
 ('Z', 'string'),
 ('AnnotationStrValue', 'string'),
 ('BradykinesiaGA', 'string'),
 ('DyskinesiaGA', 'string'),
 ('TremorGA', 'string'),
 ('TSStart', 'string'),
 ('TSEnd', 'string'),
 ('SubjectId', 'string'),
 ('IntelUsername', 'string')]

In [66]:
A = df_flat.take(2)

In [70]:
len(A[0][1][0][1])

3