In [1]:
import sklearn
import tensorflow as tf
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
from google.colab import drive # Google Colab
import random
import numpy as np
from scipy.spatial.transform import Rotation as R
import os
from copy import deepcopy
import pandas as pd
from math import ceil, floor
from functools import reduce


In [None]:
# Mount Google Drive in the Colab runtime.
# Replace the placeholder below with your mount point before running this cell.
linkDrive = "To Modify" # Default link '/content/drive'
if linkDrive == "To Modify":
  # The placeholder is still in place: stop with an explicit message instead of
  # calling drive.mount() with the placeholder string.
  raise ValueError("Modify link to your drive repository")
else:
  drive.mount(linkDrive)

In [None]:
# Folder of your drive that contains the data files.
# Replace the placeholder with your own path, e.g. '/content/drive/your/path/to/your/data'
folderPath = "To Modify"
folderPathIsPlaceholder = folderPath == "To Modify"
if folderPathIsPlaceholder:
  # Nothing below can work without a real path, so fail right away
  raise ValueError("Modify link to your data repository")

## Data Rotation

In [3]:
def checkDataSplitIsCorrect(splitPoints, lengthData, maxRotationDurationTs):
  """Tell whether every section delimited by `splitPoints` is long enough.

  splitPoints : ordered start indexes of the sections (must not be empty)
  lengthData : total number of rows of the data being split
  maxRotationDurationTs : maximum duration of a rotation (in Ts)

  Return :
  False as soon as two consecutive split points, or the last split point and
  the end of the data, are separated by maxRotationDurationTs + 1 or less.
  True otherwise.
  """
  smallestRejectedGap = maxRotationDurationTs + 1

  # Distance between each split point and the one that follows it
  hasShortSection = any(
    following - current <= smallestRejectedGap
    for current, following in zip(splitPoints[:-1], splitPoints[1:])
  )
  if hasShortSection:
    return False

  # Same rule for the section that runs until the end of the data
  tailGap = lengthData - 1 - splitPoints[-1]
  return not tailGap <= smallestRejectedGap


