# Bow tracking model training with TensorFlow

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import sklearn
import random
import math
import json
import time
import os

import tensorflow as tf
from tensorflow import keras
from keras.models import Model
from keras.layers import Layer, Dense, Dropout, BatchNormalization, Activation

from sklearn.model_selection import train_test_split

# Show the working directory: the relative "./dataset/" and "./trained_models/" paths
# used in this file are resolved against it.
print(os.getcwd())

In [None]:
def now():
    """Return the current local date and time as a ``YYYY-MM-DD_HH-MM-SS`` string."""
    return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def model_save_paths(batch_size, learning_rate, epochs):
    """Build the output paths for one training run.

    All three paths share a single timestamp and the same run name so that the
    saved model, its training history and its MAE table can be matched up later.

    Args:
        batch_size: Batch size used for training (embedded in the file name).
        learning_rate: Learning rate used for training (embedded in the file name).
        epochs: Number of epochs requested (embedded in the file name).

    Returns:
        Tuple ``(model_save_location, history_save_location,
        mae_results_save_location)``.
    """
    # Take the timestamp once: calling now() per path could yield different
    # stamps if the clock ticks over between calls.
    run_name = f"{now()}_BowTrackingModel_btch-{batch_size}_lr-{learning_rate}_epoch-{epochs}"

    model_save_location = f"./trained_models/{run_name}"
    history_save_location = f"./trained_models/history/{run_name}.json"
    mae_results_save_location = f"./trained_models/mae_section_results/{run_name}.csv"

    return (model_save_location, history_save_location, mae_results_save_location)

### 1. Define model architecture

In [None]:
class BowTrackingModel(Model):
    """Fully connected Keras model with three 256-unit tanh layers and a 2-unit ReLU output.

    Layer order applied in ``call``:
    Dense1 -> Dropout1 -> BatchNormalization1 -> Dense2 -> Activation(tanh)
    -> BatchNormalization2 -> Dropout2 -> Dense3 -> Dense4.
    """

    def __init__(self):
        super().__init__()

        # Same dropout rate for both dropout layers
        dropout_rate = 0.35

        # First hidden block
        self.dense1 = Dense(256, activation="tanh", name="Dense1")
        self.batch_norm_1 = BatchNormalization(name="BatchNormalization1")
        self.dropout1 = Dropout(dropout_rate, name="Dropout1")

        # Second hidden block
        # NOTE(review): Dense2 already applies tanh and is followed by a separate tanh
        # Activation layer, so tanh is applied twice here - confirm this is intended.
        self.dense2 = Dense(256, activation="tanh", name="Dense2")
        self.tanh1 = Activation("tanh", name="Activation")
        self.batch_norm_2 = BatchNormalization(name="BatchNormalization2")
        self.dropout2 = Dropout(dropout_rate, name="Dropout2")

        # Final hidden layer and output layer (ReLU keeps both outputs non-negative)
        self.dense3 = Dense(256, activation="tanh", name="Dense3")
        self.dense4 = Dense(2, activation="relu", name="Dense4")

    def call(self, x):
        """Run ``x`` through every layer in sequence and return the 2-unit output."""
        # NOTE(review): the first block applies dropout before batch normalisation,
        # the second block the other way round - confirm the asymmetry is intended.
        layer_sequence = (
            self.dense1,
            self.dropout1,
            self.batch_norm_1,
            self.dense2,
            self.tanh1,
            self.batch_norm_2,
            self.dropout2,
            self.dense3,
            self.dense4,
        )
        for layer in layer_sequence:
            x = layer(x)
        return x

### 2. Create and compile model

In [None]:
# Training hyper-parameters (also embedded in the checkpoint / save file names)
learning_rate = 0.0001
batch_size = 64

# Width of the model input and output
n_inputs = 4
n_outputs = 2

model = BowTrackingModel()

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
loss = tf.keras.losses.MeanAbsoluteError()

# Define evaluation metrics (variable names match the metric each one computes)
# NOTE(review): Accuracy counts exact matches between prediction and target, which is
# rarely informative for continuous regression outputs. It is kept so the logged
# metrics / saved history keep the same keys - consider removing it.
metric_accuracy = tf.keras.metrics.Accuracy()
metric_mae = tf.keras.metrics.MeanAbsoluteError()
metric_mape = tf.keras.metrics.MeanAbsolutePercentageError()
metric_r2 = tf.keras.metrics.R2Score()
metric_rmse = tf.keras.metrics.RootMeanSquaredError()
metrics = [metric_accuracy, metric_mae, metric_mape, metric_r2, metric_rmse]

