We provided easy to use notebook to replicate the results. 
Please refer to the corresponding author if there is any issue.



Loading the data


In [None]:
from tensorflow.keras.utils import to_categorical
import keras
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import seaborn as sns

import os
import torch
import statistics
from sklearn.model_selection import train_test_split
import random
INPUT_DIR = ##### add the directory to the data
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
# Torch RNG
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# Python RNG
np.random.seed(seed)
random.seed(seed)

WINDOW_SIZE = 160
NUMBER_MODELS = 1
# some utility functions 

def normalize_z(x, avg, std_dev, smooth = 0.0001):
  return (x-avg) / (std_dev + smooth)

def normalize_mm(x, minn, maxx, smooth = 0.0001):
  return 2*(x-(minn+maxx)/2) / (maxx - minn + smooth)


def make_autopct(values):
    def my_autopct(pct):
        total = sum(values)
        val = int(round(pct*total/100.0))

        val = int(val / 1000)
        return '{p:.2f}%  ({v:d}K)'.format(p=pct,v=val)
    return my_autopct

# Defining a function for plotting training and validation learning curves
def plot_history(history):
	  # plot loss
    plt.title('Loss')
    plt.plot(history.history['loss'], color='blue', label='train')
    plt.plot(history.history['val_loss'], color='red', label='test')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'])
    plt.show()
    
    # plot accuracy
    plt.title('Accuracy')
    plt.plot(history.history['categorical_accuracy'], color='blue', label='train')
    plt.plot(history.history['val_categorical_accuracy'], color='red', label='test')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'])
    plt.show()

def prep_label(y_x):
  label=[]
  for i in range(len(y_x)):
    # print(i)
    try:
      label.append(statistics.mode(y_x[i]))
    except:
      try:
        if statistics.mode(y_x[i-1])==statistics.mode(y_x[i+1]):
           label.append(statistics.mode(y_x[i+1]))                 
        else:
           label.append(statistics.mode(y_x[i-1])) 
      except:   
        label.append(statistics.mode(y_x[i-2]))
  return np.array(label)

# load data
from sklearn.model_selection import  train_test_split

dataset = np.load(os.path.join(INPUT_DIR, "dataset.npy"))
x, y =dataset[:,:, :7], dataset[:,:, 7]

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.3)
phase_train = x_train[:,:,6] #phase
phase_test=x_test[:,:,6]  #phase

mean = np.load(os.path.join(INPUT_DIR, "x_mean.npy"))
std = np.load(os.path.join(INPUT_DIR, "x_stddev.npy"))


x_train = normalize_z(x_train[:,:,:6], mean, std)
x_test = normalize_z(x_test[:,:,:6], mean, std)


x_train1 = x_train #raw input
y_train1 = y_train #labels

x_test1 = x_test #raw input
y_test1 = y_test  #labels

from tensorflow.keras.utils import to_categorical
y_train = to_categorical(y_train1)
y_test = to_categorical(y_test1)

Encoding Functions:


*   One-hot encoding 
*   Gaussian encoding



In [None]:
from scipy.stats import norm

#### Gaussian encoding

def gauss_fit(phase):
  from scipy.stats import norm
  aa=np.nonzero(phase)
  sigma =.1
  x1 = 0
  x2 = 8
  out=0
  for mu in aa[0]:

   z1 = ( x1 - mu ) / sigma
   z2 = ( x2 - mu ) / sigma
   x = np.arange(z1, z2, 10) # range of x in spec
   x_all = np.arange(-10, 10, 0.001) # entire range of x, both in and out of spec
   y = norm.pdf(x,0,1)
   out+=y
  return out

#### One-hot encoding

def encode_angle(angle, N):

        num_bins = N
        idx = int(angle / np.pi * num_bins) % num_bins
        one_hot = np.zeros(num_bins)
        one_hot[idx] = 1
        return one_hot

In [None]:
#### train data
ag=phase_train
num_bins=8 #number of bins
bb1=np.zeros((ag.shape[0],ag.shape[1],num_bins))
for j in range(ag.shape[0]):
 for i in range(ag.shape[1]):
  bb1[j,i,:]=gauss_fit(encode_angle(ag[j,i],num_bins))   # one-hot encoding can simply be achieved by removing the gauss_fit

#### validation data
ag=phase_test
num_bins=8
bb2=np.zeros((ag.shape[0],ag.shape[1],num_bins))
for j in range(ag.shape[0]):
 for i in range(ag.shape[1]):
  bb2[j,i,:]=gauss_fit(encode_angle(ag[j,i],num_bins))

x_train1=np.concatenate((x_train,bb1),axis=2) # adding encoded phase information to the input
x_test1=np.concatenate((x_test,bb2),axis=2)

Preparing the data for training

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

bs = 128
x_train1= np.transpose(x_train1,(0,2,1))
x_test1= np.transpose(x_test1,(0,2,1))
x_train2= torch.from_numpy(x_train1)
y_train2= torch.from_numpy(y_train)
phase_train=torch.from_numpy(phase_train)
x_test2= torch.from_numpy(x_test1)
y_test2= torch.from_numpy(y_test)
phase_test=torch.from_numpy(phase_test)
train_ds = TensorDataset(x_train2, y_train2,phase_train)
train_dl = DataLoader(train_ds, batch_size=bs, shuffle=True)

