In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

from consts import Columns
from data_processing import plot_curves, process_dataset
from machine_learning import NeuralNetwork

In [None]:
plt.style.use('ggplot')
# plt.style.available

In [None]:
processed_dataset: pd.DataFrame = process_dataset()
processed_dataset.reset_index(drop=True, inplace=True)
processed_dataset

In [None]:
plot_curves(processed_dataset, 0)
plot_curves(processed_dataset, 1)

In [None]:
processed_dataset["_outlier"] = processed_dataset.apply(
    lambda entry: entry[Columns.OUTPUT_VECTOR][0] / 800 + entry[Columns.OUTPUT_VECTOR][1] / 200 <= 1,
    axis=1,
)
outliers = processed_dataset[processed_dataset["_outlier"] == True]
outliers

In [None]:
plot_curves(outliers, 0)
plot_curves(outliers, 1)

In [None]:
final_dataset = processed_dataset[processed_dataset["_outlier"] == False]
final_dataset.reset_index(drop=True, inplace=True)
final_dataset

In [None]:
plot_curves(final_dataset, 0)
plot_curves(final_dataset, 1)

In [None]:
x_datapoints = torch.tensor(final_dataset.apply(lambda entry: [
    # feature scaling to increase input features size/count
    # (entry[Columns.OUTPUT_VECTOR][0] ** pow_x) * (entry[Columns.OUTPUT_VECTOR][1] ** pow_y)
    # for pow_x in range(0, 5) for pow_y in range(0, 5)
    entry[Columns.INPUT_VECTOR][0],
    entry[Columns.INPUT_VECTOR][1],
], axis=1).tolist(), dtype=torch.float)
x_datapoints

In [None]:
y_datapoints = torch.tensor(final_dataset.apply(lambda entry: [
    entry[Columns.OUTPUT_VECTOR][0],
    entry[Columns.OUTPUT_VECTOR][1],
], axis=1).tolist(), dtype=torch.float)
y_datapoints

In [None]:
x_train, x_test, y_train, y_test = train_test_split(x_datapoints, y_datapoints, test_size=0.05, random_state=0)

In [None]:
# remove the following comment to train the deep model
# train_model(x_train, y_train, save_as="model.pth")

In [None]:
model = NeuralNetwork()
model.load_state_dict(torch.load("model.pth"))

# accuracy values for different values of max distance allowed, that is, based to closeness
# for max_distance in np.linspace(0.01 * LinearScales.ScaleLength, 0.10 * LinearScales.ScaleLength, 10):
#     accuracy = calculate_accuracy(model, x_train, y_train, max_distance)
#     print(f"Max. Distance = {max_distance:.2f}, Accuracy = {accuracy:.2f}%")

In [None]:
from torchmetrics.functional import mean_absolute_percentage_error

# calculating train accuracy
scores = model(x_train)
error_train = np.mean(np.clip(np.array([
    float(mean_absolute_percentage_error(scores[index], y_train[index])) * 100
    for index in range(len(scores))
]), 0, 100))
print("Train error:", error_train, "%")

# calculating test accuracy
scores = model(x_test)
error_test = np.mean(np.clip(np.array([
    float(mean_absolute_percentage_error(scores[index], y_test[index])) * 100
    for index in range(len(scores))
]), 0, 100))
print("Test error:", error_test, "%")

In [None]:
predicted = np.array(model(x_datapoints).tolist())
predicted_dataset = final_dataset.copy()
predicted_dataset[Columns.PREDICTED_X] = pd.Series(np.round(predicted[:, 0]))
predicted_dataset[Columns.PREDICTED_Y] = pd.Series(np.round(predicted[:, 1]))
# columns required to make the scatter plot of the points
# predicted_dataset = predicted_dataset[[
#     Columns.INPUT_X,
#     Columns.INPUT_Y,
#     Columns.OUTPUT_X,
#     Columns.OUTPUT_Y,
#     Columns.PREDICTED_X,
#     Columns.PREDICTED_Y,
# ]]
predicted_dataset.to_csv("predicted_dataset.csv", index=False)
predicted_dataset

