<a href="https://colab.research.google.com/github/Kialakun/RUL-estimation-using-CVAE-and-UKF/blob/main/vae_ukf_predictive_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## MEASUREMENT GENERATOR
Generate our non-linear system measurements for simulating.

In [272]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

# DEFINE UTIL FUNCTIONS FOR DATA GENERATION

def generate(f):
  """Function for generating system measurements for simulation"""
  results = []
  x = np.arange(0, 784, 1, dtype=float)
  y = 0
  for t in x:
    y = f(t)
    results.append(y)
  return x, results

def add_gaussian_noise(y, m):
  """Function for adding random noise to measurement"""
  return y + np.random.normal(0, m, len(y))

def generate_datasets(f, batch_size):
  """Function for generating datasets"""
  datasets = []
  for i in range(batch_size):
    x, y = generate(f)
    y = add_gaussian_noise(y, 0.05)
    datasets.append(y)
  train = np.array(datasets).astype('float32')
  return train_test_split(train, train, test_size=0.2, random_state=42)


Define a function for displaying results

In [273]:
import plotly.express as px

def display_results(x, y, title, xaxis_title="x", yaxis_title="y"):
  """function for displaying results"""
  fig = px.scatter(x=x, y=y)
  fig.update_layout(title_text=title)
  fig.update_layout(xaxis_title=xaxis_title, yaxis_title=yaxis_title)
  fig.show()

Generate simulation data

In [274]:
# GENERATE SIMULATION DATA

# Point of Failure
pof = 500 # cycles

def phi(t):
 return 10 + max(0, (t-pof)*0.01)

def wear(t):
  return 0.001*t + max(0, (t-pof)*0.01)

def accelerometer(t):
  return wear(t)/phi(t)

# generated simulated accelerometer readings
x, accelerometer_readings = generate(accelerometer)
accelerometer_readings = add_gaussian_noise(accelerometer_readings, 0.05)

# generate simulated wear measurements
x, wear_measurements = generate(wear)
# wear_measurements = add_gaussian_noise(wear_measurements, 0.1)

# generate simulated parameter value
x, parameter_values = generate(phi)

# display the results
display_results(x, wear_measurements, "Simulated Wear Measurements", xaxis_title="Cycles", yaxis_title="Wear m")
display_results(x, accelerometer_readings, "Simulated Accelerometer Readings", xaxis_title="Cycles", yaxis_title="Acceleration m/s^2")

Generate training dataset to be used as our confitional monitoring data. This dataset will be used by the CVAE to learn operating profiles and generate synthetic accelerometer readings.

In [291]:
# Generate Training Dataset

# Generate datasets using the generate_datasets function
x_train, x_test, y_train, y_test = generate_datasets(accelerometer, 1000)

# Reshape the input data to have the shape (number of samples, width, height)
# In this case, it assumes each sample is a 28x28 image
x_train = np.reshape(x_train, (800, 28, 28))
x_test = np.reshape(x_test, (200, 28, 28))

# Concatenate the training and testing datasets along the first axis (axis=0)
# This creates a combined dataset for training purposes
train_dataset = np.concatenate([x_train, x_test], axis=0)

# We divide the training dataset by active range 0.4 in order to normalize.
# This would be the same as feature scaling
train_dataset = np.expand_dims(train_dataset, -1).astype("float32")/0.4


## VARIATIONAL AUTO-ENCODER
This section is where we define our VAE.

In [276]:
# import tensorflow and other dependencies
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

Define the prior latent space distribution and sampling layer

In [277]:

class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

Defining the encoder:

In [278]:
# ENCODER

# Define the dimensionality of the latent space
latent_dim = 2

# Define the input layer for the encoder, assuming grayscale images with shape (28, 28, 1)
encoder_inputs = keras.Input(shape=(28, 28, 1))

# Apply a convolutional layer with 32 filters, a 3x3 kernel, ReLU activation,
# and a stride of 2, preserving spatial dimensions with "same" padding
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)

# Apply another convolutional layer with 64 filters, a 3x3 kernel, ReLU activation,
# and a stride of 2, preserving spatial dimensions with "same" padding
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)

# Flatten the output to a 1D vector
x = layers.Flatten()(x)

# Apply a fully connected dense layer with 16 units and ReLU activation
x = layers.Dense(16, activation="relu")(x)

# Output the mean and log variance of the latent space
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)

# Use the Sampling layer to sample from the latent space based on the mean and log variance
z = Sampling()([z_mean, z_log_var])

# Create the encoder model with the specified inputs and outputs
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")

# Display a summary of the encoder model architecture
encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_13 (InputLayer)       [(None, 28, 28, 1)]          0         []                            
                                                                                                  
 conv2d_12 (Conv2D)          (None, 14, 14, 32)           320       ['input_13[0][0]']            
                                                                                                  
 conv2d_13 (Conv2D)          (None, 7, 7, 64)             18496     ['conv2d_12[0][0]']           
                                                                                                  
 flatten_6 (Flatten)         (None, 3136)                 0         ['conv2d_13[0][0]']           
                                                                                            