model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=metrics
)

# Leave the batch dimension unspecified (None) instead of hard-coding a batch of 1,
# so the built model and its summary are not tied to a particular batch size.
model.build(input_shape=(None, n_inputs))
model.summary()

tf.keras.utils.plot_model(model, "./bt_model_diagram_vertical.png", show_shapes=True, rankdir="TB", show_dtype=False, dpi=300)

### 3. Load and format dataset

In [None]:
# Columns of the CSV file that are loaded; everything else in the file is ignored
cols_to_keep = ["position", "hair_stick_distance", "force", "norm1", "norm2", "norm3", "norm4"]

dataset_folder = "./dataset/"
dataset_filename = ""  # Name of the CSV file inside dataset_folder - must be filled in
dataset_filepath = os.path.join(dataset_folder, dataset_filename)

# Fail early with a clear message: with an empty file name the path points at the
# folder itself and read_csv would fail with a far less obvious error.
if not dataset_filename:
    raise ValueError("dataset_filename is empty; set it to the name of a CSV file inside dataset_folder")

# Only load the columns listed in cols_to_keep
dataset = pd.read_csv(dataset_filepath, usecols=cols_to_keep)

dataset

### 4. Get testing and training sets

In [None]:
# Dataset columns used as model inputs and as regression targets
input_cols = ["norm1", "norm2", "norm3", "norm4"]
# NOTE(review): the second target is "force", but the sector evaluation (hsd_boundaries)
# and the "HSD [mm]" plot axis later in this file treat the second output as
# hair-stick distance - confirm which column is intended.
output_cols = ["position", "force"]

input_data = dataset.loc[:, input_cols].to_numpy()
output_data = dataset.loc[:, output_cols].to_numpy()

# Shuffled split with 30 % of the rows held out for testing
input_train, input_test, output_train, output_test = train_test_split(
    input_data,
    output_data,
    test_size=0.3,
    shuffle=True,
)

# Print the shape of each split, one per line
for split_array in (input_train, input_test, output_train, output_test):
    print(split_array.shape)

### 5. Train model

In [None]:
epochs = 25

# Callbacks: stop once the training loss has not improved for 10 epochs, and write a
# checkpoint of the model while training
checkpoint_filepath = f"./trained_models/model_checkpoints/{now()}_BowTrackingModel_local_btch-{batch_size}_lr-{learning_rate}"
training_callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='loss', patience=10),
    tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1),
]

# Fit on the training split only (no validation data is passed)
history = model.fit(
    input_train,
    output_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=training_callbacks,
)

### 6. Evaluate R2 score and MAE of model

In [None]:
# Import the submodule explicitly: a bare `import sklearn` is not guaranteed to expose
# `sklearn.metrics` unless some other import happens to have loaded it already.
import sklearn.metrics

# Predict the held-out test inputs
output_test_predicted = model.predict(input_test)

# Both scores are computed over all output columns together
print(f"Coefficient of determination (r2 score): {sklearn.metrics.r2_score(output_test, output_test_predicted)}")
print(f"Overall MAE: {sklearn.metrics.mean_absolute_error(output_test, output_test_predicted)}")

### 7. Evaluate MAE in different sectors

In [None]:
# Sector edges along the first output (position) and the second output.
# NOTE(review): the second output is compared against "hsd" boundaries, while the
# training target list names its second column "force" - confirm they agree.
pos_boundaries = [0, 230, 440, 660]
hsd_boundaries = [0, 1, 3, 5, 12]

h_sections = len(pos_boundaries)-1
v_sections = len(hsd_boundaries)-1

measured = np.asarray(output_test, dtype=float)
predicted = np.asarray(output_test_predicted, dtype=float)

# Per-sample absolute errors: per axis, and as Euclidean distance in the output plane
x_errors = np.abs(measured[:, 0] - predicted[:, 0])
y_errors = np.abs(measured[:, 1] - predicted[:, 1])
combined_errors = np.hypot(x_errors, y_errors)

mae_columns = ["pos_lower", "pos_upper", "hsd_lower", "hsd_upper", "combined_mae", "x_mae", "y_mae"]
mae_rows = []