test_ds = TensorDataset(x_test2, y_test2,phase_test)
test_dl = DataLoader(test_ds, batch_size=bs)

Loss function

In [None]:
import pdb
def weighted_crossentropy(y_true, y_pred):    #### weighted crossentropy
  EPSILON = 0.0001
  # pdb.set_trace()
  uu=torch.broadcast_to(torch.Tensor([2, 1, 1, 2]), y_true.shape).to(device)
  vv=torch.broadcast_to(torch.Tensor([1, 1, 1, 1]), y_pred.shape).to(device)
  # pdb.set_trace()
  y_t = torch.multiply(uu , y_true).to(device)
  h_t = torch.multiply(vv , y_pred).to(device)
  # pdb.set_trace()
  return torch.mean(- torch.multiply(y_t  , torch.log(torch.add(h_t, EPSILON))))

def loss_batch_class(model, xb, yb,phb, opt=None):
    xb1=xb[:,:,:80] # the first half of the input
    x_recon, phase, y= model(xb1.float())
    mse= nn.MSELoss()
    l1_loss = nn.L1Loss()
    class_w=0
    lam=0.001
    lam1=0.1 
    recon_loss =mse(xb.float(),x_recon )  # reconstruction loss

    x_cos =phase[:,0] # cosine axis of the phase
    x_sin =phase[:,1] # sine axis of the phase


    ## middle phase
    eee1= (torch.cos(phb[:,80].float())).view(-1,1) # true cosine phase
    eee2= (torch.sin(phb[:,80].float())).view(-1,1) # true sine phase
    
    rr= torch.cat([eee1,eee2],axis=1) #128*2
    rr_cos= rr[:,0]/torch.norm(rr,dim=1) 
    rr_sin= rr[:,1]/torch.norm(rr,dim=1) 
    
    g_phase=torch.cat([rr_cos.view(-1,1), rr_sin.view(-1,1)],dim=1)
    es_phase=torch.cat([x_cos.view(-1,1), x_sin.view(-1,1)],dim=1)

    loss2=mse(g_phase,es_phase) # phase reconstruction

    circ=torch.norm(es_phase,dim=1) 
    loss3=mse(circ,torch.ones_like(circ)) #||X-1|| make sure that the point are close to the unit circle

    loss= recon_loss+lam*loss2+lam1*loss3 +class_w*weighted_crossentropy(yb[:,:,:],y) # full loss

    if opt is not None:
        opt.zero_grad()
        loss.backward()
        opt.step()
    return loss.item(), len(xb)

Model 

In [None]:
!pip install torchdiffeq

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
dim=64
# num_bins=16
adjoint= False
if adjoint:
    from torchdiffeq import odeint_adjoint as odeint
else:
    from torchdiffeq import odeint
def norm(dim):
    """
    Group normalization to improve model accuracy and training speed.
    """
    return nn.GroupNorm(min(32, dim), dim)
class ConcatConv1d(nn.Module):
    """
    1d convolution concatenated with time for usage in ODENet.
    """
    def __init__(self, dim_in, dim_out, kernel_size=3, stride=1, padding=0, bias=True, transpose=False):
        super(ConcatConv1d, self).__init__()
        module = nn.ConvTranspose1d if transpose else nn.Conv1d
        self._layer = module(
            dim_in + 1, dim_out, kernel_size=kernel_size, stride=stride, padding=padding,
            bias=bias
        )

    def forward(self, t, x):
        tt = torch.ones_like(x[:, :1, :]) * t
        ttx = torch.cat([tt, x], 1)
        return self._layer(ttx)


class ODEfunc(nn.Module):
    """
    Network architecture for ODENet.
    """
    def __init__(self, dim):
        super(ODEfunc, self).__init__()
        self.norm1 = norm(dim)
        self.relu = nn.SELU(inplace=True)
        self.conv1 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm2 = norm(dim)
        self.conv2 = ConcatConv1d(dim, dim, 3, 1, 1)
        self.norm3 = norm(dim)
        # self.drop1 = nn.Dropout(.25)
        # self.drop2 = nn.Dropout(.25)
        self.nfe = 0    # Number of function evaluations

    def forward(self, t, x):
        self.nfe += 1
        out = self.norm1(x)
        out = self.relu(out)
        out = self.conv1(t, x)
        # out = self.drop1(out)
        out = self.norm2(out)
        out = self.relu(out)
        out = self.conv2(t, out)
        out = self.norm3(out)
        return out

tol=1e-3
class ODENet(nn.Module):
    # def __init__(self, odefunc, rtol=1e-3, atol=1e-3):
    #     super(ODENet, self).__init__()
    #     self.odefunc = odefunc
    #     self.integration_time = torch.tensor([0, 1]).float()
    #     # self.rtol = rtol
    #     # self.atol = atol

    # def forward(self, x):
    #     self.integration_time = self.integration_time.type_as(x)
    #     out = odeint_adjoint(self.odefunc, x, self.integration_time, rtol1, rtol1)
        # return out[1]
    def __init__(self, odefunc):
        super(ODENet, self).__init__()
        self.odefunc = odefunc
        self.integration_time = torch.tensor([0, 1]).float()

    def forward(self, x):
        self.integration_time = self.integration_time.type_as(x)
        # pdb.set_trace()
        out = odeint(self.odefunc, x, self.integration_time, rtol=tol, atol=tol)
        # pdb.set_trace()
        return out[1]

    # Update number of function evaluations (nfe)
    @property
    def nfe(self):
        return self.odefunc.nfe

    @nfe.setter
    def nfe(self, value):
        self.odefunc.nfe = value

