In [1]:
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import seaborn as sns
import numpy as np
from sklearn import metrics
from sklearn.metrics import classification_report, confusion_matrix
from matplotlib import pyplot as plt
import keras
from keras import initializers
import tensorflow as tf
from keras import backend as K
from keras.backend import *
from keras.models import Sequential, Model, load_model
from keras.layers import Dense, Layer, Lambda, Input, Flatten, Dropout, Conv2D, MaxPooling2D, Conv1D, MaxPooling1D, LSTM, TimeDistributed, ConvLSTM2D, Permute, Reshape, Conv2D
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping

def own_batch_dot(x, y, axes=None):
	"""Batchwise dot product.
	`batch_dot` is used to compute dot product of `x` and `y` when
	`x` and `y` are data in batch, i.e. in a shape of
	`(batch_size, :)`.
	`batch_dot` results in a tensor or variable with less dimensions
	than the input. If the number of dimensions is reduced to 1,
	we use `expand_dims` to make sure that ndim is at least 2.
	Arguments:
		x: Keras tensor or variable with `ndim >= 2`.
		y: Keras tensor or variable with `ndim >= 2`.
		axes: list of (or single) int with target dimensions.
			The lengths of `axes[0]` and `axes[1]` should be the same.
	Returns:
		A tensor with shape equal to the concatenation of `x`'s shape
		(less the dimension that was summed over) and `y`'s shape
		(less the batch dimension and the dimension that was summed over).
		If the final rank is 1, we reshape it to `(batch_size, 1)`.
	Examples:
		Assume `x = [[1, 2], [3, 4]]` and `y = [[5, 6], [7, 8]]`
		`batch_dot(x, y, axes=1) = [[17, 53]]` which is the main diagonal
		of `x.dot(y.T)`, although we never have to calculate the off-diagonal
		elements.
		Shape inference:
		Let `x`'s shape be `(100, 20)` and `y`'s shape be `(100, 30, 20)`.
		If `axes` is (1, 2), to find the output shape of resultant tensor,
			loop through each dimension in `x`'s shape and `y`'s shape:
		* `x.shape[0]` : 100 : append to output shape
		* `x.shape[1]` : 20 : do not append to output shape,
			dimension 1 of `x` has been summed over. (`dot_axes[0]` = 1)
		* `y.shape[0]` : 100 : do not append to output shape,
			always ignore first dimension of `y`
		* `y.shape[1]` : 30 : append to output shape
		* `y.shape[2]` : 20 : do not append to output shape,
			dimension 2 of `y` has been summed over. (`dot_axes[1]` = 2)
		`output_shape` = `(100, 30)`
	```python
		>>> x_batch = K.ones(shape=(32, 20, 1))
		>>> y_batch = K.ones(shape=(32, 30, 20))
		>>> xy_batch_dot = K.batch_dot(x_batch, y_batch, axes=[1, 2])
		>>> K.int_shape(xy_batch_dot)
		(32, 1, 30)
	```
	"""
	if isinstance(axes, int):
		axes = (axes, axes)
	x_ndim = ndim(x)
	y_ndim = ndim(y)
	if axes is None:
		# behaves like tf.batch_matmul as default
		axes = [x_ndim - 1, y_ndim - 2]
	if x_ndim > y_ndim:
		diff = x_ndim - y_ndim
		y = array_ops.reshape(y,
							array_ops.concat(
								[array_ops.shape(y), [1] * (diff)], axis=0))
	elif y_ndim > x_ndim:
		diff = y_ndim - x_ndim
		x = array_ops.reshape(x,
							array_ops.concat(
								[array_ops.shape(x), [1] * (diff)], axis=0))
	else:
		diff = 0
	if ndim(x) == 2 and ndim(y) == 2:
		if axes[0] == axes[1]:
			out = math_ops.reduce_sum(math_ops.multiply(x, y), axes[0])
		else:
			out = math_ops.reduce_sum(
			math_ops.multiply(array_ops.transpose(x, [1, 0]), y), axes[1])
	else:
		adj_x = None if axes[0] == ndim(x) - 1 else True
		adj_y = True if axes[1] == ndim(y) - 1 else None
		out = math_ops.matmul(x, y, adjoint_a=adj_x, adjoint_b=adj_y)
	if diff:
		if x_ndim > y_ndim:
			idx = x_ndim + y_ndim - 3
		else:
			idx = x_ndim - 1
			out = array_ops.squeeze(out, list(range(idx, idx + diff)))
	if ndim(out) == 1:
		out = expand_dims(out, 1)
	return out

randomSeed = 1

class DropPath(layers.Layer):
    def __init__(self, drop_prob=0.0, **kwargs):
        super(DropPath, self).__init__(**kwargs)
        self.drop_prob = drop_prob

    def call(self, x,training=None):
        if(training):
            input_shape = tf.shape(x)
            batch_size = input_shape[0]
            rank = x.shape.rank
            shape = (batch_size,) + (1,) * (rank - 1)
            random_tensor = (1 - self.drop_prob) + tf.random.uniform(shape, dtype=x.dtype)
            path_mask = tf.floor(random_tensor)
            output = tf.math.divide(x, 1 - self.drop_prob) * path_mask
            return output
        else:
            return x

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'drop_prob': self.drop_prob,})
        return config