In [4]:
def createRotationInData (data, minRot = 10, maxRot = 11, minRotationDurationTs = 3, maxRotationDurationTs = 10) :
  """Cut `data` into random consecutive sections and rotate each of them progressively.

  Every section starts from the rotation accumulated by the previous sections
  and adds a random number of new random rotations, one per time step, on its
  first rows. The remaining rows of the section keep the last cumulative rotation.

  Parameters :
  data : raw data extracted from a txt file; rows are time steps, column 0 is
         kept untouched (Ts) and the other columns are rotated (expected x, y, z)
  minRot : minimum number of sections (rotation periods)
  maxRot : maximum number of sections (rotation periods)
  minRotationDurationTs : minimum duration of the rotations (in Ts)
  maxRotationDurationTs : maximum duration of the rotations (in Ts)

  Return :
  dicRotation : dict {first row index of a section : list of the cumulative
                rotations applied to the first rows of that section, the last
                entry being the rotation kept until the end of the section}
  dataRotate : object array with one entry per section, each entry being the
               list of rotated rows [Ts, rotated coordinates...]
  splitPoints : sorted first row index of every section (always starts with 0)

  Raises :
  ValueError : if `data` has too few rows for checkDataSplitIsCorrect to ever
               accept a split (the search below would otherwise never end)
  """
  dicRotation = {} # Stack rotation
  dataRotate = [] # Stack data rotate
  lengthData = data.shape[0] # Len data
  bufferRotation = R.from_rotvec(np.array([0, 0, 0]), degrees=True) # Rotation from the initial position (identity)

  # checkDataSplitIsCorrect needs every section, the last one included, to span
  # at least maxRotationDurationTs + 2 rows past its start. Even with the
  # smallest number of sections this requires the length computed below.
  minLengthData = minRot * (maxRotationDurationTs + 2) + 1
  if lengthData < minLengthData:
    raise ValueError(
      "Data too short to be split: %d rows found, at least %d rows required"
      % (lengthData, minLengthData)
    )

  # Draw random splits until one of them is accepted
  while True:
    rotationCounter = random.randint(minRot, maxRot) # Number of rotation period n
    splitPoints = np.random.choice(lengthData - 2, rotationCounter - 1, replace=False) + 1 # Random choice n-1 split point
    splitPoints.sort() # Sort splitPoints
    splitSections = np.split(data, splitPoints) # Split data in n sections
    splitPoints = np.insert(splitPoints, 0, 0) # Add 0 in splitPoints (start of the first section)
    if checkDataSplitIsCorrect(splitPoints, lengthData, maxRotationDurationTs):
      break

  for splitSection, splitPoint in zip(splitSections, splitPoints):
    cumulatingRotation = [] # Buffer to stock cumulative rotation
    firstElements = [] # Buffer to stock the rows already rotated

    # Compute rotation
    rotationTimeStampDuration = random.randint(minRotationDurationTs, maxRotationDurationTs) # Random number nR of rotation duration
    rots = R.random(rotationTimeStampDuration) # Calculate nR random rotations
    rotSplitSection = bufferRotation.apply(splitSection[:,1:]) # Apply the rotation inherited from the previous sections
    for rot in rots: # Apply nR rotations in steps
      # Example of rotation applied for nR = 3 : [0, r1, r2*r1, r3*r2*r1, r3*r2*r1, ...]
      firstElements.append(rotSplitSection[0]) # The first remaining row is final: save it
      cumulatingRotation.append(bufferRotation) # Save cumulative rotation for this Ts
      rotSplitSection = rot.apply(rotSplitSection[1:,:]) # Rotate the rows that follow
      bufferRotation = rot*bufferRotation # Save total rotation (the order is important)

    cumulatingRotation.append(bufferRotation) # Rotation of all the remaining rows of the section
    rotSplitSection = np.concatenate((firstElements, rotSplitSection)) # Put back the rows removed during the rotation
    # Kept as a list of 1D arrays (one per row): this is the structure the final object array is built from
    rotSplitSectionWithTs = [np.concatenate((e1, e2)) for e1, e2 in zip(np.reshape(splitSection[:,0],(-1,1)), rotSplitSection)] # Add Ts in rotate Coord
    dicRotation[splitPoint] = cumulatingRotation
    dataRotate.append(rotSplitSectionWithTs)
  return dicRotation, np.array(dataRotate,dtype=object), splitPoints

In [5]:
def createDataFrameFromFile(data, filename, dicRotation, dataRotate):
  """Gather raw data, rotated data and rotations of one file in a dataframe.

  data : raw data extracted from the txt file (column 0 unused here, columns 1 to 3 are the targets)
  filename : name of the file, the number between "_" and "." is used as file id
  dicRotation : dict {start index of a section : list of the rotations of its first rows}
  dataRotate : sections of rotated rows, each row being [Ts, X, Y, Z]

  Return :
  pandas dataframe with one row per time step and the columns
  File, Ts, X, Y, Z, Rotation, XTarget, YTarget, ZTarget
  """
  lengthData = data.shape[0]
  fileNumber = int(filename.split("_")[1].split(".")[0])

  # All rotated rows, section after section
  rotatedTracks = [track for section in dataRotate for track in section]

  # One rotation per row: the rotations stored for a section, then the last
  # one repeated until the next section (or the end of the data) starts
  sectionStarts = list(dicRotation.keys())
  sectionEnds = sectionStarts[1:] + [lengthData]
  rotationPerRow = []
  for sectionStart, sectionEnd in zip(sectionStarts, sectionEnds):
    rotationPerRow.extend(dicRotation[sectionStart])
    missingRows = sectionEnd - len(rotationPerRow)
    if missingRows > 0:
      rotationPerRow.extend([rotationPerRow[-1]] * missingRows)

  columns = {
    "File" : fileNumber,
    "Ts" : [track[0] for track in rotatedTracks],
    "X" : [track[1] for track in rotatedTracks],
    "Y" : [track[2] for track in rotatedTracks],
    "Z" : [track[3] for track in rotatedTracks],
    "Rotation" : rotationPerRow,
    "XTarget" : [row[1] for row in data],
    "YTarget" : [row[2] for row in data],
    "ZTarget" : [row[3] for row in data],
  }
  return pd.DataFrame(data = columns)