class Reshape(nn.Module):
    def __init__(self, *args):
        super(Reshape, self).__init__()
        self.shape = args

    def forward(self, x):
        return x.view(-1,160,4)
class Flatten(nn.Module):
    """
    Flatten feature maps for input to linear layer.
    """
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, x):
        shape = torch.prod(torch.tensor(x.shape[1:])).item()
        return x.view(-1, shape)

class classifier(nn.Module):
        def __init__(self):
          super(classifier, self).__init__()
          self.norm1=norm(dim)
          self.relu=nn.ReLU(inplace=True)
          self.avg=nn.AdaptiveAvgPool1d(1)
          self.fc_layer1=nn.Linear(dim, 160*4)
          self.softmax=nn.Softmax()
          self.flatten=Flatten()
          self.reshape=Reshape()

        def forward(self, x):
          x=self.norm1(x)
          x=self.relu(x)
          x=self.avg(x)
          x=self.flatten(x)
          x= self.fc_layer1(x)
          x=self.reshape(x)
          return self.softmax(x)

class Latent(nn.Module):
        def __init__(self):
          super(Latent, self).__init__()
          self.norm1=norm(dim)
          self.relu=nn.ReLU(inplace=True)
          self.avg= nn.AdaptiveAvgPool1d(1)
          self.flatten=Flatten()
          self.linear=nn.Linear(dim, 2)
        def forward(self, x):
          x=self.norm1(x)
          x=self.relu(x)
          x=self.avg(x)
          x=self.flatten(x)
          x=self.linear(x)
          return x


class downsampling_layers(nn.Module):
        def __init__(self):
          super(downsampling_layers, self).__init__()
          self.conv1=nn.Conv1d(6+8, dim, 3, 1)
          self.norm1=norm(dim)
          self.selu=nn.SELU(inplace=False)
          self.conv2=nn.Conv1d(dim, dim, 4, 2, 1)
          self.norm2=norm(dim)
          self.conv3=nn.Conv1d(dim, dim, 3, 2, 1) # 4 to 2 for forcasting 80 window

        def forward(self, x):

          x=self.conv1(x)
          x=self.norm1(x)
          x=self.selu(x)

          x=self.conv2(x)
          x=self.norm2(x)
          x=self.selu(x)

          x=self.conv3(x)
          return x
 
class Decoder(nn.Module):
        def __init__(self):
          super(Decoder, self).__init__()
          self.conv1=nn.ConvTranspose1d(64,32,11,1)
          # self.upsample1=nn.Upsample(scale_factor=4, mode='nearest')
          self.norm1= norm(32)
          self.selu=nn.SELU(inplace=False)
          self.conv2= nn.ConvTranspose1d(32,64,10,1)
          self.norm2= norm(64)
          self.conv3= nn.ConvTranspose1d(64,64,10,1)
          self.norm3= norm(64)
          self.conv4= nn.ConvTranspose1d(64,64,4,1) #(4 to 10) for whole window
          self.norm4= norm(64)
          self.conv5= nn.ConvTranspose1d(64,6+8,10,3) # (10 to 8)(3 to 2) for whole window
          # self.upsample2=nn.Upsample(scale_factor=10, mode='nearest')
          self.norm5= norm(6+8)    

        def forward(self, x):
          x=self.conv1(x)
          x=self.norm1(x)
          x=self.selu(x)

          x=self.conv2(x)
          x=self.norm2(x)
          x=self.selu(x)

          x=self.conv3(x)
          x=self.norm3(x)
          x=self.selu(x)

          x=self.conv4(x)
          x=self.norm4(x)
          x=self.selu(x)

          x=self.conv5(x)
          x=self.norm5(x)
          x=self.selu(x)
          return x
    
class Classification(nn.Module):
      def __init__(self, down_samp,decoder,feature_layers, classifier=None,latent=None ):
        super(Classification, self).__init__()
        self.down_samp = down_samp
        self.decoder=decoder
        self.feature_layers=feature_layers
        self.classifier = classifier
        self.latent=latent
                
      def forward(self, x):
        x_shape = self.down_samp(x) # encoder
        x_phase=self.latent(x_shape)# mapping to 2d
        y=self.classifier(x_shape) #classifier
        x = self.feature_layers(x_shape)
        y_recon =self.decoder(x)
        
        return  y_recon,x_phase, y it should be in the first place

down_samp= downsampling_layers()
feature_layers =ODENet(ODEfunc(dim))
decoder=Decoder()
decoder_class= classifier() 
phase_model=Latent()
class_model=Classification(down_samp=down_samp,decoder=decoder, feature_layers=feature_layers,classifier=decoder_class,latent=phase_model)
model_parameters11 = filter(lambda p: p.requires_grad, class_model.parameters())
params_len = sum([np.prod(p.size()) for p in model_parameters11])
print(params_len)