class GatedLinearUnit(layers.Layer):
    def __init__(self,units,**kwargs):
        super(GatedLinearUnit, self).__init__(**kwargs)
        self.units = units
        self.linear = layers.Dense(units * 2)
        self.sigmoid = tf.keras.activations.sigmoid
    def call(self, inputs):
        linearProjection = self.linear(inputs)
        softMaxProjection = self.sigmoid(linearProjection[:,:,self.units:])
        return tf.multiply(linearProjection[:,:,:self.units],softMaxProjection)

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'units': self.units,})
        return config

class PatchEncoder(layers.Layer):
    def __init__(self, num_patches, projection_dim,**kwargs):
        super(PatchEncoder, self).__init__(**kwargs)
        self.num_patches = num_patches
        self.projection_dim = projection_dim
        self.position_embedding = layers.Embedding(input_dim=num_patches, output_dim=projection_dim)
    def call(self, patch):
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        encoded = patch + self.position_embedding(positions)
        return encoded
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'num_patches': self.num_patches,
            'projection_dim': self.projection_dim,})
        return config

class ClassToken(layers.Layer):
    def __init__(self, hidden_size,**kwargs):
        super(ClassToken, self).__init__(**kwargs)
        self.cls_init = tf.random.normal
        self.hidden_size = hidden_size
        self.cls = tf.Variable(
            name="cls",
            initial_value=self.cls_init(shape=(1, 1, self.hidden_size), seed=randomSeed, dtype="float32"),
            trainable=True,
        )

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        cls_broadcasted = tf.cast(
            tf.broadcast_to(self.cls, [batch_size, 1, self.hidden_size]),
            dtype=inputs.dtype,
        )
        return tf.concat([cls_broadcasted, inputs], 1)
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'hidden_size': self.hidden_size,})
        return config

class Prompts(layers.Layer):
    def __init__(self, projectionDims,promptCount = 1,**kwargs):
        super(Prompts, self).__init__(**kwargs)
        self.cls_init = tf.random.normal
        self.projectionDims = projectionDims
        self.promptCount = promptCount
        self.prompts = [tf.Variable(
            name="prompt"+str(_),
            initial_value=self.cls_init(shape=(1, 1, self.projectionDims), seed=randomSeed, dtype="float32"),
            trainable=True,
        )  for _ in range(promptCount)]

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        prompt_broadcasted = tf.concat([tf.cast(tf.broadcast_to(promptInits, [batch_size, 1, self.projectionDims]),dtype=inputs.dtype,)for promptInits in self.prompts],1)
        return tf.concat([inputs,prompt_broadcasted], 1)

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'projectionDims': self.projectionDims,
            'promptCount': self.promptCount,})
        return config

class SensorWiseMHA(layers.Layer):
    def __init__(self, projectionQuarter, num_heads,startIndex,stopIndex,dropout_rate = 0.0,dropPathRate = 0.0, **kwargs):
        super(SensorWiseMHA, self).__init__(**kwargs)
        self.projectionQuarter = projectionQuarter
        self.num_heads = num_heads
        self.dropout_rate = dropout_rate
        self.MHA = layers.MultiHeadAttention(num_heads=self.num_heads, key_dim=self.projectionQuarter, dropout = dropout_rate )
        self.startIndex = startIndex
        self.stopIndex = stopIndex
        self.dropPathRate = dropPathRate
        self.DropPath = DropPath(dropPathRate)
    def call(self, inputData, training=None, return_attention_scores = False):
        extractedInput = inputData[:,:,self.startIndex:self.stopIndex]
        if(return_attention_scores):
            MHA_Outputs, attentionScores = self.MHA(extractedInput,extractedInput,return_attention_scores = True )
            return MHA_Outputs , attentionScores
        else:
            MHA_Outputs = self.MHA(extractedInput,extractedInput)
            MHA_Outputs = self.DropPath(MHA_Outputs)
            return MHA_Outputs

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'projectionQuarter': self.projectionQuarter,
            'num_heads': self.num_heads,
            'startIndex': self.startIndex,
            'dropout_rate': self.dropout_rate,
            'stopIndex': self.stopIndex,
            'dropPathRate': self.dropPathRate,})
        return config
def softDepthConv(inputs):
    kernel = inputs[0]
    inputData = inputs[1]
    convOutputs = tf.nn.conv1d(
    inputData,
    kernel,
    stride = 1,
    padding = 'SAME',
    data_format='NCW',)
    return convOutputs




