# Residual model generation pipeline

In [None]:
import torch
import shutil
import numpy as np
import matplotlib.pyplot as plt

from LMCE.models import MLP
from LMCE.utils import min_max
from LMCE.trainers import train
import LMCE.cfusdlog as cfusdlog
import LMCE.uav_trajectory as uav_trajectory
from LMCE.residual_calculation import indi_residuals
from LMCE.data_prep import prepare_data, create_dataloader
from LMCE.model_to_c_conversion import exportNet, nn_c_model_test
from LMCE.error_estimation import error_calculator, find_best_cutoff

plt.rcParams['text.usetex'] = True
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times']

## Preparing the data
First define where the data is stored, then it can be loaded, the `prepare_data` function calculates the labels (residual forces).

In [None]:
# Get train data paths
indices = [i for i in range(0, 100)]
# test_indices = [22]
# for i in test_indices:
#     del indices[i]
training_data_paths = [f"./final_pay_data/lpin{i:02}" for i in indices]
# testing_data_paths = [f"./final_pay_data/lpin{i:02}" for i in test_indices]
testing_data_paths = ["./lpin02"]

cutoffs = {}

residual_func = lambda data: indi_residuals(data, payload=True, make_smooth=True, show_fit=True)

X_train, y_train_ = prepare_data(training_data_paths,
                                 save_as="train_data_payload_splines",
                                 residual_func=residual_func,
                                 overwrite=False,
                                 verbose=1,
                                 cutoffs=cutoffs,
                                 payload=True)

X_test, y_test_ = prepare_data(testing_data_paths,
                               save_as="test_data_payload_splines",
                               shuffle_data=False,
                               residual_func=residual_func,
                               overwrite=False,
                               verbose=1,
                               cutoffs=cutoffs,
                               payload=True)

y_train_ = y_train_[:, 3:]
y_test_ = y_test_[:, 3:]

In [None]:
_, X_mins, X_maxs = min_max(X_train.copy())
X_mins[:6] = -1.
X_maxs[:6] = 1.
X_train, _, _ = min_max(X_train, X_mins, X_maxs)
X_test, _, _ = min_max(X_test, X_mins, X_maxs)

y_train, y_mins, y_maxs = min_max(y_train_)
y_test, _, _ = min_max(y_test_, y_mins, y_maxs)

train_loader = create_dataloader(X_train, y_train)
val_loader = create_dataloader(X_test, y_test)

print("Train size: ", X_train.shape[0])
print("Test size: ", X_test.shape[0])
print("X dim: ", X_train.shape[1], "y dim: ", y_train.shape[1])

We can plot the flight path and residual forces like this

## Training the model

In [None]:
# Create and train neural network
model = MLP(input_size=X_train.shape[1], output_size=y_train.shape[1], hidden_layers=[24, 24, 24])
# model = train(model, train_loader, val_loader, lr=3e-4, scheduler_gamma=.92, scheduler_step_size=10, device="cpu", epochs=128)
# model.save("mlp_payload")

## Testing the model

In [None]:
residual_func = lambda data: indi_residuals(data, payload=True, make_smooth=False)

X_test, y_test_ = prepare_data(["./payload_final/cric_new/indi/lpin01"],
                               save_as="test_data_pay",
                               shuffle_data=False,
                               residual_func=residual_func,
                               overwrite=True,
                               verbose=1,
                               payload=True)
y_test_ = y_test_[:, 3:]
X_test, _, _ = min_max(X_test, X_mins, X_maxs)
y_test, _, _ = min_max(y_test_, y_mins, y_maxs)

model.float()
val_loader = create_dataloader(X_test, y_test)

weights = torch.load('./models/mlp_payload.pth')
model.load_state_dict(weights)

tensor_input = torch.from_numpy(X_test).type(torch.float32)
pred = model.forward(tensor_input).detach().numpy()
error = np.abs(y_test-pred)
# print(f"Average force error: (x, y, z) N -> ({np.mean(error[:, 0]):.4f}, {np.mean(error[:, 1]):.4f}, {np.mean(error[:, 2]):.4f}) N")
# print(f"Average torque error: (x, y, z) N -> ({np.mean(error[:, 3]):.4f}, {np.mean(error[:, 4]):.4f}, {np.mean(error[:, 5]):.4f}) N")

y_test_scaled = (y_test + 1) * .5 * (y_maxs - y_mins) + y_mins
pred_scaled = (pred + 1) * .5 * (y_maxs - y_mins) + y_mins

fig, ax = plt.subplots(3, figsize=(10, 15))
for i, f in enumerate(["Torque"]):
    for j, v in enumerate(["X", "Y", "Z"]):
        idx = j+i*3
        ax[idx].plot(y_test_scaled[:, idx], label="Real")
        ax[idx].plot(pred_scaled[:, idx], label="Predicted")
        # ax[idx].plot(y_test[:, idx], label="Real")
        # ax[idx].plot(pred[:, idx], label="Predicted")
        ax[idx].set_title(f'{f} {v}')
        ax[idx].set_xlabel('Timestamp')
        ax[idx].set_ylabel(f'{f} prediction')
        ax[idx].legend()

plt.show()

## Translating the model to c code

In [None]:
# Generate the model c code
model_path = "./models/mlp_payload.pth"
exportNet(model_path, X_mins=X_mins, X_maxs=X_maxs, y_mins=y_mins, y_maxs=y_maxs, info="Neural network for the 2.1 Crazyflie carrying a payload")

# Test if the generated model gives the same outputs as the original
model.double()
nn_c_model_test(model, X_mins=X_mins, X_maxs=X_maxs, y_mins=y_mins, y_maxs=y_maxs, in_dim=X_train.shape[1])

## Adding the model to the cf-firmware

In [None]:
try:
    shutil.copyfile("./LMCE/c_utils/nn.c", "../crazyflie-firmware/src/modules/src/controller/nn_payload.c")
    shutil.copyfile("./LMCE/c_utils/nn.h", "../crazyflie-firmware/src/modules/interface/controller/nn_payload.h")
    print("Successfully copied nn to firmware.")
except:
    print("Crazyflie firmware not found.")

## Error calculation
For calculating the error with the addition of the learning models the flights need to be re-recorded with the updated firmware.

In [None]:
# Get desired path
traj = uav_trajectory.Trajectory()
traj.loadcsv("./LMCE/flight_paths/figure8.csv")
traj.stretchtime(2)

ts = np.arange(0, traj.duration, 0.01)
evals = np.empty((len(ts), 15))
for t, i in zip(ts, range(0, len(ts))):
    e = traj.eval(t)
    e.pos += np.array([0, 0, 1])
    evals[i, 0:3] = e.pos

target_pos = evals.transpose()

# Get real path
data_path = "inditest/tle94"
try:
    data = cfusdlog.decode(data_path)['fixedFrequency']
    start = 190
    end = 500
    real_pos = [data["stateEstimate.x"][start:end], data["stateEstimate.y"][start:end]-1.]

    # Calculate error
    cutoff = find_best_cutoff(real_pos, target_pos)
    avg_error = error_calculator(cutoff, real_pos, target_pos, vis=True)
    print(f"Avg. error: {avg_error:.4f} (m)")
except:
    print("Test file not found.")