In [None]:
_predicted_dataset = predicted_dataset.copy()
_predicted_dataset[Columns.OUTPUT_VECTOR] = _predicted_dataset.apply(lambda entry: np.array([
    entry[Columns.PREDICTED_X],
    entry[Columns.PREDICTED_Y],
]), axis=1)

plot_curves(_predicted_dataset, 0)
plot_curves(_predicted_dataset, 1)

In [None]:
import matplotlib.pyplot as plt

real_scores = np.array(y_test.tolist())
predicted_scores = np.round(np.array(model(x_test).tolist()))

plt.scatter(real_scores[:, 0], real_scores[:, 1], marker="o", label="Real")
plt.scatter(predicted_scores[:, 0], predicted_scores[:, 1], marker="x", label="Predicted")
plt.legend()
plt.show()

In [None]:
test_dataset = pd.DataFrame()
test_dataset["X_real"] = real_scores[:, 0]
test_dataset["Y_real"] = real_scores[:, 1]
test_dataset["X_predicted"] = predicted_scores[:, 0]
test_dataset["Y_predicted"] = predicted_scores[:, 1]
test_dataset

In [None]:
x_lookup = torch.tensor([
    [l_pos, r_pos]
    for l_pos in np.linspace(0, 2000, 201, dtype=int)
    for r_pos in np.linspace(0, 500, 51, dtype=int)
], dtype=torch.float)
x_lookup

In [None]:
lookup_table = pd.DataFrame()
lookup_table[Columns.INPUT_VECTOR] = pd.Series(x_lookup.tolist())
# adding columns for plotting purpose
lookup_table[Columns.INPUT_X] = lookup_table.apply(lambda entry: entry[Columns.INPUT_VECTOR][0], axis=1)
lookup_table[Columns.INPUT_Y] = lookup_table.apply(lambda entry: entry[Columns.INPUT_VECTOR][1], axis=1)

predicted = np.array(model(x_lookup).tolist())
lookup_table[Columns.PREDICTED_X] = pd.Series(np.round(predicted[:, 0]))
lookup_table[Columns.PREDICTED_Y] = pd.Series(np.round(predicted[:, 1]))
# adding columns for plotting purpose
lookup_table[Columns.OUTPUT_VECTOR] = lookup_table.apply(lambda entry: np.array([
    entry[Columns.PREDICTED_X],
    entry[Columns.PREDICTED_Y],
]), axis=1)
lookup_table.to_csv("lookup_table.csv", index=False)
lookup_table

In [None]:
plot_curves(lookup_table, key=0)
plot_curves(lookup_table, key=1)

In [None]:
def search_position_vector(search_table: pd.DataFrame, position_vector: np.ndarray):
    search_table["_dist"] = search_table.apply(
        lambda entry: np.linalg.norm(position_vector - entry[Columns.OUTPUT_VECTOR]),
        axis=1,
    )
    index = search_table["_dist"].idxmin()
    return search_table[Columns.INPUT_VECTOR][index]

In [None]:
search_position_vector(lookup_table.copy(), np.array([600, 600]))

In [None]:
from scipy.spatial import KDTree

lookup_tree = KDTree(np.array(model(x_lookup).tolist()))
distance, index = lookup_tree.query([600, 600])
x_lookup[index]

In [None]:
# _search_table = lookup_table.copy()
motor_steps = [
    # search_position_vector(_search_table, np.array([x_position, 800]))
    x_lookup[lookup_tree.query([x_position, 800])[1]].tolist()
    for x_position in np.linspace(100, 600, 101)
]
motor_steps

In [None]:
from motor_controller import connect_arduino
import matplotlib.pyplot as plt
from scipy.interpolate import UnivariateSpline

l_controller, r_controller = connect_arduino()

In [None]:
motor_steps = np.array(motor_steps)
indices = range(len(motor_steps))
l_spline = UnivariateSpline(indices, motor_steps[:, 0], k=5, s=0.50)
l_steps = l_spline(indices)
r_spline = UnivariateSpline(indices, motor_steps[:, 1], k=5, s=0.50)
r_steps = r_spline(indices)

