In [33]:
import torch
import json
import math

from Simulation import Simulation

In [34]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [35]:
class LanderSteeringModel(torch.nn.Module):
    def __init__(self, device):
        super(LanderSteeringModel, self).__init__()
        self.device = device

        self.fc1 = torch.nn.Linear(13, 64)
        self.fc2 = torch.nn.Linear(64, 128)
        self.fc3 = torch.nn.Linear(128, 64)
        self.fc4 = torch.nn.Linear(64, 7)


    def forward(self, telemetry):
        x = [
            telemetry['position'][0],
            telemetry['position'][1],
            telemetry['angle'],
            telemetry['catch_pin_position'][0],
            telemetry['catch_pin_position'][1],
            telemetry['velocity'][0],
            telemetry['velocity'][1],
            telemetry['angular_velocity'],
            telemetry['mass'],
            telemetry['dry_mass'],
            telemetry['propellant_mass'],
            telemetry['propellant_capacity'],
            telemetry['moment']
        ]
        x = torch.tensor(x, dtype=torch.float32).to(self.device)

        x = torch.nn.functional.relu(self.fc1(x))
        x = torch.nn.functional.relu(self.fc2(x))
        x = torch.nn.functional.relu(self.fc3(x))
        x = self.fc4(x)

        y = torch.sigmoid(x)
        y = y.cpu().detach()

        steering_input = {
            "engine1": y[0].item(),
            "engine2": y[1].item(),
            "engine3": y[2].item(),
            "engine4": y[3].item(),
            "engine5": y[4].item(),
            "rcsLeft": y[5].item(),
            "rcsRight": y[6].item()
        }
        return steering_input

In [36]:
def load_settings():
    with open("settings.json") as file:
        settings = json.load(file)
    return settings

In [37]:
settings = load_settings()
tower_arm_settings = settings["terrain"]["tower"]["towerArm"]
target_catch_pin_position_x = tower_arm_settings["xMax"] - tower_arm_settings["xMin"]
target_catch_pin_position_y = tower_arm_settings["yMax"] + settings["lander"]["catchPin"]["radius"]
target_catch_pin_position = (target_catch_pin_position_x, target_catch_pin_position_y)
target_angle = 0
target_velocity = (0, 0)
target_angular_velocity = 0

def loss_function(telemetry):
    catch_pin_position = telemetry['catch_pin_position']
    angle = telemetry['angle']
    velocity = telemetry['velocity']
    angular_velocity = telemetry['angular_velocity']

    position_loss = math.log(1 + math.sqrt((catch_pin_position[0] - target_catch_pin_position[0])**2 + (catch_pin_position[1] - target_catch_pin_position[1])**2))
    angle_loss = abs(angle - target_angle)
    velocity_loss = math.log(1 + math.sqrt((velocity[0] - target_velocity[0])**2 + (velocity[1] - target_velocity[1])**2))
    angular_velocity_loss = abs(angular_velocity - target_angular_velocity)

    # loss = position_loss + angle_loss + velocity_loss + angular_velocity_loss
    loss = position_loss
    return torch.tensor(loss, dtype=torch.float32)

In [38]:
def to_binary_steering(steeering_input):
    binary_steering = {}
    for key, value in steeering_input.items():
        binary_steering[key] = value > 0.5
    return binary_steering

In [52]:
lander_initial_position = settings["landerInitialPosition"]["x"], settings["landerInitialPosition"]["y"]
simulation_iterations_per_step = settings["simulationIterationsPerStep"]
simulation_steps_per_second = settings["simulationStepsPerSecond"]
simulation = Simulation(settings, lander_initial_position, simulation_iterations_per_step)

def evaluate_model(model, simulation, simulation_steps_per_second):
    simulation.reset()
    running = True
    step_count = 0
    max_steps = 10 * simulation_steps_per_second
    while running:
        step_count += 1
        result, telemetry = simulation.step(1 / simulation_steps_per_second)

        steering_input = model(telemetry)
        binary_steering = to_binary_steering(steering_input)
        simulation.set_steering_input(binary_steering)

        if result is not None or step_count > max_steps:
            running = False

    loss = loss_function(telemetry)
    return loss