Training

In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
device = torch.device('cuda:' + str(0) if torch.cuda.is_available() else 'cpu')
num_batches = len(train_dl)
import time
import math
epochs=150
model1=class_model.to(device)
val_tmp=math.inf
loss_func=F.cross_entropy
opt=optim.RMSprop(model1.parameters(), lr=.001)
valid_dl=test_dl
scheduler=ReduceLROnPlateau(opt, 'min',patience=5,verbose=True)

for epoch in range(epochs):
        print(f"Training... epoch {epoch + 1}")
        device = torch.device('cuda:' + str(0) if torch.cuda.is_available() else 'cpu')
        
        model1.train()   # Set model to training mode
        batch_count = 0
        start = time.time()
        for xb, yb,phb in train_dl:
            batch_count += 1
            curr_time = time.time()
            percent = round(batch_count/len(train_dl) * 100, 1)
            elapsed = round((curr_time - start)/60, 1)
            tr_loss,_=loss_batch_class(model1.to(device), xb.to(device), yb.to(device),phb.to(device),opt)
            print("Percent trained: {:.4f}| Time elapsed: {:.3f}| Loss: {:.4f} ".format(percent,elapsed,tr_loss))
        model1.eval()    # Set model to validation mode
        with torch.no_grad():
            losses, nums = zip(
                *[loss_batch_class(model1, xb.to(device), yb.to(device),phb.to(device)) for xb, yb,phb in valid_dl]
            )
        val_loss = np.sum(np.multiply(losses, nums)) / np.sum(nums)
        scheduler.step(val_loss)
        if val_loss< val_tmp:
  
          PATH='/loss_0001_01_gauss_m2.pt'
          torch.save(model1.state_dict(), PATH)
          val_tmp=val_loss
          print(val_loss)
        if epoch%5==0:
             eval_test(model1)

Prediction and Inference

In [None]:
from epftoolbox.evaluation import MASE, sMAPE, MAPE,MAE
def rmse(predictions, targets):
    return np.sqrt(((predictions - targets) ** 2).mean())

def smape(a, f):
    return 1/len(a) * np.sum(2 * np.abs(f-a) / (np.abs(a) + np.abs(f))*100)
def my_mode(np_array, return_counts=False):
  uniques, counts = np.unique(np_array, return_counts=True)
  # print(uniques, counts)
  
  cnts = [0,0,0,0]
  for i in range(uniques.shape[0]):
    cnts[i] = counts[i]
  
  counts = np.array(cnts)
  mode_indx = np.argmax(counts)


  if not return_counts:
    return uniques[mode_indx]
  return uniques[mode_indx], cnts


def normalize_z(x, avg, std_dev, smooth = 0.0001):
  return (x-avg) / (std_dev + smooth)

def eval_test(model1):
 from sklearn.metrics import classification_report
 INPUT_DIR = "/test"
 NORMALIZE_DIR = "/content/drive/MyDrive/DLNN-ProjC1-main/output/new_data_sampling"



# some constants

 X_NAMES = ["accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z"]
 TIME = ["TIME"]
 CLASS = ["CLASS"]
 SEQUENCE_SIZE = 160
 POST_PROCESS_WINDOW = 10
#  model,_=get_model(adam=True)
 mean = np.load(os.path.join(NORMALIZE_DIR, "x_mean.npy"))
 std = np.load(os.path.join(NORMALIZE_DIR, "x_stddev.npy"))
 # loading all models
 MODEL_DIR = "/content/drive/MyDrive/DLNN-ProjC1-main"
 models = (model1.to('cpu')).eval()
 outputs = {}

#  print("number of models: ", len(models))
 NUMBER_MODELS = 1 #len(models)

 for i in range(-1, NUMBER_MODELS):
  outputs[i] = {}  

#  print(models[0])
 # generating preds