class liteFormer(layers.Layer):
    def __init__(self,startIndex,stopIndex, projectionSize, kernelSize = 16, attentionHead = 3, use_bias=False, dropPathRate = 0.0,dropout_rate = 0,**kwargs):
        super(liteFormer, self).__init__(**kwargs)
        self.use_bias = use_bias
        self.startIndex = startIndex
        self.stopIndex = stopIndex
        self.kernelSize = kernelSize
        self.softmax = tf.nn.softmax
        self.projectionSize = projectionSize
        self.attentionHead = attentionHead
        self.DropPathLayer = DropPath(dropPathRate)
        self.projectionHalf = projectionSize // 2
    def build(self,inputShape):
        self.depthwise_kernel = [self.add_weight(
            shape=(self.kernelSize,1,1),
            initializer="glorot_uniform",
            trainable=True,
            name="convWeights"+str(_),
            dtype="float32") for _ in range(self.attentionHead)]
        if self.use_bias:
            self.convBias = self.add_weight(
                shape=(self.attentionHead,),
                initializer="glorot_uniform",
                trainable=True,
                name="biasWeights",
                dtype="float32"
            )

    def call(self, inputs,training=None):
        formattedInputs = inputs[:,:,self.startIndex:self.stopIndex]
        inputShape = tf.shape(formattedInputs)
        reshapedInputs = tf.reshape(formattedInputs,(-1,self.attentionHead,inputShape[1]))
        if(training):
            for convIndex in range(self.attentionHead):
                self.depthwise_kernel[convIndex].assign(self.softmax(self.depthwise_kernel[convIndex], axis=0))
        convOutputs = tf.convert_to_tensor([tf.nn.conv1d(
            reshapedInputs[:,convIndex:convIndex+1,:],
            self.depthwise_kernel[convIndex],
            stride = 1,
            padding = 'SAME',
            data_format='NCW',) for convIndex in range(self.attentionHead) ])
        convOutputsDropPath = self.DropPathLayer(convOutputs)
        localAttention = tf.reshape(convOutputsDropPath,(-1,inputShape[1],self.projectionSize))
        return localAttention
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'use_bias': self.use_bias,
            'kernelSize': self.kernelSize,
            'startIndex': self.startIndex,
            'stopIndex': self.stopIndex,
            'projectionSize': self.projectionSize,
            'attentionHead': self.attentionHead,})
        return config

class mixAccGyro(layers.Layer):
    def __init__(self,projectionQuarter,projectionHalf,projection_dim,**kwargs):
        super(mixAccGyro, self).__init__(**kwargs)
        self.projectionQuarter = projectionQuarter
        self.projectionHalf = projectionHalf
        self.projection_dim = projection_dim
        self.projectionThreeFourth = self.projectionHalf+self.projectionQuarter
        self.mixedAccGyroIndex = tf.reshape(tf.transpose(tf.stack(
            [np.arange(projectionQuarter,projectionHalf), np.arange(projectionHalf,projectionHalf + projectionQuarter)])),[-1])
        self.newArrangement = tf.concat((np.arange(0,projectionQuarter),self.mixedAccGyroIndex,np.arange(self.projectionThreeFourth,projection_dim)),axis = 0)
    def call(self, inputs):
        return tf.gather(inputs,self.newArrangement,axis= 2)

    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'projectionQuarter': self.projectionQuarter,
            'projectionHalf': self.projectionHalf,
            'projection_dim': self.projection_dim,
        })
        return config

def mlp(x, hidden_units, dropout_rate):
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.nn.swish)(x)
        x = layers.Dropout(dropout_rate)(x)
    return x

def mlp2(x, hidden_units, dropout_rate):
    x = layers.Dense(hidden_units[0],activation=tf.nn.swish)(x)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Dense(hidden_units[1])(x)
    return x

def depthMLP(x, hidden_units, dropout_rate):
    x = layers.Dense(hidden_units[0])(x)
    x = layers.DepthwiseConv1D(3,data_format='channels_first',activation=tf.nn.swish)(x)
    x = layers.Dropout(dropout_rate)(x)
    x = layers.Dense(hidden_units[1])(x)
    x = layers.Dropout(dropout_rate)(x)
    return x

