# Predictive Maintenance

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy
import random

import tensorflow as tf
import sklearn
import itertools

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
!python --version
print("NumPy: " + np.__version__)
print("Keras: " + tf.keras.__version__)
print("TensorFlow: " + tf.__version__)
print("sklearn: " + sklearn.__version__)

# Hardwareunterstützung?
tf.config.list_physical_devices()

## Datensätze importieren

In [None]:
# Version 1: Manuelle Auswahl der Daten

#df = pd.read_csv('../data/fan_idle.csv')

df = pd.read_csv('../data/fan_normal_speed1.csv')
#df = pd.read_csv('../data/fan_normal_speed2.csv')
#df = pd.read_csv('../data/fan_normal_speed3.csv')

#df = pd.read_csv('../data/fan_unbalanced_speed1.csv')
#df = pd.read_csv('../data/fan_unbalanced_speed2.csv')
#df = pd.read_csv('../data/fan_unbalanced_speed3.csv')

#df = pd.read_csv('../data/fan_clogged_speed1.csv'
#df = pd.read_csv('../data/fan_clogged_speed2.csv')
#df = pd.read_csv('../data/fan_clogged_speed3.csv')

In [None]:
# Version 2: Verarbeitung aller Dateien im Verzeichnis 'data_selected'

samplingFreq = 1620    # Abtastfrequenz in Hertz
bufferSize   = 256     # Blockgröße eines Buffers
dataSets     = 40      # Anzahl an Datensätzen pro Klasse

fan_data_tmp  = np.empty([0, bufferSize])       # leere Matrix für Daten
fan_labels_tmp = np.array([])                    # leerer Vektor für Klassen

path  = '../data_selected/'
files = [f for f in os.listdir(path) if f.endswith('.csv')]
files.sort()

# Dateien einlesen und Daten/Klassen ablegen
for labelID, file in enumerate(files):

  print("Class %d (%s): " % (labelID, file), end="")
  df = pd.read_csv(path + file)
  print("found %d samples" % len(df))

  data = np.array(df['aX'])                    # nur Daten der X-Achse extrahieren
  data = data-data.mean()                      # Gleichanteile entfernen
  data = data.reshape(dataSets, bufferSize)    # Vektor in Matrix transformieren

  # Daten und Klasse hinzufügen
  fan_data_tmp  = np.concatenate([fan_data_tmp, data], axis=0)
  fan_labels_tmp = np.concatenate([fan_labels_tmp, np.full((dataSets), labelID)])

  numClasses = labelID+1

fan_labels_tmp = tf.keras.utils.to_categorical(fan_labels_tmp, numClasses)

### Daten mischen

In [None]:
new_idx = np.arange(len(fan_data_tmp))
random.shuffle(new_idx)

fan_data   = np.empty([len(fan_data_tmp), bufferSize])
fan_labels = np.empty([len(fan_labels_tmp), labelID+1])

for i in np.arange(len(fan_data)):
  fan_data[new_idx[i]]  = fan_data_tmp[i]
  fan_labels[new_idx[i]] = fan_labels_tmp[i]

In [None]:
def plot_data(id):

  t = np.arange(bufferSize)/samplingFreq
  
  plt.figure(figsize=(12,4))
  plt.plot(t, fan_data[id])

  plt.xlabel('Zeit (Sekunden)')
  plt.ylabel('aX ($m/s^2$)')
  plt.title('Class %d' % fan_labels[id].argmax())
  plt.grid()

In [None]:
plot_data(0)

## FFT

In [None]:
N = bufferSize

fan_data_fft  = np.empty([fan_data.shape[0], fan_data.shape[1]//2])       # leere Matrix für Daten (FFT)

# Berechne FFT und Normalisiere Werte
for i in np.arange(len(fan_data)):

  Y     = scipy.fft.fft(fan_data[i])
  Y_fft = 2.0/N * np.abs(Y[0:N//2])
  peak_ = np.max(Y_fft)
  fan_data_fft[i] = Y_fft/peak_      # normalized

In [None]:
def plotFFT(data, label):

  N = 2*len(data)
  freq_ = scipy.fft.fftfreq(N, 1/samplingFreq)[:N//2]

  peaks, _ = scipy.signal.find_peaks(data, height=0.01, distance=100)

  plt.figure(figsize=(12,4))
  plt.plot(freq_, data)
  plt.plot(freq_[peaks], data[peaks], "x")
  plt.title("Class: %d" % label.argmax())
  plt.xlabel('Frequenz (Hz)')
  plt.ylabel('Betrag (norm)')
  plt.grid()

In [None]:
index = 0
plotFFT(fan_data_fft[index], fan_labels[index])

### Daten splitten

In [None]:
num_inputs = len(fan_data)

TRAIN_SPLIT = int(0.6 * num_inputs)
TEST_SPLIT = int(0.2 * num_inputs + TRAIN_SPLIT)

fan_data_train, fan_data_test, fan_data_val       = np.split(fan_data_fft, [TRAIN_SPLIT, TEST_SPLIT])
fan_labels_train, fan_labels_test, fan_labels_val = np.split(fan_labels, [TRAIN_SPLIT, TEST_SPLIT])

## Entwurf und Training des neuronalen Netzes

In [None]:
model = tf.keras.Sequential()
model.add(tf.keras.layers.Input(shape=[bufferSize//2,]))
model.add(tf.keras.layers.Dense(64, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(numClasses, activation='softmax'))

# Zusammenfassung
#model.summary()

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Training
history = model.fit(fan_data_train, fan_labels_train, batch_size=10, epochs=50, verbose=1, validation_data=(fan_data_val, fan_labels_val))

In [None]:
# Trainingsverlauf grafisch darstellen
epochs = range(1, len(history.history['val_loss']) + 1)

plt.figure(figsize=(8,7))
plt.plot(epochs, history.history['val_loss'])
plt.xlabel('Epoche')
plt.ylabel('Fehler')
plt.grid(True)

### Vorhersage

In [None]:
predict = model.predict(fan_data_test)

### Performance

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Konfusionsmatrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.figure(figsize=(6,6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    #plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cm[i, j],
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Wahre Klasse')
    plt.xlabel('Vorhergesagte Klasse')

In [None]:
confusion_mtx = sklearn.metrics.confusion_matrix(fan_labels_test.argmax(axis=1), predict.round().argmax(axis=1))
plot_confusion_matrix(confusion_mtx, classes = range(numClasses))

### Konvertierung in TFLite-Modell

In [None]:
# Quantisierung und Optimierung für Microcontroller

converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimazations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

TFLITE_MODEL_FILENAME = "../model/predMaintenance.tflite"
open(TFLITE_MODEL_FILENAME, "wb").write(tflite_model);