# import pdb 
 try_in=torch.zeros(0)
 phase_out=torch.zeros(0)
 z_shape_out=torch.zeros(0)
 try_out=torch.zeros(0)
 for model_number in range(1):
  for test_input_file in sorted(os.listdir(INPUT_DIR)):
    prefix = test_input_file[:15]

    axp=pd.read_csv(os.path.join(INPUT_DIR, test_input_file))
    tt=axp[axp['0']==3]
    print('actual shape:',axp.shape)
    print('filtering the labels:',tt.shape)
    test_input = np.array(tt)


    test_rows = test_input.shape[0]

    test_input_x = test_input[:, :6]
    label= test_input[:, 6]
    test_output_x = test_input[:, 7]

    outputs[-1][test_input_file] = label[::4]

    test_input_x = normalize_z(test_input_x, mean, std)
    

    done_index = 0
    test_input_batch = []
    out_batch=[]
    while done_index + SEQUENCE_SIZE < test_rows:
      test_input_batch.append(test_input_x[done_index: done_index + SEQUENCE_SIZE])
      out_batch.append(test_output_x[done_index: done_index + SEQUENCE_SIZE])
      done_index += SEQUENCE_SIZE
    # pdb.set_trace()
    test_input_batch.append(test_input_x[-SEQUENCE_SIZE:])
    out_batch.append(test_output_x[-SEQUENCE_SIZE:])
    test_input_batch = np.array(test_input_batch)  
    out_batch = np.array(out_batch) 

    ag=out_batch
    num_bins=8
    bb3=np.zeros((ag.shape[0],ag.shape[1],num_bins))
    for j in range(ag.shape[0]):
     for i in range(ag.shape[1]):
       bb3[j,i,:]=guass_fit(encode_angle(ag[j,i],num_bins))

    test_input_batch= np.concatenate((test_input_batch,bb3),axis=2)
    test_input_batch = np.transpose(test_input_batch,(0,2,1))
    test_input_batch1=test_input_batch[:,:,:80]


    with torch.no_grad():
        # pdb.set_trace()
        recon1,phase1,_ = models((torch.Tensor(test_input_batch1).float())) 
        shape1=models.down_samp((torch.Tensor(test_input_batch1).float()))
        try_out=torch.cat([try_out,recon1])
        try_in=torch.cat([try_in,torch.from_numpy(test_input_batch).float()])
        phase_out=torch.cat([phase_out,phase1])
        z_shape_out=torch.cat([z_shape_out,shape1])

 print('RMSE total:',rmse(try_in[:,:,:], try_out[:,:,:]))
 print('RMSE Reconstruction:',rmse(try_in[:,:,:80], try_out[:,:,:80]))
 print('RMSE Forecast:',rmse(try_in[:,:,80:], try_out[:,:,80:]))
 print('sMAPE:',sMAPE(try_in[:,:,80:].numpy(), try_out[:,:,80:].numpy()))
 print('MAE:',MAE(try_in[:,:,80:].numpy(), try_out[:,:,80:].numpy()))
 print('1s:',rmse(try_in[:,:,:120], try_out[:,:,:120]))
 print('2s:',rmse(try_in[:,:,:], try_out[:,:,:]))
 
 return try_in, try_out,phase_out,z_shape_out
device = torch.device('cuda:' + str(0) if torch.cuda.is_available() else 'cpu')
model1=class_model.to(device)
try_in, try_out,phase_out,z_shape_out=eval_test(model1)

Mapping the embedding space to 3D

In [None]:
aa=z_shape_out.reshape(-1,40*64) # (batch, 64,40) encoder
from sklearn.decomposition import PCA
pca = PCA(n_components=3)
y=pca.fit_transform(aa)

from mpl_toolkits import mplot3d

fig = plt.figure(figsize=(10,5))
ax = plt.axes(projection='3d')
wee=np.zeros(len(y))
ax.scatter3D(y[:,0],y[:,1], y[:,2])

Cleaning some the tail in the plot

In [None]:
import sklearn
# from sklearn.datasets import load_boston
import pandas as pd

xxx=pd.DataFrame({'x':y[:,0],'y':y[:,1],'z':y[:,2]})


''' Detection '''
# IQR
Q1 = np.percentile(xxx['x'], 15,
				interpolation = 'midpoint')

Q3 = np.percentile(xxx['x'], 85,
				interpolation = 'midpoint')# 35
IQR = Q3 - Q1

print("Old Shape: ", xxx.shape)

# Upper bound
upper = np.where(xxx['x'] >= (Q3+1.5*IQR))
# Lower bound
lower = np.where(xxx['x'] <= (Q1-1.5*IQR))

''' Removing the Outliers '''
xxx.drop(upper[0], inplace = True)
xxx.drop(lower[0], inplace = True)

print("New Shape: ", xxx.shape)



xn=xxx['x'].values
yn=xxx['y'].values
zn=xxx['z'].values
wwr=np.array([xn,yn,zn]).T
fig = plt.figure(figsize=(10,5))
ax = plt.axes(projection='3d')
wee=np.zeros(len(y))
ax.plot3D(wwr[:,0],wwr[:,1], wwr[:,2],'o')

Fitting a circle to the circular shape area

In [None]:
from numpy import *
from matplotlib.pyplot import *
from mpl_toolkits.mplot3d.axes3d import Axes3D

%matplotlib inline
#-------------------------------------------------------------------------------
# Generate points on circle
# P(t) = r*cos(t)*u + r*sin(t)*(n x u) + C
#-------------------------------------------------------------------------------
def generate_circle_by_vectors(t, C, r, n, u):
    n = n/linalg.norm(n)
    u = u/linalg.norm(u)
    P_circle = r*cos(t)[:,newaxis]*u + r*sin(t)[:,newaxis]*cross(n,u) + C
    return P_circle

def generate_circle_by_angles(t, C, r, theta, phi):
    # Orthonormal vectors n, u, <n,u>=0
    n = array([cos(phi)*sin(theta), sin(phi)*sin(theta), cos(theta)])
    u = array([-sin(phi), cos(phi), 0])
    
    # P(t) = r*cos(t)*u + r*sin(t)*(n x u) + C
    P_circle = r*cos(t)[:,newaxis]*u + r*sin(t)[:,newaxis]*cross(n,u) + C
    return P_circle

