In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import models

In [None]:
# Print versions
!python --version
print('Numpy ' + np.__version__)
print('TensorFlow ' + tf.__version__)
print('Keras ' + tf.keras.__version__)

In [None]:
# Settings
models_path = 'models'  # Where we can find the model files (relative path location)
keras_model_name = 'fan_low_model'           # Will be given .h5 suffix
tflite_model_name = 'fan_low_model'          # Will be given .tflite suffix
c_model_name = 'fan_low_model'               # Will be given .h suffix


# raw_scale = 100             # Multiply raw values to fit into integers
# sensor_sample_rate = 200    # Hz
# desired_sample_rate = 50    # Hz
# sample_time = 0.64          # Time (sec) length of each sample
# samples_per_file = 128      # Expected number of measurements in each file (truncate to this)

# max_measurements = int(sample_time * sensor_sample_rate)
# downsample_factor = int(samples_per_file / desired_sample_rate)
# win_len = int(max_measurements / downsample_factor)

# keras_model_name = 'fan_low_model'           # Will be given .h5 suffix
# tflite_model_name = 'fan_low_model'          # Will be given .tflite suffix
# c_model_name = 'fan_low_model'               # Will be given .h suffix
# c_hann_name = 'hann_window'                  # Will be given .h suffix
# sample_file_name = 'normal_anomaly_samples'  # Will be given .npz suffix
# rep_dataset_name = 'normal_anomaly_test_set' # Will be given .npz suffix

# print('Max measurements per file:', max_measurements)
# print('Downsample factor:', downsample_factor)
# print('Window length:', win_len)

In [None]:
# Load model
model = models.load_model(keras_model_name + '.h5')

In [None]:
# Load test set
npzfile = np.load(rep_dataset_name + '.npz')
x_test = npzfile['x_test']

In [None]:
# Generator function that provides representative data samples for quantization
def representative_dataset_gen():
    for sample in x_test:
        sample = np.expand_dims(sample.astype(np.float32), axis=0)
        yield [sample]

In [None]:
# Convert Keras model to a tflite model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
#converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]
#converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Quantization settings
#converter.representative_dataset = representative_dataset_gen

# Convert and save
tflite_model = converter.convert()
open(tflite_model_name + '.tflite', 'wb').write(tflite_model)

In [None]:
# Function: Convert some values into an array for C programming
def create_c_lookup_table(array, var_type, var_name, line_limit=80):

    c_str = ''

    # Create header guard
    c_str += '#ifndef ' + var_name.upper() + '_H\n'
    c_str += '#define ' + var_name.upper() + '_H\n\n'

    # Add array length at top of file
    c_str += 'const unsigned int ' + var_name + '_len = ' + str(len(array)) + ';\n'

    # Declare C variable
    c_str += 'const ' + var_type + ' ' + var_name + '[] = {\n'
    
    # Create string for the array
    indent = '  '
    array_str = indent
    line_len = len(indent)
    val_sep = ', '
    for i, val in enumerate(array):

        # Create a new line if string is over line limit
        val_str = str(val)
        if line_len + len(val_str) + len(val_sep) > line_limit:
            array_str += '\n' + indent
            line_len = len(indent)

        # Add value and separator
        array_str += val_str
        line_len += len(val_str)
        if (i + 1) < len(array):
            array_str += val_sep
            line_len += len(val_sep)

    # Add closing brace
    c_str += array_str + '\n};\n\n'

    # Close out header guard
    c_str += '#endif //' + var_name.upper() + '_H'

    return c_str

In [None]:
# Write TFLite model to a C source (or header) file
with open(c_model_name + '.h', 'w') as file:
    hex_array = [format(val, '#04x') for val in tflite_model]
    file.write(create_c_lookup_table(hex_array, 'unsigned char', c_model_name))

In [None]:
# Create a hanning window lookup table
with open(c_hann_name + '.h', 'w') as file:
    file.write(create_c_lookup_table(np.hanning(win_len), 'float', c_hann_name))

In [None]:
# Load samples for testing
npzfile = np.load(sample_file_name + '.npz')
normal_sample = npzfile['normal_sample']
anomaly_sample = npzfile['anomaly_sample']

In [None]:
# Print out normal sample, truncated to 128 measurements
print("X")
for i in normal_sample[0:128, 0]:
    print(str(i), end=', ')
print()
print("Y")
for i in normal_sample[0:128, 1]:
    print(str(i), end=', ')
print()
print("Z")
for i in normal_sample[0:128, 2]:
    print(str(i), end=', ')

In [None]:
# Print out anomaly sample, truncated to 128 measurements
print("X")
for i in anomaly_sample[0:128, 0]:
    print(str(i), end=', ')
