In [None]:
import utility as ut
import numpy as np
import matplotlib.pyplot as plt
import random
import copy


In [None]:
# File paths (relative to the notebook's working directory).
# DIRECTORY is passed to ut.load_excel_data, GANTRY_CRANE_PARAMETERS_JSON_PATH
# holds the default model parameters, and RESULTS_PATH is the folder that
# best_parameters.json is read from and written to. Paths are joined with "+",
# so the trailing slashes are required.
DIRECTORY = "data/"
GANTRY_CRANE_PARAMETERS_JSON_PATH = "gantry_crane_parameters.json"
RESULTS_PATH = "SGD_results/"

In [None]:
# Load the recorded runs found under DIRECTORY. Later cells treat `datasets` as
# a sequence of tables indexable by column name (e.g. "timestamp", "sway_angle").
datasets = ut.load_excel_data(DIRECTORY)

In [None]:
DURATION = 15  # duration in seconds
DT = 0.0001  # time increment in seconds
# Build the shared time grid from an explicit sample count instead of
# np.arange: with a non-integer step, the number of elements arange returns
# depends on floating-point rounding of (stop - start) / step and can come out
# one longer than intended. linspace also makes the last sample exactly DURATION.
NUM_STEPS = int(round(DURATION / DT)) + 1  # samples at 0, DT, ..., DURATION
time_array = np.linspace(0, DURATION, NUM_STEPS)

In [None]:
# Resample every recorded run onto the shared time_array.
# Each pair is (key in the resampled dataset, column in the recorded run).
# The order of the pairs fixes the key order of every resampled dataset.
signal_sources = (
    ("trolley_position", "trolley_position"),
    ("cable_length", "cable_length"),
    ("sway_angle", "sway_angle"),
    ("trolley_motor_pwm", "pwm_trolley_motor"),
    ("hoist_motor_pwm", "pwm_hoist_motor"),
    ("trolley_motor_voltage", "trolley_motor_voltage"),
    ("hoist_motor_voltage", "hoist_motor_voltage"),
)

interpolated_datasets = []
for run_index in range(len(datasets)):
    recording = datasets[run_index]
    timestamps = recording["timestamp"]

    # "time" comes first, followed by the linearly interpolated signals.
    resampled = {"time": time_array}
    for output_key, source_column in signal_sources:
        resampled[output_key] = np.interp(
            time_array, timestamps, recording[source_column]
        )

    interpolated_datasets.append(resampled)

In [None]:
print(len(interpolated_datasets))

# Largest absolute sway angle over all runs (0 when there are no runs); used
# later as a symmetric y-limit for the sway-angle plots.
peak_sway_per_run = [max(abs(run["sway_angle"])) for run in interpolated_datasets]
max_sway_angle = max([0, *peak_sway_per_run])

# Two stable in-place sorts, applied in this order. The resulting order is by
# mean hoist motor PWM (lowest first), with ties kept in order of mean trolley
# motor PWM.
for signal_name in ("trolley_motor_pwm", "hoist_motor_pwm"):
    interpolated_datasets.sort(key=lambda run, name=signal_name: np.mean(run[name]))

In [None]:
# List the signal names of a resampled dataset, one per line.
print(*interpolated_datasets[0], sep="\n")

In [None]:
# Optional visual check of the resampled data: every third dataset is drawn as
# a 4x2 grid (PWM, voltage, trolley position / cable length, sway angle).
show_interpolated_datasets = False
if show_interpolated_datasets:
    # Figures are numbered 1, 2, 3, ... in the order they are shown. (The
    # counter used to be incremented before being printed as j + 1, so the
    # first figure was titled "Interpolated Dataset 2".)
    for figure_number, dataset_index in enumerate(
        range(0, len(interpolated_datasets), 3), start=1
    ):
        plotted = interpolated_datasets[dataset_index]
        plot_time = plotted["time"]

        # Create a figure and a set of subplots
        figure, ax = plt.subplots(4, 2, figsize=(14, 9), sharex=True)
        figure.suptitle(f"Interpolated Dataset {figure_number}", fontsize=16)
        ut.add_to_subplot(ax[0, 0], plot_time, plotted["trolley_motor_pwm"], ylabel="PWM motor troli", color="blue")
        ut.add_to_subplot(ax[0, 1], plot_time, plotted["hoist_motor_pwm"], ylabel="PWM motor angkut", color="red")
        ut.add_to_subplot(ax[1, 0], plot_time, plotted["trolley_motor_voltage"], ylabel="Tegangan motor troli (V)", color="blue")
        ut.add_to_subplot(ax[1, 1], plot_time, plotted["hoist_motor_voltage"], ylabel="Tegangan motor angkut (V)", color="red")
        ut.add_to_subplot(ax[2, 0], plot_time, plotted["trolley_position"], ylabel="Posisi troli (m)", color="green")
        ut.add_to_subplot(ax[2, 1], plot_time, plotted["cable_length"], ylabel="Panjang tali (m)", color="orange")
        # NOTE(review): both bottom panels draw the same sway-angle trace, which
        # keeps an x-axis label under each column - confirm this is intended.
        ut.add_to_subplot(ax[3, 0], plot_time, plotted["sway_angle"], "Waktu (s)", "Sudut ayun (°)", "purple")
        ut.add_to_subplot(ax[3, 1], plot_time, plotted["sway_angle"], "Waktu (s)", "Sudut ayun (°)", "purple")
        # Same symmetric sway-angle scale on every figure so runs are comparable.
        ax[3, 0].set_ylim(-max_sway_angle, max_sway_angle)
        ax[3, 1].set_ylim(-max_sway_angle, max_sway_angle)
        plt.tight_layout()
        plt.show()