Now we define the decoder

In [371]:
# DECODER

# Define the input layer for the decoder, assuming a latent space of dimensionality latent_dim
latent_inputs = keras.Input(shape=(latent_dim,))

# Apply a fully connected dense layer with ReLU activation, mapping from latent space to 7*7*64 dimensions
x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)

# Reshape the output to a 3D tensor with shape (7, 7, 64)
x = layers.Reshape((7, 7, 64))(x)

# Apply a transposed convolutional layer (deconvolution) with 64 filters, a 3x3 kernel, ReLU activation,
# and a stride of 2, upsampling the spatial dimensions with "same" padding
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)

# Apply another transposed convolutional layer with 32 filters, a 3x3 kernel, ReLU activation,
# and a stride of 2, further upsampling the spatial dimensions with "same" padding
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)

# Output layer: Apply a transposed convolutional layer with 1 filter, a 3x3 kernel, sigmoid activation,
# and "same" padding to reconstruct the original input shape (assuming grayscale images)
decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)

# Create the decoder model with the specified inputs and outputs
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")

# Display a summary of the decoder model architecture
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_15 (InputLayer)       [(None, 2)]               0         
                                                                 
 dense_14 (Dense)            (None, 3136)              9408      
                                                                 
 reshape_7 (Reshape)         (None, 7, 7, 64)          0         
                                                                 
 conv2d_transpose_21 (Conv2  (None, 14, 14, 64)        36928     
 DTranspose)                                                     
                                                                 
 conv2d_transpose_22 (Conv2  (None, 28, 28, 32)        18464     
 DTranspose)                                                     
                                                                 
 conv2d_transpose_23 (Conv2  (None, 28, 28, 1)         289 

Create the Variation Auto-Encoder Class so we can instantiate objects and use the CVAE.

In [280]:
# VARIATIONAL AUTOENCODER CLASS

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = tf.reduce_mean(
                tf.reduce_sum(
                    keras.losses.binary_crossentropy(data, reconstruction), axis=(1, 2)
                )
            )
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }


In [None]:
# print dataset sample for visual inspection
for dataset in train_dataset[:10]:
  print(dataset)

In [282]:
# create CVAE instamce
vae = VAE(encoder, decoder)
# apply optimizer algorithm
vae.compile(optimizer=keras.optimizers.Adam())
# traing the CVAE
vae.fit(train_dataset, epochs=30, batch_size=10)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x787352126b30>

In [283]:
# GENERATIVE MODEL
#
# Generate synthetic data by sampling gaussian distribution
#
# samples
x = np.array([[1, 10]])
# pass the sammples into our decoder for generating sythetic data
synthetic_data = vae.decoder.predict(x)
# loop through datasets in sythetic_data and display graphs
for dataset in synthetic_data:
  # Apply Data Post Processing - flatten
  dataset = np.reshape(dataset, (784,))
  # display the graph
  display_results(np.arange(0, 784, 1), dataset, "Synthetic Dataset Operational Profile", xaxis="cycles", yaxis="Magnitude Factor")

# display the graph og the original dataset
display_results(np.arange(0, 784, 1), np.reshape(train_dataset[8], (784,)), "Training Dataset", xaxis="cycles", yaxis="Magnitude Factor")



## UNSCENTED KALMAN FILTER (UKF)
We will be using the filterpy python package for creating our UKF. Lets first install and import the classes and functions we will be using.

In [284]:
# The author would like to acknowledge the use of the FilterPy library for
# implementing Kalman filtering and other estimation techniques in this work
#
# Labbe, Roger. “Kalman and Bayesian Filters in Python”.
# github repo: https://github.com/rlabbe/Kalman-and-Bayesian-Filters-in-Python

!pip install filterpy

import math
from filterpy.kalman import UnscentedKalmanFilter, MerweScaledSigmaPoints
from filterpy.common import Q_discrete_white_noise



Define our sigma points function and sampling interval ("dt") from which our measurements will be taken.

In [302]:
t = 1.
dimx = 2
dimz = 1
sigma_points = MerweScaledSigmaPoints(2, alpha=.1, beta=2., kappa=-1)

Now let us define our system model (transfer function) and measurement transfer function.

In [340]:
def fx(x, dt):
    """State Transition function"""
    # Define a 2x2 state transition matrix F
    F = np.array([[1, dt],
                  [0, 1]], dtype=float)
    # Perform matrix multiplication to update the state
    return np.dot(F, x)

def hx(x, dt):
    """Measurement transfer function"""
    # Return the first element of the state vector as the measurement
    return np.array([x[0]])

def extrapolate(state, prediction, b):
    """Extrapolate the state based on the prediction and a cutoff index b"""
    results = []
    q = state[0]
    k = state[1]
    for i in range(len(prediction)):
        # Extrapolate using a linear model after the cutoff index b
        if i > b:
            q = q + k * prediction[i]
            results.append(q)
    return results