print()
print("Y")
for i in anomaly_sample[0:128, 1]:
    print(str(i), end=', ')
print()
print("Z")
for i in anomaly_sample[0:128, 2]:
    print(str(i), end=', ')

In [None]:
# Function: extract specified features (variances, MAD) from sample
def extract_features(sample, max_measurements=0, scale=1):
    
    features = []
    
    # Truncate sample
    if max_measurements == 0:
        max_measurements = sample.shape[0]
    sample = sample[0:max_measurements]
    
    # Scale sample
    sample = scale * sample
    
    # Compute a windowed FFT of each axis in the sample (leave off DC)
    sample = sample[::downsample_factor, :]  # Downsample
    sample = np.floor(sample)                # Round down to int
    hann_window = np.hanning(sample.shape[0])
    for i, axis in enumerate(sample.T):
        fft = abs(np.fft.rfft(axis * hann_window))
        features.append(fft[1:])  # Leave off DC
    
    return np.floor(np.array(features).flatten())

In [None]:
# Compute windowed FFT of each axis in normal sample
normal_fft = extract_features(normal_sample, max_measurements, raw_scale)
print(normal_fft)
plt.plot(normal_fft)

In [None]:
# Compute windowed FFT of each axis in anomaly sample
anomaly_fft = extract_features(anomaly_sample, max_measurements, raw_scale)
print(anomaly_fft)
plt.plot(anomaly_fft)

In [None]:
# Copy in Arduino FFTs for normal sample
arduino_fft_normal_x = [11, 1, 0, 2, 0, 0, 0, 1, 0, 0, 1, 1, 3, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 2, 1, 1, 0, 1, 1, 2, 3, 0]
arduino_fft_normal_y = [330, 3, 4, 13, 10, 1, 0, 1, 0, 0, 4, 24, 32, 8, 2, 3, 2, 1, 2, 0, 3, 1, 0, 2, 0, 1, 1, 1, 2, 5, 2, 2]
arduino_fft_normal_z = [2493, 30, 9, 5, 0, 4, 3, 4, 2, 3, 7, 29, 42, 19, 0, 0, 1, 1, 0, 0, 5, 6, 1, 2, 2, 1, 2, 3, 3, 0, 1, 4]
arduino_fft_anomaly_x = [54, 2, 1, 1, 0, 1, 0, 0, 1, 0, 1, 3, 3, 0, 0, 0, 1, 0, 0, 1, 1, 2, 1, 0, 0, 1, 2, 2, 1, 0, 4, 3]
arduino_fft_anomaly_y = [325, 1, 4, 1, 1, 3, 1, 1, 1, 1, 0, 7, 10, 4, 0, 0, 1, 0, 0, 0, 0, 4, 2, 0, 0, 0, 1, 1, 0, 1, 0, 0]
arduino_fft_anomaly_z = [2490, 22, 15, 3, 10, 3, 0, 3, 3, 4, 1, 17, 22, 6, 1, 0, 2, 2, 1, 1, 4, 5, 2, 1, 1, 3, 1, 1, 1, 0, 2, 1]

In [None]:
# Create feature set from Arduino FFTs
arduino_fft_normal = np.array([arduino_fft_normal_x, arduino_fft_normal_y, arduino_fft_normal_z]).flatten()
arduino_fft_anomaly = np.array([arduino_fft_anomaly_x, arduino_fft_anomaly_y, arduino_fft_anomaly_z]).flatten()
print(arduino_fft_normal.shape)
plt.plot(arduino_fft_normal)
plt.figure()
plt.plot(arduino_fft_anomaly)

In [None]:
# Test arduino FFT normal in full model NN
input_tensor = arduino_fft_normal.reshape(1, -1)
print(input_tensor)
predictions = model.predict(input_tensor)
print(predictions)
plt.plot(predictions[0])
mse = np.mean(np.power(input_tensor - predictions, 2), axis=1)
print(mse)

In [None]:
# Test arduino FFT anomaly in full model NN
input_tensor = arduino_fft_anomaly.reshape(1, -1)
print(input_tensor)
predictions = model.predict(input_tensor)
print(predictions)
plt.plot(predictions[0])
mse = np.mean(np.power(input_tensor - predictions, 2), axis=1)
print(np.power(input_tensor - predictions, 2))
print(mse)