for h_sect in range(h_sections):
    pos_lower = pos_boundaries[h_sect]
    pos_upper = pos_boundaries[h_sect+1]
    # Both edges are inclusive, so a sample exactly on an edge counts in both neighbours
    in_pos_range = (measured[:, 0] >= pos_lower) & (measured[:, 0] <= pos_upper)

    for v_sect in range(v_sections):
        hsd_lower = hsd_boundaries[v_sect]
        hsd_upper = hsd_boundaries[v_sect+1]

        in_sector = in_pos_range & (measured[:, 1] >= hsd_lower) & (measured[:, 1] <= hsd_upper)

        if in_sector.any():
            mae = combined_errors[in_sector].mean()
            x_mae = x_errors[in_sector].mean()
            y_mae = y_errors[in_sector].mean()
        else:
            # No test sample fell into this sector: report NaN instead of dividing by zero
            mae = x_mae = y_mae = np.nan

        mae_rows.append([pos_lower, pos_upper, hsd_lower, hsd_upper, mae, x_mae, y_mae])

# Build the table in one go (float columns) rather than growing it cell by cell
mae_results = pd.DataFrame(mae_rows, columns=mae_columns, dtype=float)

mae_results

### 8. Plot model accuracy for random samples in test set

In [None]:
n_individual_tests = 250 # Number of random points to check
red_threshold = 50       # Distance in mm between measured and predicted in order for line to be fully red

plt.figure(figsize=(8,3))
plt.ylim(0,13)
plt.xlim(0,660)

sensor_positions = [562, 465, 189, 69]

n_test_samples = input_test.shape[0]

for i in range(n_individual_tests):
    # randrange excludes the upper bound; randint(0, n) could return n and index
    # one past the end of the test set.
    idx = random.randrange(n_test_samples)

    # Use a dedicated name so the full `input_data` array is not overwritten,
    # and let reshape infer the feature count instead of hard-coding it.
    sample_input = input_test[idx].reshape((1, -1))

    # NOTE(review): call_time is measured but not used anywhere in this cell
    call_start = time.time()
    prediction = model(sample_input).numpy()[0]
    call_time = time.time() - call_start

    actual = output_test[idx]

    # Line colour fades from green (distance 0) to red (distance >= red_threshold)
    distance_between_points = math.hypot(prediction[0]-actual[0], prediction[1]-actual[1])
    scaled_distance = distance_between_points / red_threshold if distance_between_points <= red_threshold else 1
    line_colour = (scaled_distance, 1-scaled_distance, 0)

    x_vals = [prediction[0], actual[0]]
    y_vals = [prediction[1], actual[1]]

    # Only the first pair gets legend labels so each entry appears once
    plt.plot(x_vals, y_vals, linewidth=0.5, color=line_colour)
    plt.scatter(x_vals[0], y_vals[0], marker="o", c="orange", edgecolors="black", linewidths=0.5, s=25, label="Predicted" if i == 0 else "", zorder=2)
    plt.scatter(x_vals[1], y_vals[1], marker="s", c="purple", edgecolors="black", linewidths=0.5, s=25, label="Measured" if i == 0 else "", zorder=2)

# Dotted vertical line at each sensor position; label only the first for the legend
for k, sensor_pos in enumerate(sensor_positions):
    plt.axvline(
        sensor_pos,
        ls=":",
        c="black",
        lw=1,
        label="Sensor positions" if k == 0 else ""
    )

plt.xlabel("Position [mm]")
plt.ylabel("HSD [mm]")
plt.legend()
plt.grid(alpha=0.25, color="lightgrey")
plt.show()

### 9. Save model, model history, and MAE results to file

In [None]:
model_save_path, history_save_path, mae_results_save_path = model_save_paths(batch_size, learning_rate, epochs)

# Make sure every target directory exists before anything is written
for save_path in (model_save_path, history_save_path, mae_results_save_path):
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

model.save(model_save_path)
print(f"Saved model to {model_save_path}")

# Save model history dict to json; the context manager closes the file handle
model_history_data = history.history
with open(history_save_path, "w") as history_file:
    json.dump(model_history_data, history_file)
print(f"Saved model history to {history_save_path}")

mae_results.to_csv(mae_results_save_path, index=False)
print(f"Saved MAE results to {mae_results_save_path}")