In [None]:
# Get all data from all files
# One dataframe is built per file, then everything is concatenated once
# (DataFrame.append no longer exists in pandas 2 and was quadratic in a loop).
fileFrames = []
for path, dirs, files in os.walk(folderPath):
  dirs.sort() # Walk sub-folders in a reproducible order
  for filename in sorted(files): # Reproducible file order: the train/test split depends on it
    print("File in process :",filename)
    # Join with the folder currently walked: works without trailing separator and in sub-folders
    data = np.loadtxt(os.path.join(path, filename)) # Get data from file
    dicRotation, dataRotate, splitPoints = createRotationInData(data) # Rotate the data
    dfTemp = createDataFrameFromFile(data, filename, dicRotation, dataRotate) # Create dataframe from raw data and rotate data
    fileFrames.append(dfTemp)

if not fileFrames:
  raise FileNotFoundError("No data file found in " + folderPath)

df = pd.concat(fileFrames, ignore_index=True) # Rows numbered 0..N-1 over all files
dfTemp = None # Free variable
fileFrames = None # Free variable
# df.to_csv("dataTracking.csv")  # Uncomment to get data in CSV

In [7]:
def checkRotation(dicRotation, dataRotate, data, indexSection = 0, limitPrint = 200, splitPoints = splitPoints) :
  """Print raw rows next to the rotated rows brought back with the inverse rotation.

  Debug helper, only useful to check that the rotation script behaves as expected.

  dicRotation : dict {start index of a section : list of rotations}
  dataRotate : sections of rotated rows [Ts, X, Y, Z]
  data : raw data the sections come from
  indexSection : position of the section to check
  limitPrint : maximum number of rows printed
  splitPoints : start index of every section; the default value is the
                module-level `splitPoints` as it was when this function was defined
  """
  sectionStart = splitPoints[indexSection]
  sectionRotations = dicRotation[sectionStart]
  sectionRows = dataRotate[indexSection]
  print("Nombre rotation = ", len(sectionRotations))
  print("Nombre data a rotate = ", len(sectionRows))

  # One rotation per row: repeat the last rotation for the rows without their own
  rotationTab = deepcopy(sectionRotations)
  missingRotations = len(sectionRows) - len(rotationTab)
  if missingRotations > 0:
    rotationTab.extend([sectionRotations[-1]] * missingRotations)

  shownPairs = zip(sectionRows[0:limitPrint], rotationTab[0:limitPrint])
  for offset, (rotatedRow, rotation) in enumerate(shownPairs):
    # Undo the rotation; coordinates are truncated to integers for display
    restoredCoord = rotation.inv().apply(rotatedRow[1:])
    print(data[sectionStart + offset], "------>", [float(int(e)) for e in restoredCoord])

# checkRotation(dicRotation, dataRotate, data, indexSection = 2)  # Uncomment this call to check the rotation script

## Analysis & Preprocessing