#-------------------------------------------------------------------------------
# Generating circle
#-------------------------------------------------------------------------------
r = 2.5               # Radius
C = np.mean(y,axis=0)#array([3,3,4])    # Center
theta = 45/180*pi     # Azimuth
phi   = -30/180*pi    # Zenith

t = linspace(-2*pi, 2*pi, 100)
P_gen = generate_circle_by_angles(t, C, r, theta, phi)

#-------------------------------------------------------------------------------
# Cluster of points
#-------------------------------------------------------------------------------
t = linspace(-pi, -0.25*pi, 100)
n = len(t)
P = generate_circle_by_angles(t, C, r, theta, phi)

# Add some random noise to the points
P += random.normal(size=P.shape) * 0.1
P=wwr# np.array([wee, y[:,1],y[:,2]]).T
#-------------------------------------------------------------------------------
# Plot
#-------------------------------------------------------------------------------
f, ax = subplots(1, 3, figsize=(15,5))
alpha_pts = 0.5
i = 0
ax[i].plot(P_gen[:,0], P_gen[:,1], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,0], P[:,1], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View X-Y')
ax[i].set_xlabel('x'); ax[i].set_ylabel('y');
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1)
ax[i].grid()
i = 1
ax[i].plot(P_gen[:,0], P_gen[:,2], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,0], P[:,2], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View X-Z')
ax[i].set_xlabel('x'); ax[i].set_ylabel('z'); 
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1)
ax[i].grid()
i = 2
ax[i].plot(P_gen[:,1], P_gen[:,2], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,1], P[:,2], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View Y-Z')
ax[i].set_xlabel('y'); ax[i].set_ylabel('z'); 
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1)
ax[i].legend()
ax[i].grid()

#-------------------------------------------------------------------------------
# FIT CIRCLE 2D
# - Find center [xc, yc] and radius r of circle fitting to set of 2D points
# - Optionally specify weights for points
#
# - Implicit circle function:
#   (x-xc)^2 + (y-yc)^2 = r^2
#   (2*xc)*x + (2*yc)*y + (r^2-xc^2-yc^2) = x^2+y^2
#   c[0]*x + c[1]*y + c[2] = x^2+y^2
#
# - Solution by method of least squares:
#   A*c = b, c' = argmin(||A*c - b||^2)
#   A = [x y 1], b = [x^2+y^2]
#-------------------------------------------------------------------------------
def fit_circle_2d(x, y, w=[]):
    
    A = array([x, y, ones(len(x))]).T
    b = x**2 + y**2
    
    # Modify A,b for weighted least squares
    if len(w) == len(x):
        W = diag(w)
        A = dot(W,A)
        b = dot(W,b)
    
    # Solve by method of least squares
    c = linalg.lstsq(A,b,rcond=None)[0]
    
    # Get circle parameters from solution c
    xc = c[0]/2
    yc = c[1]/2
    r = sqrt(c[2] + xc**2 + yc**2)
    return xc, yc, r


#-------------------------------------------------------------------------------
# RODRIGUES ROTATION
# - Rotate given points based on a starting and ending vector
# - Axis k and angle of rotation theta given by vectors n0,n1
#   P_rot = P*cos(theta) + (k x P)*sin(theta) + k*<k,P>*(1-cos(theta))
#-------------------------------------------------------------------------------
def rodrigues_rot(P, n0, n1):
    
    # If P is only 1d array (coords of single point), fix it to be matrix
    if P.ndim == 1:
        P = P[newaxis,:]
    
    # Get vector of rotation k and angle theta
    n0 = n0/linalg.norm(n0)
    n1 = n1/linalg.norm(n1)
    k = cross(n0,n1)
    k = k/linalg.norm(k)
    theta = arccos(dot(n0,n1))
    
    # Compute rotated points
    P_rot = zeros((len(P),3))
    for i in range(len(P)):
        P_rot[i] = P[i]*cos(theta) + cross(k,P[i])*sin(theta) + k*dot(k,P[i])*(1-cos(theta))

    return P_rot


#-------------------------------------------------------------------------------
# ANGLE BETWEEN
# - Get angle between vectors u,v with sign based on plane with unit normal n
#-------------------------------------------------------------------------------
def angle_between(u, v, n=None):
    if n is None:
        return arctan2(linalg.norm(cross(u,v)), dot(u,v))
    else:
        return arctan2(dot(n,cross(u,v)), dot(u,v))

    
#-------------------------------------------------------------------------------
# - Make axes of 3D plot to have equal scales
# - This is a workaround to Matplotlib's set_aspect('equal') and axis('equal')
#   which were not working for 3D
#-------------------------------------------------------------------------------
def set_axes_equal_3d(ax):
    limits = array([ax.get_xlim3d(), ax.get_ylim3d(), ax.get_zlim3d()])
    spans = abs(limits[:,0] - limits[:,1])
    centers = mean(limits, axis=1)
    radius = 0.5 * max(spans)
    ax.set_xlim3d([centers[0]-radius, centers[0]+radius])
    ax.set_ylim3d([centers[1]-radius, centers[1]+radius])
    ax.set_zlim3d([centers[2]-radius, centers[2]+radius])
%matplotlib inline