In [None]:
# Choose the starting parameter set: either the defaults stored in the gantry
# crane parameter JSON file, or the best set saved by a previous run.
USE_LAST_BEST_PARAMETERS = True
if not USE_LAST_BEST_PARAMETERS:
    all_gantry_crane_parameters = ut.load_json(GANTRY_CRANE_PARAMETERS_JSON_PATH)[
        "gantry_crane_system_model"
    ]["parameters"]
else:
    all_gantry_crane_parameters = ut.load_json(f"{RESULTS_PATH}best_parameters.json")
print(all_gantry_crane_parameters)

In [None]:
# Split the parameters by their "measured" flag. Only the approximated ones are
# tuned by the optimisation loop. Both dictionaries reference the same entries
# as all_gantry_crane_parameters (no copies are made).
measured_parameters = {
    name: all_gantry_crane_parameters[name]
    for name in all_gantry_crane_parameters
    if all_gantry_crane_parameters[name]["measured"]
}
approximated_parameters = {
    name: all_gantry_crane_parameters[name]
    for name in all_gantry_crane_parameters
    if not all_gantry_crane_parameters[name]["measured"]
}

print(measured_parameters)
print(approximated_parameters)

In [None]:
# Simulator comes from the project's `model` module. One instance is built with
# the time step and the number of samples, and cost_function reuses it for
# every simulation.
from model import Simulator
simulator = Simulator(DT, NUM_STEPS)

In [None]:
def cost_function(parameter, dataset):
    """Score a parameter set against one resampled dataset.

    The module-level ``simulator`` is run with the dataset's trolley and hoist
    PWM signals, starting from the dataset's first trolley position, cable
    length and sway angle. The simulated signals and the dataset signals are
    then passed to ``ut.calculate_sum_root_mean_squared_errors``.

    Args:
        parameter: Parameter set handed unchanged to ``simulator.simulate``.
        dataset: Mapping with at least "trolley_motor_pwm", "hoist_motor_pwm",
            "trolley_position", "cable_length" and "sway_angle".

    Returns:
        The summed RMSE, or ``np.inf`` when that sum is NaN.
    """
    start_state = {
        "x": dataset["trolley_position"][0],
        "l": dataset["cable_length"][0],
        "theta": dataset["sway_angle"][0],
    }
    simulator.simulate(
        parameter,
        dataset["trolley_motor_pwm"],
        dataset["hoist_motor_pwm"],
        initial_conditions=start_state,
    )
    simulated = simulator.get_results()

    # Signals are paired by position, in each mapping's own key order.
    # NOTE(review): this assumes get_results() yields its signals in the same
    # order (and number) as the dataset's keys - confirm against Simulator.
    simulated_signals = [simulated[name] for name in simulated]
    measured_signals = [dataset[name] for name in dataset]

    total_rmse = ut.calculate_sum_root_mean_squared_errors(
        simulated_signals, measured_signals
    )
    # A NaN score is reported as an infinite cost so callers can detect it.
    return np.inf if np.isnan(total_rmse) else total_rmse

In [None]:
# (lower, upper) bounds for each tuned parameter. The optimisation loop draws
# its random starting values from these ranges and clips every updated value
# back into them.
optimize_range = {
    "trolley_mass": (1.0, 5.0),  # Done
    "trolley_damping_coefficient": (0.1, 10.0),  # Done
    "cable_damping_coefficient": (0.1, 10.0),  # Done
    "trolley_motor_rotator_inertia": (0.00001, 0.001),  # Done
    "trolley_motor_damping_coefficient": (0.1, 10.0),
    "trolley_motor_back_emf_constant": (0.001, 0.1),  # Done
    "trolley_motor_torque_constant": (0.01, 1.0),  # Done
    "hoist_motor_rotator_inertia": (0.000001, 0.0001),  # Done
    "hoist_motor_damping_coefficient": (0.1, 10.0),
    "hoist_motor_back_emf_constant": (0.01, 1.0),  # Done
    "hoist_motor_torque_constant": (0.0001, 0.01),  # Done
    "trolley_motor_activation_threshold_voltage": (0.1, 10.0),  # Done
    "hoist_motor_activation_threshold_voltage": (0.1, 10.0),  # Done
}