class SensorPatchesTimeDistributed(layers.Layer):
    def __init__(self, projection_dim,filterCount,patchCount,frameSize = 128, channelsCount = 6,**kwargs):
        super(SensorPatchesTimeDistributed, self).__init__(**kwargs)
        self.projection_dim = projection_dim
        self.frameSize = frameSize
        self.channelsCount = channelsCount
        self.patchCount = patchCount
        self.filterCount = filterCount
        self.reshapeInputs = layers.Reshape((patchCount, frameSize // patchCount, channelsCount))
        self.kernelSize = (projection_dim//2 + filterCount) // filterCount
        self.accProjection = layers.TimeDistributed(layers.Conv1D(filters = filterCount,kernel_size = self.kernelSize,strides = 1, data_format = "channels_last"))
        self.gyroProjection = layers.TimeDistributed(layers.Conv1D(filters = filterCount,kernel_size = self.kernelSize,strides = 1, data_format = "channels_last"))
        self.flattenTime = layers.TimeDistributed(layers.Flatten())
        assert (projection_dim//2 + filterCount) / filterCount % self.kernelSize == 0
        print("Kernel Size is "+str((projection_dim//2 + filterCount) / filterCount))
#         assert
    def call(self, inputData):
        inputData = self.reshapeInputs(inputData)
        accProjections = self.flattenTime(self.accProjection(inputData[:,:,:,:3]))
        gyroProjections = self.flattenTime(self.gyroProjection(inputData[:,:,:,3:]))
        Projections = tf.concat((accProjections,gyroProjections),axis=2)
        return Projections
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'projection_dim': self.projection_dim,
            'filterCount': self.filterCount,
            'patchCount': self.patchCount,
            'frameSize': self.frameSize,
            'channelsCount': self.channelsCount,})
        return config

class SensorPatches(layers.Layer):
    def __init__(self, projection_dim, patchSize,timeStep, **kwargs):
        super(SensorPatches, self).__init__(**kwargs)
        self.patchSize = patchSize
        self.timeStep = timeStep
        self.projection_dim = projection_dim
        self.accProjection = layers.Conv1D(filters = int(projection_dim/3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.gyroProjection = layers.Conv1D(filters = int(projection_dim/3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.totalaccProjection = layers.Conv1D(filters = int(projection_dim/3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
    def call(self, inputData):

        accProjections = self.accProjection(inputData[:,:,:3])
        gyroProjections = self.gyroProjection(inputData[:,:,3:6])
        totalaccProjection = self.totalaccProjection(inputData[:,:,6:9])
        Projections = tf.concat((accProjections,gyroProjections,totalaccProjection),axis=2)
        return Projections
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'patchSize': self.patchSize,
            'projection_dim': self.projection_dim,
            'timeStep': self.timeStep,})
        return config


class threeSensorPatches(layers.Layer):
    def __init__(self, projection_dim, patchSize,timeStep, **kwargs):
        super(threeSensorPatches, self).__init__(**kwargs)
        self.patchSize = patchSize
        self.timeStep = timeStep
        self.projection_dim = projection_dim
        self.accProjection = layers.Conv1D(filters = int(projection_dim//3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.gyroProjection = layers.Conv1D(filters = int(projection_dim//3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.magProjection = layers.Conv1D(filters = int(projection_dim//3),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")

    def call(self, inputData):

        accProjections = self.accProjection(inputData[:,:,:3])
        gyroProjections = self.gyroProjection(inputData[:,:,3:6])
        magProjections = self.magProjection(inputData[:,:,6:])

        Projections = tf.concat((accProjections,gyroProjections,magProjections),axis=2)
        return Projections
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'patchSize': self.patchSize,
            'projection_dim': self.projection_dim,
            'timeStep': self.timeStep,})
        return config


class fourSensorPatches(layers.Layer):
    def __init__(self, projection_dim, patchSize,timeStep, **kwargs):
        super(fourSensorPatches, self).__init__(**kwargs)
        self.patchSize = patchSize
        self.timeStep = timeStep
        self.projection_dim = projection_dim
        self.accProjection = layers.Conv1D(filters = int(projection_dim/4),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.gyroProjection = layers.Conv1D(filters = int(projection_dim/4),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.magProjection = layers.Conv1D(filters = int(projection_dim/4),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")
        self.altProjection = layers.Conv1D(filters = int(projection_dim/4),kernel_size = patchSize,strides = timeStep, data_format = "channels_last")

    def call(self, inputData):

        accProjections = self.accProjection(inputData[:,:,:3])
        gyroProjections = self.gyroProjection(inputData[:,:,3:6])
        magProjection = self.magProjection(inputData[:,:,6:9])
        altProjection = self.altProjection(inputData[:,:,9:])

        Projections = tf.concat((accProjections,gyroProjections,magProjection,altProjection),axis=2)
        return Projections
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'patchSize': self.patchSize,
            'projection_dim': self.projection_dim,
            'timeStep': self.timeStep,})
        return config

def extract_intermediate_model_from_base_model(base_model, intermediate_layer=4):
    model = tf.keras.Model(inputs=base_model.inputs, outputs=base_model.layers[intermediate_layer].output, name=base_model.name + "_layer_" + str(intermediate_layer))
    return model

def HART(input_shape,activityCount, projection_dim = 192,patchSize = 16,timeStep = 16,num_heads = 3,filterAttentionHead = 4, convKernels = [3, 7, 15, 31, 31, 31], mlp_head_units = [1024],dropout_rate = 0.3,useTokens = False):
    projectionHalf = projection_dim//2
    projectionQuarter = projection_dim//4
    projectionThird = projection_dim//3
    projectionSixth = projection_dim//6
    projectionFiveSixth = projection_dim*5//6
    projectionTwoThird = projection_dim*2//3
    dropPathRate = np.linspace(0, dropout_rate* 10, len(convKernels)) * 0.1
    transformer_units = [
    projection_dim * 2,
    projection_dim,]
    inputs = layers.Input(shape=input_shape)
    # print(input)
    try:
        patches = SensorPatches(projection_dim,patchSize,timeStep)(inputs)
    except:
        print("Sensor patches")
    # print("No problem with Sensor Patches")
    if(useTokens):
        try:
            patches = ClassToken(projection_dim)(patches)
        except:
            print("ClassToken")
    # print("No problem with Class Token")
    patchCount = patches.shape[1]
    try:
        encoded_patches = PatchEncoder(patchCount, projection_dim)(patches)
    except:
        print("Problem with Patch Encoder")
    # print("No problem with Patch Encoder")

    # Create multiple layers of the Transformer block.
    for layerIndex, kernelLength in enumerate(convKernels):
        try:
            x1 = layers.LayerNormalization(epsilon=1e-6 , name = "normalizedInputs_"+str(layerIndex))(encoded_patches)
            x1_a1 = x1[:,:,:projectionSixth]
            x1_a2 = x1[:,:,projectionSixth:projectionThird]
            x1_b1 = x1[:,:,projectionThird:projectionHalf]
            x1_b2 = x1[:,:,projectionHalf:projectionTwoThird]
            x1_c1 = x1[:,:,projectionTwoThird:projectionFiveSixth]
            x1_c2 = x1[:,:,projectionFiveSixth:]
            x1_multihead = tf.concat([x1_a1, x1_b1, x1_c1], axis=-1)
            x1_liteformer = tf.concat([x1_a2, x1_b2, x1_c2], axis=-1)

        except:
            print("Problem with LayerNormalization")
        # print("No problem with LayerNormalisation")
        try:
            branch1 = liteFormer(
                            startIndex = 0,
                            stopIndex = projectionHalf,
                            projectionSize = projectionHalf,
                            attentionHead =  filterAttentionHead,
                            kernelSize = kernelLength,
                            dropPathRate = dropPathRate[layerIndex],
                            dropout_rate = dropout_rate,
                            name = "liteFormer_"+str(layerIndex))(x1_liteformer)
        except:
            print("Problem with Lite Transformer")
        # print("No problem with Lite Tranformer")

        try:
            branch2Acc = SensorWiseMHA(projectionSixth,num_heads,0,projectionSixth,dropPathRate = dropPathRate[layerIndex],dropout_rate = dropout_rate,name = "AccMHA_"+str(layerIndex))(x1_multihead)
        except:
            print("Problem with Acceleration Barnch")
        # print("No problem with Acceleration branch")
        try:
            branch2Gyro = SensorWiseMHA(projectionSixth,num_heads,projectionSixth,projectionThird,dropPathRate = dropPathRate[layerIndex],dropout_rate = dropout_rate, name = "GyroMHA_"+str(layerIndex))(x1_multihead)
        except:
            print("Problem with Gyro Branch")
        # print("No problem with Gyro branch")
        try:
            branch2TotalAcc = SensorWiseMHA(projectionSixth,num_heads,projectionThird,projectionHalf,dropPathRate = dropPathRate[layerIndex],dropout_rate = dropout_rate,name = "TotalAccMHA_"+str(layerIndex))(x1_multihead)
        except:
            print("Problem with Total Acceleration Branch")
        # print("No problem with Total Acceleration branch")
        concatAttention = tf.concat((branch2Acc,branch1,branch2Gyro,branch2TotalAcc),axis= 2)
        # print(tf.size(concatAttention))
        # print(tf.size(encoded_patches))
        # try:
        x2 = layers.Add()([concatAttention, encoded_patches])
        # except:
        #     print("Problem with concatAttention and encoded patches")
        # print("No problem with concatAttention and encoded patches")
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        x3 = mlp2(x3, hidden_units=transformer_units, dropout_rate=dropout_rate)
        x3 = DropPath(dropPathRate[layerIndex])(x3)
        encoded_patches = layers.Add()([x3, x2])
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    if(useTokens):
        representation = layers.Lambda(lambda v: v[:, 0], name="ExtractToken")(representation)
    else:
        representation = layers.GlobalAveragePooling1D()(representation)
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=dropout_rate)

    reshaped = Reshape((128,8))(features)
    def squash(inputs):
        # take norm of input vectors
        squared_norm = K.sum(K.square(inputs), axis = -1, keepdims = True)
        # use the formula for non-linear function to return squashed output
        return ((squared_norm/(1+squared_norm))/(K.sqrt(squared_norm+K.epsilon())))*inputs
    # squash the reshaped output to make length of vector b/w 0 and 1
    squashed_output = Lambda(squash)(reshaped)
    class DigitCapsuleLayer(Layer):
        # creating a layer class in keras
        def __init__(self, **kwargs):
            super(DigitCapsuleLayer, self).__init__(**kwargs)
            self.kernel_initializer = initializers.get('glorot_uniform')

        def build(self, input_shape):
            # initialize weight matrix for each capsule in lower layer
            self.W = self.add_weight(shape = [6, 128, 16, 8], initializer = self.kernel_initializer, name = 'weights')
            self.built = True

        def call(self, inputs):
            inputs = K.expand_dims(inputs, 1)
            inputs = K.tile(inputs, [1, 6, 1, 1])
            # matrix multiplication b/w previous layer output and weight matrix
            inputs = K.map_fn(lambda x: own_batch_dot(x, self.W, [2, 3]), elems=inputs)
            b = tf.zeros(shape = [K.shape(inputs)[0], 6, 128])

    # routing algorithm with updating coupling coefficient c, using scalar product b/w input capsule and output capsule
            for i in range(3-1):
                # print(b)
                c = tf.nn.softmax(b, axis=1)
                print(c)
                s = own_batch_dot(c, inputs, [2, 2])
                print(s)
                v = squash(s)
                print(v)
                b = b + own_batch_dot(v, inputs, [2,3])

            return v
        def compute_output_shape(self, input_shape):
            return tuple([None, 6, 16])
    def output_layer(inputs):
        return K.sqrt(K.sum(K.square(inputs), -1) + K.epsilon())
    digit_caps = DigitCapsuleLayer()(squashed_output)
    # flatten1 = Flatten()(digit_caps)
    # fc_inference = Dense(activityCount, activation='softmax')(flatten1)
    outputs = Lambda(output_layer)(digit_caps)

    # logits = layers.Dense(activityCount,  activation='softmax')(features)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model
# ------------------------------specific module for MobileHART------------------------------

def conv_block(x, filters=16, kernel_size=3, strides=2):
    conv_layer = layers.Conv1D(
        filters, kernel_size, strides=strides, activation=tf.nn.swish, padding="same"
    )
    return conv_layer(x)

def inverted_residual_block(x, expanded_channels, output_channels, strides=1):
    m = layers.Conv1D(expanded_channels, 1, padding="same", use_bias=False)(x)
    m = layers.BatchNormalization()(m)
    m = tf.nn.swish(m)

    if strides == 2:
        m = layers.ZeroPadding1D(padding=1)(m)
    m = layers.DepthwiseConv1D(
        3, strides=strides, padding="same" if strides == 1 else "valid", use_bias=False
    )(m)
    m = layers.BatchNormalization()(m)
    m = tf.nn.swish(m)

    m = layers.Conv1D(output_channels, 1, padding="same", use_bias=False)(m)
    m = layers.BatchNormalization()(m)

    if tf.math.equal(x.shape[-1], output_channels) and strides == 1:
        return layers.Add()([m, x])
    return m

def transformer_block(x, transformer_layers, projection_dim, dropout_rate = 0.3,num_heads=2):

    dropPathRate = np.linspace(0, dropout_rate* 10,transformer_layers) * 0.1


    for _ in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(x)
        # Create a multi-head attention layer.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=dropout_rate
        )(x1, x1)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, x])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp2(
            x3,
            hidden_units=[x.shape[-1] * 2, x.shape[-1]],
            dropout_rate=dropout_rate,
        )
        # Skip connection 2.
        x = layers.Add()([x3, x2])

    return x

def mobilevit_block(x, num_blocks, projection_dim, strides=1):
    # Local projection with convolutions.
    local_features = conv_block(x, filters=projection_dim, strides=strides)
    local_features = conv_block(
        local_features, filters=projection_dim, kernel_size=1, strides=strides
    )
    global_features = transformer_block(
        local_features, num_blocks, projection_dim
    )

    # Apply point-wise conv -> concatenate with the input features.
    folded_feature_map = conv_block(
        global_features, filters=x.shape[-1], kernel_size=1, strides=strides
    )
    local_global_features = layers.Concatenate(axis=-1)([x, folded_feature_map])

    # Fuse the local and global features using a convoluion layer.
    local_global_features = conv_block(
        local_global_features, filters=projection_dim, strides=strides
    )

    return local_global_features


def sensorWiseTransformer_block(xAcc, xGyro, patchCount,transformer_layers, projection_dim,kernelSize = 4,  dropout_rate = 0.3,num_heads=2):
    projectionQuarter = projection_dim // 4
    projectionHalf = projection_dim // 2
    dropPathRate = np.linspace(0, dropout_rate* 10,transformer_layers) * 0.1

    x = tf.concat((xAcc,xGyro),axis= 2 )
    for layerIndex in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-6, name = "normalizedInputs_"+str(layerIndex))(x)

        branch1 = liteFormer(
                            startIndex = projectionQuarter,
                            stopIndex = projectionQuarter + projectionHalf,
                            projectionSize = projectionHalf,
                            attentionHead =  num_heads,
                            kernelSize = kernelSize,
                            dropPathRate = dropPathRate[layerIndex],
                            name = "liteFormer_"+str(layerIndex))(x1)

        branch2Acc = SensorWiseMHA(projectionQuarter,num_heads,0,projectionQuarter,dropPathRate = dropPathRate[layerIndex],dropout_rate = dropout_rate,name = "AccMHA_"+str(layerIndex))(x1)
        branch2Gyro = SensorWiseMHA(projectionQuarter,num_heads,projectionQuarter + projectionHalf ,projection_dim,dropPathRate = dropPathRate[layerIndex],dropout_rate = dropout_rate,name = "GyroMHA_"+str(layerIndex))(x1)
        concatAttention = tf.concat((branch2Acc,branch1,branch2Gyro),axis= 2 )
        # Skip connection 1.
        x2 = layers.Add()([concatAttention, x])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp2(
            x3,
            hidden_units=[x.shape[-1] * 2, x.shape[-1]],
            dropout_rate=dropout_rate,
        )
        x3 = DropPath(dropPathRate[layerIndex])(x3)
        # Skip connection 2.
        x = layers.Add()([x3, x2])

    return x
def sensorWiseHART(xAcc,xGyro, num_blocks, projection_dim, kernelSize = 4, strides=1):
    # Local projection with convolutions.
#     ---------------acc--------------
    local_featuresAcc = conv_block(xAcc, filters=projection_dim//2, strides=strides)
    local_featuresAcc = conv_block(
        local_featuresAcc, filters=projection_dim//2, kernel_size=1, strides=strides
    )

#     ---------------gyro--------------

    local_featuresGyro = conv_block(xGyro, filters=projection_dim//2, strides=strides)
    local_featuresGyro = conv_block(
        local_featuresGyro, filters=projection_dim//2, kernel_size=1, strides=strides
    )
    global_features = sensorWiseTransformer_block(local_featuresAcc,
        local_featuresGyro, local_featuresGyro.shape[1], num_blocks, projection_dim, kernelSize = kernelSize
    )

    folded_feature_map_acc = conv_block(
        global_features[:,:,:projection_dim//2], filters=xAcc.shape[-1], kernel_size=1, strides=strides
    )
    local_global_features_acc = layers.Concatenate(axis=-1)([xAcc, folded_feature_map_acc])
        # Fuse the local and global features using a convoluion layer.
    local_global_features_acc = conv_block(
        local_global_features_acc, filters=projection_dim//2, strides=strides
    )

    folded_feature_map_gyro = conv_block(
        global_features[:,:,projection_dim//2:], filters=xGyro.shape[-1], kernel_size=1, strides=strides
    )
    local_global_features_gyro = layers.Concatenate(axis=-1)([xGyro, folded_feature_map_gyro])

    local_global_features_gyro = conv_block(
        local_global_features_gyro, filters=projection_dim//2, strides=strides
    )

    return local_global_features_acc, local_global_features_gyro
def mv2Block(x,expansion_factor,filterCount):
    x = inverted_residual_block(
        x, expanded_channels=filterCount[0] * expansion_factor, output_channels=filterCount[1]
    )
    # Downsampling with MV2 block.
    x = inverted_residual_block(
        x, expanded_channels=filterCount[1] * expansion_factor, output_channels=filterCount[2], strides=2
    )
    x = inverted_residual_block(
        x, expanded_channels=filterCount[2] * expansion_factor, output_channels=filterCount[2]
    )
    x = inverted_residual_block(
        x, expanded_channels=filterCount[2] * expansion_factor, output_channels=filterCount[2]
    )
    # First MV2 -> MobileViT block.
    x = inverted_residual_block(
        x, expanded_channels=filterCount[2] * expansion_factor, output_channels=filterCount[3], strides=2
    )
    return x
    # def hartModel(input_shape,activityCount, projection_dim,patchSize,timeStep,num_heads,filterAttentionHead, convKernels = [3, 7, 15, 31, 31, 31], mlp_head_units = [1024],dropout_rate = 0.3,useTokens = True):

def mobileHART_XS(input_shape,activityCount,projectionDims = [96,120,144],filterCount = [16//2,32//2,48//2,64//2,80,96,384],expansion_factor=4,mlp_head_units = [1024],dropout_rate = 0.3):

    # inputs = keras.Input((segment_size, num_input_channels))
    inputs = layers.Input(shape=input_shape)

    # Initial conv-stem -> MV2 block.
    accX = conv_block(inputs[:,:,:3],filters=filterCount[0])
    gyroX = conv_block(inputs[:,:,3:],filters=filterCount[0])
    accX = mv2Block(accX,expansion_factor,filterCount)
    gyroX = mv2Block(gyroX,expansion_factor,filterCount)
    accX, gyroX  = sensorWiseHART(accX,gyroX, num_blocks=2, projection_dim=projectionDims[0])
    x = tf.concat((accX,gyroX), axis = 2)
    x = layers.Dense(projectionDims[0],activation=tf.nn.swish)(x)
    x = layers.Dropout(dropout_rate)(x)

    # Second MV2 -> MobileViT block.
    x = inverted_residual_block(
        x, expanded_channels=projectionDims[0] * expansion_factor, output_channels=filterCount[4], strides=2
    )
    x = mobilevit_block(x, num_blocks=4, projection_dim=projectionDims[1])

    # Third MV2 -> MobileViT block.
    x = inverted_residual_block(
        x, expanded_channels=projectionDims[1] * expansion_factor, output_channels=filterCount[5], strides=2
    )
    x = mobilevit_block(x, num_blocks=3, projection_dim=projectionDims[2])
    x = conv_block(x, filters=filterCount[6], kernel_size=1, strides=1)
    # Classification head.
    x = layers.GlobalAvgPool1D(name = "GAP")(x)

    x = mlp(x, hidden_units=mlp_head_units, dropout_rate=dropout_rate)

    outputs = layers.Dense(activityCount, activation="softmax")(x)
# f.keras.Model(inputs=inputs, outputs=logits)
    return tf.keras.Model(inputs, outputs)

def mobileHART_XXS(input_shape,activityCount,projectionDims = [64,80,96],filterCount = [16//2,16//2,24//2,48//2,64,80,320],expansion_factor=2,mlp_head_units = [1024],dropout_rate = 0.3):

    # inputs = keras.Input((segment_size, num_input_channels))
    inputs = layers.Input(shape=input_shape)

    # Initial conv-stem -> MV2 block.
    accX = conv_block(inputs[:,:,:3],filters=filterCount[0])
    gyroX = conv_block(inputs[:,:,3:],filters=filterCount[0])
    accX = mv2Block(accX,expansion_factor,filterCount)
    gyroX = mv2Block(gyroX,expansion_factor,filterCount)
    accX, gyroX  = sensorWiseHART(accX,gyroX, num_blocks=2, projection_dim=projectionDims[0])
    x = tf.concat((accX,gyroX), axis = 2)
    x = layers.Dense(projectionDims[0],activation=tf.nn.swish)(x)
    x = layers.Dropout(dropout_rate)(x)

    # Second MV2 -> MobileViT block.
    x = inverted_residual_block(
        x, expanded_channels=projectionDims[0] * expansion_factor, output_channels=filterCount[4], strides=2
    )
    x = mobilevit_block(x, num_blocks=4, projection_dim=projectionDims[1])

    # Third MV2 -> MobileViT block.
    x = inverted_residual_block(
        x, expanded_channels=projectionDims[1] * expansion_factor, output_channels=filterCount[5], strides=2
    )
    x = mobilevit_block(x, num_blocks=3, projection_dim=projectionDims[2])
    x = conv_block(x, filters=filterCount[6], kernel_size=1, strides=1)
    # Classification head.
    x = layers.GlobalAvgPool1D(name = "GAP")(x)

    x = mlp(x, hidden_units=mlp_head_units, dropout_rate=dropout_rate)

    outputs = layers.Dense(activityCount, activation="softmax")(x)
# f.keras.Model(inputs=inputs, outputs=logits)
    return tf.keras.Model(inputs, outputs)

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# load a single file as a numpy array
def load_file(filepath):
    dataframe = read_csv(filepath, header=None, delim_whitespace=True)
    return dataframe.values

In [4]:
# load a list of files and return as a 3d numpy array
def load_group(filenames, prefix=''):
    loaded = list()
    for name in filenames:
        data = load_file(prefix + name)
        loaded.append(data)
    # stack group so that features are the 3rd dimension
    loaded = dstack(loaded)
    return loaded

In [5]:
# load a dataset group, such as train or test
def load_dataset_group(group, prefix=''):
    filepath = prefix + group + '/Inertial Signals/'
    print('File Path : ',filepath)
    # load all 9 files as a single array
    filenames = list()
    # total acceleration
    filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
    # body acceleration
    filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
    # body gyroscope
    filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
    # load input data
    X = load_group(filenames, filepath)
    # load class output
    y = load_file(prefix + group + '/y_'+group+'.txt')
    return X, y

In [6]:
# load all train
X_train, Y_train = load_dataset_group('train', '/content/drive/MyDrive/HARCNNLSTM/UCIDataset/')
# load all test
X_test, Y_test = load_dataset_group('test', '/content/drive/MyDrive/HARCNNLSTM/UCIDataset/')

# zero-offset class values
Y_train = Y_train - 1
Y_test = Y_test - 1
# one hot encode y
Y_train = to_categorical(Y_train)
Y_test = to_categorical(Y_test)

print('X_train.shape : ', X_train.shape)
print('Y_train.shape : ', Y_train.shape)
print('X_test.shape : ', X_test.shape)
print('Y_test.shape : ', Y_test.shape)

File Path :  /content/drive/MyDrive/HARCNNLSTM/UCIDataset/train/Inertial Signals/
File Path :  /content/drive/MyDrive/HARCNNLSTM/UCIDataset/test/Inertial Signals/
X_train.shape :  (7352, 128, 9)
Y_train.shape :  (7352, 6)
X_test.shape :  (2947, 128, 9)
Y_test.shape :  (2947, 6)


In [7]:
verbose = 1
epochs = 500
batch_size = 128

n_timesteps = X_train.shape[1]
n_features = X_train.shape[2]
n_outputs = Y_train.shape[1]

In [8]:
checkpoint = ModelCheckpoint("HART_Capsule.h5", monitor='val_accuracy', verbose=1,
                             save_best_only=True, save_weights_only=False, mode='auto')

In [11]:
model_classifier = HART((128, 9), 6)

model_classifier.compile(
    optimizer=tf.keras.optimizers.Adam(0.001),
    loss=tf.keras.losses.CategoricalCrossentropy(),
    metrics=["accuracy"],
)

history = model_classifier.fit(x=X_train,
    y=Y_train,
    validation_data = (X_test,Y_test),
    batch_size=batch_size,
    epochs=epochs,
    verbose=1, callbacks=[checkpoint])

Tensor("digit_capsule_layer_1/transpose_1:0", shape=(None, 6, 128), dtype=float32)
Tensor("digit_capsule_layer_1/Squeeze:0", shape=(None, 6, 16), dtype=float32)
Tensor("digit_capsule_layer_1/mul:0", shape=(None, 6, 16), dtype=float32)
Tensor("digit_capsule_layer_1/transpose_3:0", shape=(None, 6, 128), dtype=float32)
Tensor("digit_capsule_layer_1/Squeeze_2:0", shape=(None, 6, 16), dtype=float32)
Tensor("digit_capsule_layer_1/mul_1:0", shape=(None, 6, 16), dtype=float32)
Epoch 1/500
Tensor("model_3/digit_capsule_layer_1/transpose_1:0", shape=(None, 6, 128), dtype=float32)
Tensor("model_3/digit_capsule_layer_1/Squeeze:0", shape=(None, 6, 16), dtype=float32)
Tensor("model_3/digit_capsule_layer_1/mul:0", shape=(None, 6, 16), dtype=float32)
Tensor("model_3/digit_capsule_layer_1/transpose_3:0", shape=(None, 6, 128), dtype=float32)
Tensor("model_3/digit_capsule_layer_1/Squeeze_2:0", shape=(None, 6, 16), dtype=float32)
Tensor("model_3/digit_capsule_layer_1/mul_1:0", shape=(None, 6, 16), dtype=f

<keras.src.callbacks.History at 0x789c182182e0>

In [1]:
# Maximum validation Accuracy
val_accuracy_tensor = history.history['val_accuracy']
best_val_accuracy = np.max(val_accuracy_tensor)
print("Best Validation Accuracy:", best_val_accuracy)

Best Validation Accuracy: 0.9551153521462504