#-------------------------------------------------------------------------------
# Init figures
#-------------------------------------------------------------------------------
fig = figure(figsize=(15,11))
alpha_pts = 0.5
figshape = (2,3)
ax = [None]*4
ax[0] = subplot2grid(figshape, loc=(0,0), colspan=2)
ax[1] = subplot2grid(figshape, loc=(1,0))
ax[2] = subplot2grid(figshape, loc=(1,1))
ax[3] = subplot2grid(figshape, loc=(1,2))
i = 0
ax[i].set_title('Fitting circle in 2D coords projected onto fitting plane')
ax[i].set_xlabel('x'); ax[i].set_ylabel('y');
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1); ax[i].grid()
i = 1
ax[i].plot(P_gen[:,0], P_gen[:,1], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,0], P[:,1], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View X-Y')
ax[i].set_xlabel('x'); ax[i].set_ylabel('y');
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1); ax[i].grid()
i = 2
ax[i].plot(P_gen[:,0], P_gen[:,2], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,0], P[:,2], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View X-Z')
ax[i].set_xlabel('x'); ax[i].set_ylabel('z'); 
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1); ax[i].grid()
i = 3
ax[i].plot(P_gen[:,1], P_gen[:,2], 'y-', lw=3, label='Generating circle')
ax[i].scatter(P[:,1], P[:,2], alpha=alpha_pts, label='Cluster points P')
ax[i].set_title('View Y-Z')
ax[i].set_xlabel('y'); ax[i].set_ylabel('z'); 
ax[i].set_aspect('equal', 'datalim'); ax[i].margins(.1, .1); ax[i].grid()

#-------------------------------------------------------------------------------
# (1) Fitting plane by SVD for the mean-centered data
# Eq. of plane is <p,n> + d = 0, where p is a point on plane and n is normal vector
#-------------------------------------------------------------------------------
P_mean = P.mean(axis=0)
P_centered = P - P_mean
U,s,V = linalg.svd(P_centered)

# Normal vector of fitting plane is given by 3rd column in V
# Note linalg.svd returns V^T, so we need to select 3rd row from V^T
normal = V[2,:]
d = -dot(P_mean, normal)  # d = -<p,n>

#-------------------------------------------------------------------------------
# (2) Project points to coords X-Y in 2D plane
#-------------------------------------------------------------------------------
P_xy = rodrigues_rot(P_centered, normal, [0,0,1])

ax[0].scatter(P_xy[:,0], P_xy[:,1], alpha=alpha_pts, label='Projected points')

#-------------------------------------------------------------------------------
# (3) Fit circle in new 2D coords
#-------------------------------------------------------------------------------
xc, yc, r = fit_circle_2d(P_xy[:,0], P_xy[:,1])

#--- Generate circle points in 2D
t = linspace(0, 2*pi, 300)
xx = xc + r*cos(t)
yy = yc + r*sin(t)

ax[0].plot(xx, yy, 'k--', lw=2, label='Fitting circle')
ax[0].plot(xc, yc, 'k+', ms=10)
ax[0].legend()

#-------------------------------------------------------------------------------
# (4) Transform circle center back to 3D coords
#-------------------------------------------------------------------------------
C = rodrigues_rot(array([xc,yc,0]), [0,0,1], normal) + P_mean
C = C.flatten()

#--- Generate points for fitting circle
t = linspace(0, 2*pi, 100)
u = P[0] - C
P_fitcircle = generate_circle_by_vectors(t, C, r, normal, u)

ax[1].plot(P_fitcircle[:,0], P_fitcircle[:,1], 'k--', lw=2, label='Fitting circle')
ax[2].plot(P_fitcircle[:,0], P_fitcircle[:,2], 'k--', lw=2, label='Fitting circle')
ax[3].plot(P_fitcircle[:,1], P_fitcircle[:,2], 'k--', lw=2, label='Fitting circle')
ax[3].legend()

#--- Generate points for fitting arc
u = P[0] - C
v = P[-1] - C
theta = angle_between(u, v, normal)

t = linspace(0, theta, 100)
P_fitarc = generate_circle_by_vectors(t, C, r, normal, u)

ax[1].plot(P_fitarc[:,0], P_fitarc[:,1], 'k-', lw=3, label='Fitting arc')
ax[2].plot(P_fitarc[:,0], P_fitarc[:,2], 'k-', lw=3, label='Fitting arc')
ax[3].plot(P_fitarc[:,1], P_fitarc[:,2], 'k-', lw=3, label='Fitting arc')
ax[1].plot(C[0], C[1], 'k+', ms=10)
ax[2].plot(C[0], C[2], 'k+', ms=10)
ax[3].plot(C[1], C[2], 'k+', ms=10)
ax[3].legend()

print('Fitting plane: n = %s' % array_str(normal, precision=4))
print('Fitting circle: center = %s, r = %.4g' % (array_str(C, precision=4), r))
print('Fitting arc: u = %s, θ = %.4g' % (array_str(u, precision=4), theta*180/pi))

%matplotlib

fig = figure(figsize=(15,15))
ax = fig.add_subplot(1,1,1,projection='3d')
ax.plot(*P_gen.T, color='y', lw=3, label='Generating circle')
ax.plot(*P.T, ls='', marker='o', alpha=0.5, label='Cluster points P')