In [None]:
# Arduino NN output for normal sample
arduino_feature_set = [11.00, 1.00, 0.00, 2.00, 0.00, 0.00, 0.00, 1.00, 0.00, 0.00, 1.00, 1.00, 3.00, 1.00, 0.00, 0.00, 1.00, 0.00, 1.00, 0.00, 1.00, 0.00, 0.00, 2.00, 1.00, 1.00, 0.00, 1.00, 1.00, 2.00, 3.00, 0.00, 330.00, 3.00, 4.00, 13.00, 10.00, 1.00, 0.00, 1.00, 0.00, 0.00, 4.00, 24.00, 32.00, 8.00, 2.00, 3.00, 2.00, 1.00, 2.00, 0.00, 3.00, 1.00, 0.00, 2.00, 0.00, 1.00, 1.00, 1.00, 2.00, 5.00, 2.00, 2.00, 2493.00, 30.00, 9.00, 5.00, 0.00, 4.00, 3.00, 4.00, 2.00, 3.00, 7.00, 29.00, 42.00, 19.00, 0.00, 0.00, 1.00, 1.00, 0.00, 0.00, 5.00, 6.00, 1.00, 2.00, 2.00, 1.00, 2.00, 3.00, 3.00, 0.00, 1.00, 4.00]
arduino_predictions = [-0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, -0.00, 1.00, 1.00, 0.00, 1.00, 1.00, 2.00, 3.00, 0.00, 330.00, 3.00, 4.00, 13.00, 10.00, 1.00, 0.00, 1.00, 0.00, 0.00, 4.00, 24.00, 32.00, 8.00, 2.00, 3.00, 2.00, 1.00, 2.00, 0.00, 3.00, 1.00, 0.00, 2.00, 0.00, 1.00, 1.00, 1.00, 2.00, 5.00, 2.00, 2.00, 2493.00, 30.00, 9.00, 5.00, 0.00, 4.00, 3.00, 4.00, 2.00, 3.00, 7.00, 29.00, 42.00, 19.00, 0.00, 0.00, 1.00, 1.00, 0.00, 0.00, 5.00, 6.00, 1.00, 2.00, 2.00, 1.00, 2.00, 3.00, 3.00, 0.00, 1.00, 4.00]
mse = np.mean(np.power(np.array(arduino_feature_set) - np.array(arduino_predictions), 2))
print('MSE:', mse)

plt.plot(arduino_feature_set)
plt.title('Input feature set')
plt.figure()
plt.plot(arduino_predictions)
plt.title('Predictions')

In [None]:
# Arduino NN output for anomaly sample
arduino_feature_set = [54.00, 2.00, 1.00, 1.00, 0.00, 1.00, 0.00, 0.00, 1.00, 0.00, 1.00, 3.00, 3.00, 0.00, 0.00, 0.00, 1.00, 0.00, 0.00, 1.00, 1.00, 2.00, 1.00, 0.00, 0.00, 1.00, 2.00, 2.00, 1.00, 0.00, 4.00, 3.00, 325.00, 1.00, 4.00, 1.00, 1.00, 3.00, 1.00, 1.00, 1.00, 1.00, 0.00, 7.00, 10.00, 4.00, 0.00, 0.00, 1.00, 0.00, 0.00, 0.00, 0.00, 4.00, 2.00, 0.00, 0.00, 0.00, 1.00, 1.00, 0.00, 1.00, 0.00, 0.00, 2490.00, 22.00, 15.00, 3.00, 10.00, 3.00, 0.00, 3.00, 3.00, 4.00, 1.00, 17.00, 22.00, 6.00, 1.00, 0.00, 2.00, 2.00, 1.00, 1.00, 4.00, 5.00, 2.00, 1.00, 1.00, 3.00, 1.00, 1.00, 1.00, 0.00, 2.00, 1.00] 
arduino_predictions = [0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.00, 2.00, 2.00, 1.00, 0.00, 4.00, 3.00, 325.00, 1.00, 4.00, 1.00, 1.00, 3.00, 1.00, 1.00, 1.00, 1.00, 0.00, 7.00, 10.00, 4.00, 0.00, 0.00, 1.00, 0.00, 0.00, 0.00, 0.00, 4.00, 2.00, 0.00, 0.00, 0.00, 1.00, 1.00, 0.00, 1.00, 0.00, 0.00, 2490.00, 22.00, 15.00, 3.00, 10.00, 3.00, 0.00, 3.00, 3.00, 4.00, 1.00, 17.00, 22.00, 6.00, 1.00, 0.00, 2.00, 2.00, 1.00, 1.00, 4.00, 5.00, 2.00, 1.00, 1.00, 3.00, 1.00, 1.00, 1.00, 0.00, 2.00, 1.00] 
mse = np.mean(np.power(np.array(arduino_feature_set) - np.array(arduino_predictions), 2))
print('MSE:', mse)

plt.plot(arduino_feature_set)
plt.title('Input feature set')
plt.figure()
plt.plot(arduino_predictions)
plt.title('Predictions')