def select_best_models(population, population_losses):
    # Select the best 50% of the population, based on loss
    # Select using probability based on loss
    population_size = len(population)
    selection_size = population_size // 2
    selected_population = []
    indexes = [i for i in range(population_size)]
    for i in range(selection_size):
        num_indexes = len(indexes)
        index = torch.multinomial(torch.tensor([(num_indexes - i - 1) / num_indexes for i in range(num_indexes)], dtype=torch.float32), 1).item()
        index = indexes.pop(index)
        selected_population.append(population[index])
    return selected_population

def crossover(parent1, parent2):
    child = LanderSteeringModel(parent1.device).to(device)
    for child_param, parent1_param, parent2_param in zip(child.parameters(), parent1.parameters(), parent2.parameters()):
        child_param.data = (parent1_param.data + parent2_param.data) / 2
    return child

def mutate(model, mutation_rate):
    for param in model.parameters():
        param.data += torch.randn(param.size(), device=device) * mutation_rate
    return model

num_epochs = 100
population_size = 100
population = [LanderSteeringModel(device) for _ in range(population_size)]
for i in range(population_size):
    for param in population[i].parameters():
        param.data = torch.randn(param.size(), device=device)

for epoch in range(num_epochs):
    
    polulation_losses = [evaluate_model(model, simulation, simulation_steps_per_second) for model in population]
    # Sort population by loss
    population = [model for _, model in sorted(zip(polulation_losses, population), key=lambda pair: pair[0])]
    polulation_losses = sorted(polulation_losses)
    print(polulation_losses)

    best_model = population[0]

    print(f"\rEpoch [{epoch + 1}/{num_epochs}], Loss: {polulation_losses[0].item():.4f}", end="")
    if (epoch + 1) % 100 == 0:
        print()

    # Select the best 50% of the population
    selected_population = select_best_models(population, polulation_losses)

    # Crossover randomly selected models
    new_population = []
    for _ in range(2):
        # shuffle the selected population
        selected_population = [selected_population[i] for i in torch.randperm(len(selected_population))]
        for i in range(len(selected_population)):
            parent1 = selected_population[i]
            parent2 = selected_population[(i + 1) % len(selected_population)]
            child = crossover(parent1, parent2)
            new_population.append(child)

    # Mutate the new population
    mutation_rate = 0.1
    new_population = [mutate(model, mutation_rate) for model in new_population]
    population = new_population
    


# save the best model
torch.save(best_model.state_dict(), "model.pth")


[tensor(1.9959), tensor(2.7991), tensor(3.1586), tensor(3.3949), tensor(3.5236), tensor(3.7666), tensor(3.8452), tensor(4.0575), tensor(4.4305), tensor(4.4569), tensor(4.4695), tensor(4.5992), tensor(4.6935), tensor(4.6986), tensor(5.0166), tensor(5.0404), tensor(5.0886), tensor(5.1243), tensor(5.1450), tensor(5.1716), tensor(5.2158), tensor(5.2226), tensor(5.3472), tensor(5.3672), tensor(5.4855), tensor(5.4905), tensor(5.4912), tensor(5.5094), tensor(5.5169), tensor(5.5796), tensor(5.5810), tensor(5.5841), tensor(5.6010), tensor(5.6584), tensor(5.6639), tensor(5.6666), tensor(5.6727), tensor(5.6787), tensor(5.6923), tensor(5.7133), tensor(5.7283), tensor(5.7579), tensor(5.7636), tensor(5.8039), tensor(5.8054), tensor(5.8398), tensor(5.8432), tensor(5.8448), tensor(5.8495), tensor(5.8505), tensor(5.8596), tensor(5.8689), tensor(5.8794), tensor(5.8898), tensor(5.9229), tensor(5.9425), tensor(5.9541), tensor(5.9637), tensor(5.9687), tensor(5.9808), tensor(6.0139), tensor(6.0225), tensor(

KeyboardInterrupt: 

In [51]:
torch.save(best_model.state_dict(), "model.pth")