In [None]:
plt.plot(indices, motor_steps[:, 0], label="L_actual")
# plt.plot(indices, l_steps, label="L_smoothed")
plt.plot(indices, motor_steps[:, 1], label="R_actual")
# plt.plot(indices, r_steps, label="R_smoothed")
plt.legend()
plt.show()

In [None]:
plt.plot(motor_steps[:, 0], motor_steps[:, 1])
plt.show()

In [None]:
# from time import sleep

for index, (l_step, r_step) in enumerate(motor_steps):
    # l_step, r_step = l_steps[index], r_steps[index]

    l_controller.set_position(l_step)
    assert l_controller.get_position() == l_step

    r_controller.set_position(-r_step)
    assert r_controller.get_position() == -r_step

    # sleep(0.1)

r_controller.set_position(0), l_controller.set_position(0)

In [None]:
from pygad import pygad


def get_input_next(target: torch.Tensor, previous_solution: np.ndarray = np.array([0, 0])):
    # define the fitness function according to the euclidean distance
    def calculate_fitness(_ga_instance, current_solution, _solution_index):
        motor_input = np.array(current_solution, dtype=int)
        score = model(torch.tensor(motor_input, dtype=torch.float))
        error = mean_absolute_percentage_error(score, target)
        fitness = 1 / error
        return float(fitness)

    # create a genetic algorithm instance to search for the solution
    ga_instance = pygad.GA(
        num_generations=75,
        num_parents_mating=8,
        fitness_func=calculate_fitness,
        sol_per_pop=25,
        num_genes=2,
        keep_parents=1,
        mutation_num_genes=1,
        parallel_processing=4,
        gene_space=[list(range(2001)), list(range(501))],
        gene_type=[int, int],
        initial_population=np.array([previous_solution for _ in range(8)], dtype=int),
    )

    # run the genetic algorithm creating the genetic pool from parents
    ga_instance.run()

    # return the best solution as the motor input
    solution, solution_fitness, solution_index = ga_instance.best_solution()
    return np.array(solution, dtype=int), f"{round(100 / solution_fitness, 2)}%"


In [None]:
start_from = np.array([0, 0])
for x_position in np.linspace(100, 900, 161):
    s, e = get_input_next(torch.tensor([x_position, 800], dtype=torch.float), previous_solution=start_from)
    print(s, e)
    r_controller.set_position(-s[1])
    l_controller.set_position(s[0])
    start_from = s  # next time start searching from the current position


In [None]:
l_controller.set_position(0), r_controller.set_position(0)

In [None]:
import math

motor_steps = [
    x_lookup[lookup_tree.query(np.array([350, 700]) + 100 * np.array([math.cos(theta), math.sin(theta)]))[1]].tolist()
    for theta in np.linspace(-math.pi, math.pi, 361)
]
motor_steps

In [None]:
from scipy.interpolate import interp1d

circle = np.array([
    # np.array([350, 550]) + 100 * np.array([math.cos(theta), math.sin(theta)])
    x_lookup[lookup_tree.query(np.array([350, 700]) + 100 * np.array([math.cos(theta), math.sin(theta)]))[1]].tolist()
    for theta in np.linspace(-math.pi, math.pi, 361)
])
spline = interp1d(circle[:, 0], circle[:, 1])

plt.plot(circle[:, 0], circle[:, 1], marker=".", color="blue", label="Actual")
plt.plot(circle[:, 0], [spline(x) for x, y in circle], marker=".", color="green", label="Smoothed")
plt.axis("square")
plt.legend()
plt.show()

In [None]:
# from time import sleep

for x, y in circle:
    l_controller.set_position(x)
    assert l_controller.get_position() == x

    r_controller.set_position(-spline(x))
    assert r_controller.get_position() == -spline(x)

    # sleep(0.1)

r_controller.set_position(0), l_controller.set_position(0)