#--- Plot fitting plane
xx, yy = meshgrid(linspace(0,6,11), linspace(0,6,11))
zz = (-normal[0]*xx - normal[1]*yy - d) / normal[2]
# ax.plot_surface(xx, yy, zz, rstride=2, cstride=2, color='y' ,alpha=0.2, shade=False)

#--- Plot fitting circle
ax.plot(*P_fitcircle.T, color='k', ls='--', lw=2, label='Fitting circle')
ax.plot(*P_fitarc.T, color='k', ls='-', lw=3, label='Fitting arc')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.legend()

# ax.set_aspect('equal', 'datalim')
set_axes_equal_3d(ax)

fig = plt.figure(figsize=(10,5))
ax = plt.axes(projection='3d')
wee=np.zeros(len(y))
ax.plot3D(y[:,0],y[:,1], y[:,2],'o')
ax.plot3D(P_fitcircle[:,0],P_fitcircle[:,1], P_fitcircle[:,2],'o')
plt.savefig('/content/drive/MyDrive/DLNN-ProjC1-main/image_phase_AE/comb_lam04',dpi=100)

Mapping back to embedding space and recontruct the signal

In [None]:
x_samp=P_fitcircle[::4]
sst=pca.inverse_transform(x_samp)
sst=sst.reshape(-1,64,40)
aaa=model1.decoder(torch.from_numpy(sst).type(torch.FloatTensor)).detach().numpy()
plt.figure(figsize=(10,8))
# plt.subplot(221)

# equivalent but more general
ax1 = plt.subplot(2, 2, 1,projection='3d')
ax1.view_init(30,45)

ax2 = plt.subplot(2,2,2)
ax1.plot3D(x_samp[:,0],x_samp[:,1], x_samp[:,2],'o')
# ax=fig.add_subplot(1, 2, 1, projection='3d')
for i in range(len(x_samp)):
#  ax1.scatter(x_samp[:,0],x_samp[:,1], x_samp[:,2],'o')
 ax1.scatter(x_samp[i,0],x_samp[i,1], x_samp[i,2],'o')
#  ax1.view_init(10,90)
# plt.figure()
 ax2 = plt.subplot(2,2,2)
 ax2.plot(aaa[i,3,:])
 
 plt.savefig('/content/drive/MyDrive/DLNN-ProjC1-main/Updates/images'+str(i),dpi=100)
 ax2.cla()

Peak detection function

In [None]:
from scipy import signal
def cal_def(peaks):
  v=[]
  for i in range(len(peaks)-1):
    v.append(peaks[i+1]-peaks[i])
  return np.array(v)

def pick_right_peaks(peaks):
  a= cal_def(peaks)
  mean=np.mean(a)
  print(mean)
  i=0
  val_peaks=[]
  while i<len(a):
    if a[i]<=.5*mean:
      print('rejection:',a[i])
      i+=1
    else:
      print('accepted:',a[i])
      val_peaks.append(peaks[i]) 
      print(a[i])
      i+=1
  if a[-1]>=.5*mean:
      val_peaks.append(peaks[-1])
  return val_peaks


for kr in range(len(total_fs1)):
 aa=np.load(total_fs1[kr],allow_pickle=True)['X'] # loading raw signals 
 dd1=aa.flatten()
 time=np.linspace(0,len(dd1),len(dd1))
 sos = signal.butter(1, 15, 'hp', fs=1000, output='sos')
 filtered = signal.sosfilt(sos, dd1)


 from scipy.signal import find_peaks,find_peaks_cwt
 import numpy as np
 pp=1000
 peak_time=[]
 for i in range(int(len(dd1)/1000)):

  m=i*1000
  dd=filtered[m:m+pp]
  tt= time[m:m+pp]
  peaks, _ = find_peaks(dd, height=.3 )# setting a threshold
  aaa=cal_def(peaks)
  bb=np.mean(cal_def(peaks))
  result1=[]

  for j in range(len(aaa)):
   if aaa[j]< .5*bb:
     if dd[peaks[j]]>dd[peaks[j+1]]:
        result1.append(j+1)
     else:
      #  print(i)
        result1.append(j)
   else:
    print('pass')

  result=result1

  new=np.delete(peaks, list(np.array(result)), None)
  if len(new)>4:
    peak_time.append(tt[new[0:4]])
  else:
    peak_time.append(tt[new])

#  peak_time.append(tt[new])

 from scipy.io import savemat
# mdic = {"times": peak_time}
# savemat("/content/peak_times.mat", mdic)

# peak_time
 flat_list = [item for sublist in peak_time for item in sublist]
 flat_times=flat_list
 imu_time=time

 phase= np.zeros(len(imu_time))
 for i in range(len(flat_times)-1):
  idx = np.where((imu_time>flat_times[i])&(imu_time<flat_times[i+1]))
  tp = 2*np.pi*(imu_time[idx]-flat_times[i])/(flat_times[i+1]-flat_times[i]);
  phase[idx]=tp

 np.savez_compressed('/content/drive/MyDrive/ECG/filtered_phase/'+total_fs1[kir][47:], X=filtered, phase=phase)