In [8]:
def preprocessData(df):
  """Normalise the coordinates and group them in "input" / "output" columns.

  The dataframe received is modified in place (pass a copy to keep the original).

  df : dataframe with the columns created by createDataFrameFromFile
       (X, Y, Z, XTarget, YTarget, ZTarget and Rotation are required)

  Return :
  df : the same dataframe where X, Y, Z are replaced by the column "input"
       ([x, y, z] normalised), XTarget, YTarget, ZTarget by the column "output"
       and the column Rotation is removed
  normalizeInfos : one-row dataframe (maxAbsX, maxAbsY, maxAbsz) with the
                   divisor used for each axis, needed to recover original data
  """
  # Divisor of each axis: largest absolute bound (rounded away from zero) over
  # the rotated values and the target values, so both share the same scale
  scales = {}
  for axis in ("X", "Y", "Z"):
    target = axis + "Target"
    lowest = min(floor(df[axis].min()), floor(df[target].min()))
    highest = max(ceil(df[axis].max()), ceil(df[target].max()))
    # An axis that is 0 everywhere would give a divisor of 0 (NaN after the
    # division): divide by 1 instead, the values stay 0
    scales[axis] = max(abs(lowest), highest) or 1

  # Normalize
  for axis, scale in scales.items():
    df[axis] = df[axis] / scale
    df[axis + "Target"] = df[axis + "Target"] / scale
  normalizeInfos = pd.DataFrame(data = {"maxAbsX" : [scales["X"]], "maxAbsY" : [scales["Y"]], "maxAbsz" : [scales["Z"]]})

  # Group x,y,z coordinates in array
  df["input"] = [[x, y, z] for x, y, z in zip(df["X"], df["Y"], df["Z"])]
  df["output"] = [[x, y, z] for x, y, z in zip(df["XTarget"], df["YTarget"], df["ZTarget"])]
  df.drop(columns=["XTarget", "YTarget", "ZTarget", "X", "Y", "Z", "Rotation"], inplace=True)
  return df, normalizeInfos

# Checkpoint copy: preprocessData modifies the dataframe it receives, so it is
# given a deep copy and `df` keeps the raw values (the cell can be run again).
# Comment the copy out to save memory.
# `dfTest` is only a temporary name here, it is reassigned by the train/test split below.
dfTest = deepcopy(df)
dfNorm, normalizeInfos = preprocessData(dfTest)

In [9]:
# Split data in test-train
# The last two files (in order of appearance in dfNorm) are kept for the test set
fileIds = dfNorm["File"].unique()
fileTrain, fileTest = fileIds[:-2], fileIds[-2:]
isTestRow = (dfNorm["File"] == fileTest[0]) | (dfNorm["File"] == fileTest[1])
dfTest = dfNorm[isTestRow]
dfTrain = dfNorm[~isTestRow]

In [10]:
# Sequence object to avoid storing everything in cache
class Sequence(tf.keras.utils.Sequence):
    """Serve a dataframe as batches of one window of consecutive rows.

    Batch number i covers the rows [i*sequenceLength, (i+1)*sequenceLength) of
    the dataframe. A window whose rows come from more than one file is
    replaced by the next window made of a single file.

    df : dataframe with the columns "input", "output" ([x, y, z] lists) and "File"
    sequenceLength : number of consecutive rows in a window
    """

    def __init__(self, df, sequenceLength):
        # Initialization
        super().__init__()
        self.sequenceLength = sequenceLength
        self.x = np.array(df["input"].tolist())
        self.y = np.array(df["output"].tolist())
        # File of each row of THIS dataframe: row positions are only meaningful
        # inside the dataframe the sequence was built from
        self.files = df["File"].to_numpy()
        self.datalen = len(self.y)
        self.indexes = np.arange(self.datalen)

    def __getitem__(self, index):
        # Return the window `index` as (x, y), both of shape (1, sequenceLength, 3)
        batchCount = len(self)
        if not 0 <= index < batchCount:
            raise IndexError("batch index out of range")
        # Look for the first single-file window starting from `index`, going
        # back to the first window after the last one so the search always ends
        for offset in range(batchCount):
            candidate = (index + offset) % batchCount
            batch_indexes = self.indexes[candidate*self.sequenceLength:(candidate+1)*self.sequenceLength]
            batchFiles = self.files[batch_indexes]
            if (batchFiles == batchFiles[0]).all(): # Check all indexes are from the same file
                x_batch = np.reshape(self.x[batch_indexes], (1, self.sequenceLength, 3))
                y_batch = np.reshape(self.y[batch_indexes], (1, self.sequenceLength, 3))
                return x_batch, y_batch
        raise ValueError("No window of %d consecutive rows coming from a single file" % self.sequenceLength)

    def __len__(self):
        # Denotes the number of batches per epoch (an incomplete last window is ignored)
        return self.datalen // self.sequenceLength

    def on_epoch_end(self):
        # Resets indexes after each epoch (rows are kept in order, no shuffling)
        self.indexes = np.arange(self.datalen)