In [None]:
# Optimisation settings.
MAX_ITERATIONS = 1000  # gradient-descent iterations per episode
MAX_EPISODES = 1  # number of random restarts
BATCH_SIZE = 3  # Must not exceed len(interpolated_datasets): random.sample draws without replacement
LEARNING_RATE = 0.001  # scales each gradient step

# Step added to a parameter for the forward-difference gradient estimate.
# NOTE(review): a step this small next to parameter values of order 1-10 may
# leave the cost difference dominated by floating-point round-off - confirm.
h = 10**-9

In [None]:
# Baseline: score the starting parameter set on every dataset and keep the mean
# cost as the value the optimisation has to beat. The running total is printed
# after each dataset, and the mean at the end.
best_parameters = copy.deepcopy(all_gantry_crane_parameters)
dataset_count = len(interpolated_datasets)
best_cost = 0
for dataset in interpolated_datasets:
    best_cost = best_cost + cost_function(best_parameters, dataset)
    print(best_cost)
    print(" " * 100, end="\r")
best_cost = best_cost / dataset_count
print(best_cost)

In [None]:
# Stochastic, coordinate-wise gradient descent with random restarts.
#
# Each episode starts from random values (inside optimize_range) for the
# approximated parameters. Each iteration draws BATCH_SIZE datasets and, for one
# parameter at a time, estimates the batch-cost gradient with a forward
# difference of step h, takes a LEARNING_RATE step and clips the result back
# into its range. The parameter set is then scored and, when it beats
# best_cost, stored and written to RESULTS_PATH + "best_parameters.json".
#
# NOTE(review): cost_histories is allocated but never filled in this cell.
cost_histories = np.zeros(MAX_EPISODES)
for episode in range(MAX_EPISODES):
    current_parameters = copy.deepcopy(best_parameters)
    for parameter in approximated_parameters:
        lower_bound, upper_bound = optimize_range[parameter]
        current_parameters[parameter]["value"] = np.random.uniform(
            lower_bound, upper_bound
        )

    diverge = False
    cost_history = np.zeros(MAX_ITERATIONS)
    for iteration in range(MAX_ITERATIONS):
        print(f"\033[92m Episode: {episode} Iteration: {iteration} \033[0m")

        # Choose random datasets for stochastic gradient descent
        random_dataset_indexes = random.sample(range(len(interpolated_datasets)), BATCH_SIZE)
        random_datasets = [interpolated_datasets[i] for i in random_dataset_indexes]

        for parameter in approximated_parameters:
            # Batch cost at the current point. It is recomputed for every
            # parameter because the previous parameter has just been updated.
            average_batch_cost = sum(
                cost_function(current_parameters, batch_dataset)
                for batch_dataset in random_datasets
            ) / BATCH_SIZE
            if not np.isfinite(average_batch_cost):
                diverge = True
                break

            # Forward-difference gradient with respect to this parameter.
            old_parameter_value = current_parameters[parameter]["value"]
            current_parameters[parameter]["value"] = old_parameter_value + h
            new_average_batch_cost = sum(
                cost_function(current_parameters, batch_dataset)
                for batch_dataset in random_datasets
            ) / BATCH_SIZE
            current_parameters[parameter]["value"] = old_parameter_value
            gradient = (new_average_batch_cost - average_batch_cost) / h

            # If the perturbed simulation produced a non-finite cost, the
            # gradient is meaningless; stepping along it would only throw the
            # parameter onto one of its bounds. Leave the parameter unchanged.
            if not np.isfinite(gradient):
                continue

            # Update the parameter and clip it back into its allowed range.
            lower_bound, upper_bound = optimize_range[parameter]
            current_parameters[parameter]["value"] = np.clip(
                old_parameter_value - LEARNING_RATE * gradient,
                lower_bound,
                upper_bound,
            )

        if diverge:
            # Stop the episode before scoring or saving a diverged parameter
            # set, and drop the iterations that never ran from the history.
            cost_history = cost_history[:iteration]
            break

        # Score the updated parameters as the mean cost over all datasets, the
        # same quantity best_cost was initialised with. (Scoring only the last
        # dataset of the batch made the comparison below inconsistent.) This
        # costs one extra simulation per dataset in every iteration.
        cost = sum(
            cost_function(current_parameters, evaluation_dataset)
            for evaluation_dataset in interpolated_datasets
        ) / len(interpolated_datasets)
        print(f"Cost: {cost}")
        cost_history[iteration] = cost

        if cost < best_cost:
            best_cost = cost
            best_parameters = copy.deepcopy(current_parameters)

            # Save the best parameters to a JSON file
            ut.save_json(best_parameters, RESULTS_PATH + "best_parameters.json")

In [None]:
# Show the best parameter set found. The optimisation loop also writes it to
# RESULTS_PATH + "best_parameters.json" each time it improves.
print(best_parameters)

In [None]:
# Cost per iteration. cost_history is re-created at the start of every episode,
# so this shows the most recent episode only.
cost_figure, cost_axes = plt.subplots()
cost_axes.plot(cost_history)
cost_axes.set_xlabel("Iteration")
cost_axes.set_ylabel("Cost")
cost_axes.set_title("Cost history")
plt.show()