# Generate synthetic wear measurements and add Gaussian noise
z = add_gaussian_noise(wear_measurements, 0.1)

# Display the noisy wear measurements using a function named display_results
display_results(np.arange(0, 784, 1), z, "Wear Measurements")

# Set the clean wear measurements
clean = wear_measurements

# Reshape the first synthetic data sample and scale it for use as an operating
# profile, and denormalize the dataset by multiplying by active range 0.4
operating_profile = np.reshape(synthetic_data[0], (784,)) * 0.4

Initialize our UKF

In [350]:
ukf = UnscentedKalmanFilter(dim_x=dimx, dim_z=dimz, dt=t, fx=fx, hx=hx, points=sigma_points)
ukf.P *= 0.01
ukf.R = 0.1
ukf.Q = Q_discrete_white_noise(dim=dimx, dt=t, var=(.0001**2))
k = 1 # x standard deviation

Run Unscented Kalman Filter on generated operating profiles.

# 50 Cycles
We shall generate data run 50 cycles with the UKF parameter estimator, then extrapolate the rest until we reach the failure threshold.

In [360]:
import plotly.graph_objects as go


# Set the number of cycles
cycles = 50

# Initialize lists to store state and parameter values
state = []
parameter = []

# Initialize the Unscented Kalman Filter (UKF) state vector
ukf.x = np.array([z[0], 0.07])

# Iterate through the measurements
for i in range(len(z)):

    # Check if we have reached the specified number of cycles
    if i > cycles:
        # Once we reach the specified number of cycles, only append parameter values
        parameter.append(ukf.x[1])
        continue

    # Get the current measurement and accelerometer reading
    measurement = z[i]
    alpha = accelerometer_readings[i]

    # Prediction step of the UKF
    ukf.predict(dt=alpha * t)

    # Update step of the UKF
    ukf.update(measurement, dt=t)

    # Append the current state and parameter values
    state.append(ukf.x[0])
    parameter.append(ukf.x[1])

# Create a Plotly figure
fig = go.Figure()

# Plot the noisy measurements
fig.add_trace(go.Scatter(x=x_axis, y=z,
                         mode='markers',
                         marker=go.scatter.Marker(color="grey"),
                         name='Noisy Measurements'))

# Plot the actual (clean) values
fig.add_trace(go.Scatter(x=x_axis, y=clean,
                         mode='lines',
                         line=go.scatter.Line(width=4, color='orange'),
                         name='Actual'))

# Plot the UKF estimate along with extrapolation
fig.add_trace(go.Scatter(x=x_axis, y=state + extrapolate(ukf.x, operating_profile, cycles),
                         mode='lines',
                         line=go.scatter.Line(width=4, color='green'),
                         name='UKF Estimate'))

# Show the plot
fig.show()

##200 cycles
We shall generate data run 200 cycles with the UKF parameter estimator, then extrapolate the rest until we reach the failure threshold.

In [370]:
cycles = 200

state = []
parameter = []

ukf.x = np.array([z[0], 0.07])

for i in range(len(z)):
  if i > cycles:
    parameter.append(ukf.x[1])
    continue
  measurement = z[i]
  alpha = accelerometer_readings[i]
  ukf.predict(dt=alpha*t)
  ukf.update(measurement, dt=t)
  state.append(ukf.x[0])
  parameter.append(ukf.x[1])

# plot lines
x_axis = np.arange(0, 784, 1)
fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=z,
                    mode='markers',
                    marker=go.scatter.Marker(color="grey"),
                    name='Noisy Measurements'))
fig.add_trace(go.Scatter(x=x_axis, y=clean,
                    mode='lines',
                    line=go.scatter.Line(width=5, color='orange'),
                    name='Actual'))
fig.add_trace(go.Scatter(x=x_axis, y=state+extrapolate(ukf.x, operating_profile, cycles),
                    mode='lines',
                    line=go.scatter.Line(width=5, color='green'),
                    name='UKF Estimate'))

##600 Cycles
We shall generate data run 600 cycles with the UKF parameter estimator, then extrapolate the rest until we reach the failure threshold.

In [367]:
cycles = 600

state = []

ukf.x = np.array([z[0], 0.09])

for i in range(len(z)):
  if i > cycles:
    break
  measurement = z[i]
  alpha = accelerometer_readings[i]
  ukf.predict(dt=alpha*t)
  ukf.update(measurement, dt=t)
  state.append(ukf.x[0])

# plot lines
x_axis = np.arange(0, 784, 1)
fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=z,
                    mode='markers',
                    marker=go.scatter.Marker(color="grey"),
                    name='Noisy Measurements'))
fig.add_trace(go.Scatter(x=x_axis, y=clean,
                    mode='lines',
                    line=go.scatter.Line(width=5, color='orange'),
                    name='Actual'))
fig.add_trace(go.Scatter(x=x_axis, y=state+extrapolate(ukf.x, prediction, cycles),
                    mode='lines',
                    line=go.scatter.Line(width=5, color='green'),
                    name='UKF Estimate'))
fig.show()