# Build the train and test sequences (windows of sequenceLength rows) used for training
sequenceLength = 20
train, test = (
  Sequence(df = split, sequenceLength = sequenceLength)
  for split in (dfTrain, dfTest)
)

## Model Creation, Training and Prediction


In [16]:
class AutoencoderCNN1D(Model):
  """1D convolutional autoencoder working on windows of shape (sequenceLength, 3).

  latent_dim : size of the latent vector produced by the encoder
  n_filters_latent : base number of filters of the convolutions
  filter_size : kernel size of the convolutions
  sequenceLength : number of time steps in a window
  stridesTab : strides of the two encoder convolutions

  NOTE: the number of filters also depends on `depth`, which is not a
  parameter: it is read from the module scope when the model is created.
  NOTE: the two upsampling layers of the decoder use a fixed stride of 2,
  whatever the content of stridesTab.
  """

  def __init__(self, latent_dim, n_filters_latent, filter_size, sequenceLength, stridesTab):
    super().__init__()
    self.n_filters_latent = n_filters_latent
    self.filter_size = filter_size

    # Filters of the first / second encoder convolution (mirrored in the decoder)
    narrowFilters = n_filters_latent * 2 ** (depth-2)
    wideFilters = n_filters_latent * 2 ** (depth-1)
    # Length of the window once divided by all the encoder strides
    totalStride = int(reduce(lambda x,y: x*y, stridesTab))
    lengthSeqBeforeLatent = int(sequenceLength / totalStride)

    encoderLayers = [
      layers.Input(shape=(sequenceLength, 3)),
      layers.Conv1D(filters = narrowFilters, kernel_size = filter_size, padding="same", strides=stridesTab[0], activation="relu"),
      layers.Conv1D(filters = wideFilters, kernel_size = filter_size, padding="same", strides=stridesTab[1], activation="relu"),
      layers.Flatten(),
      layers.Dense(latent_dim),
    ]
    decoderLayers = [
      layers.InputLayer(input_shape=(latent_dim,)),
      layers.Dense(units=lengthSeqBeforeLatent*wideFilters, activation="relu"),
      layers.Reshape(target_shape=(lengthSeqBeforeLatent, wideFilters)),
      layers.Conv1DTranspose(filters = wideFilters, kernel_size = filter_size, padding="same", strides=2, activation="relu"),
      layers.Conv1DTranspose(filters = narrowFilters, kernel_size = filter_size, padding="same", strides=2, activation="relu"),
      # Back to 3 channels; the sigmoid keeps every output between 0 and 1
      layers.Conv1DTranspose(filters = 3, kernel_size = 1, padding="same", activation="sigmoid"),
    ]
    self.encoder = tf.keras.Sequential(encoderLayers)
    self.decoder = tf.keras.Sequential(decoderLayers)

  def call(self, x):
    # Encode the window to the latent vector, then rebuild a window from it
    return self.decoder(self.encoder(x))

# Hyperparameters of the autoencoder
latent_dim, n_filters_latent, filter_size = 8, 16, 3
stridesTab = [2,2]
depth = 2 # Not passed to the model: AutoencoderCNN1D reads it from the module scope
# Model Creation
autoencoder = AutoencoderCNN1D(
  latent_dim = latent_dim,
  n_filters_latent = n_filters_latent,
  filter_size = filter_size,
  sequenceLength = sequenceLength,
  stridesTab = stridesTab,
)

In [None]:
# Compilation
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError(), metrics = ["mean_squared_error"])

# Early Stopping to avoid overfitting: stop when val_loss has not improved
# by at least min_delta during `patience` epochs
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                    patience=5,
                                                    mode='min',
                                                    min_delta=0.0001)
# Training
# `train` and `test` are Sequence objects that already deliver batches, so no
# batch_size is given to fit (it must not be specified for this kind of input).
# Callbacks are passed as a list, as documented.
autoencoder.fit(train, validation_data = test, callbacks = [early_stopping], verbose=1, epochs = 10)

In [18]:
# Run the fitted autoencoder on every batch of the test sequence
prediction = autoencoder.predict(test)