# Reinforcement learning

In [None]:
import gymnasium as gym
import numpy as np
# ABC/abstractmethod let us declare abstract methods (the closest thing Python has to an interface)
from abc import ABC, abstractmethod
from IPython.display import display, clear_output

class Agent(ABC):
    """Interface an individual must implement to be evolved by ``train_agent``.

    The training loop only relies on the members declared here: it clones
    individuals with ``copy``, queries them with ``get_action`` while an
    episode is simulated, and produces offspring with ``mutate``.
    """

    # Number of parents handed to one ``mutate`` call. The training loop reads
    # this attribute from the agent; concrete subclasses assign the value.
    num_parents: int

    @abstractmethod
    def _init_weights(self):
        """Create (or re-create) the parameters of the agent."""

    @abstractmethod
    def copy(self):
        """Return a new agent of the same type as this one."""

    @abstractmethod
    def get_action(self, observation=None):
        """Return the action chosen for ``observation``."""

    @abstractmethod
    def mutate(self, parents=None, mutation_rate=None):
        """Return a list of child agents bred from ``parents``.

        Args:
            parents: list of ``num_parents`` agents used as genetic material.
            mutation_rate: strength of the random perturbation applied to
                the children.
        """

# Play one complete episode and report how much reward it yielded
def simulate_env(env, agent):
    """Run ``agent`` in ``env`` until the episode ends and return the summed reward.

    Args:
        env: environment whose ``reset()`` returns a sequence with the first
            observation at index 0 and whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        agent: object exposing ``get_action(observation)``.

    Returns:
        The sum of all rewards received during the episode.
    """
    state = env.reset()[0]
    episode_return = 0

    while True:
        # Let the agent pick the action for the current observation
        chosen_action = agent.get_action(state)

        # Apply it and collect the resulting observation and reward
        state, step_reward, terminated, truncated, _ = env.step(chosen_action)
        episode_return += step_reward

        # The episode is over when the environment truncates or terminates it
        if truncated or terminated:
            break

    return episode_return

# train a reinforcement learning agent with an evolutionary algorithm
def train_agent(env, agent, population_size = 50, mutation_rate=0.4, num_generations = 100, num_episodes=5,
                elite_fraction=0.2, reward_threshold=-100, mutation_decay=0.1):
    """Evolve a population of agents and return the best individual found.

    Every generation each individual is scored by its average reward over
    ``num_episodes`` episodes. The best-scoring individuals (the elite) act as
    parents, and the single best individual is carried over unchanged.

    Args:
        env: environment passed to ``simulate_env``.
        agent: template agent; the population is created with ``agent.copy()``
            and ``agent.num_parents`` parents are drawn per ``mutate`` call.
        population_size: number of individuals per generation (>= 1).
        mutation_rate: value forwarded to ``mutate``.
        num_generations: number of generations to run.
        num_episodes: episodes averaged per individual (>= 1).
        elite_fraction: fraction of the population allowed to reproduce;
            at least one individual is always kept.
        reward_threshold: the first time the best score of a generation
            exceeds this value, ``mutation_rate`` is multiplied once by
            ``mutation_decay``.
        mutation_decay: factor applied to ``mutation_rate`` at that moment.

    Returns:
        The best individual of the last evaluated generation, or ``None``
        when ``num_generations`` is 0.

    Raises:
        ValueError: if ``population_size`` or ``num_episodes`` is below 1.
        RuntimeError: if ``mutate`` returns no children.
    """
    if population_size < 1:
        raise ValueError("population_size must be at least 1")
    if num_episodes < 1:
        raise ValueError("num_episodes must be at least 1")

    # Initialize the population
    population = [agent.copy() for _ in range(population_size)]

    # Guard against int(...) == 0: a slice of [-0:] would select everybody.
    num_elites = max(1, int(elite_fraction * population_size))

    mutation_reduced = False
    best_individual = None
    for generation in range(num_generations):
        # Average reward over several episodes for every individual
        scores = [
            sum(simulate_env(env, individual) for _ in range(num_episodes)) / num_episodes
            for individual in population
        ]

        # argsort is ascending, so the tail holds the top performers
        elite_indices = np.argsort(scores)[-num_elites:]

        # Elitism: the best individual survives unchanged
        best_individual = population[elite_indices[-1]]
        new_population = [best_individual]

        while len(new_population) < population_size:
            indices = np.random.choice(elite_indices, size=agent.num_parents)
            parents = [population[index] for index in indices]
            # Breed through one of the selected parents rather than through
            # whatever individual the evaluation loop happened to end on.
            children = parents[0].mutate(parents, mutation_rate)
            if not children:
                raise RuntimeError("mutate() returned no children")
            new_population.extend(children)

        # mutate() may return several children at once; drop the overshoot so
        # the population size stays constant across generations.
        population = new_population[:population_size]

        # Print the best score in this generation
        best_score = max(scores)
        if best_score > reward_threshold and not mutation_reduced:
            # Switch once from exploration to fine-tuning
            mutation_rate *= mutation_decay
            mutation_reduced = True
        print(f"Generation {generation + 1}: Best Score = {best_score}")

    # return best individual
    return best_individual

## Genetic algorithms

Dit zijn varianten van evolutionaire algoritmes waarbij gebruik gemaakt wordt van crossover van twee ouders

In [None]:
class GeneticAgent(Agent):
    """Feed-forward policy in plain NumPy, evolved with two-parent crossover.

    The network is a list of weight matrices without biases: hidden layers
    use ``tanh``, the output layer is linear, and the action is the index of
    the largest output.
    """

    # Parents consumed per mutate() call (read by the training loop).
    num_parents = 2

    def __init__(self, num_inputs=1, num_outputs=1, hidden_layer_sizes=None) -> None:
        """Build the agent and draw random initial weights.

        Args:
            num_inputs: size of an observation vector.
            num_outputs: number of discrete actions.
            hidden_layer_sizes: widths of the hidden layers; ``None`` or an
                empty sequence gives a single linear layer.
        """
        super().__init__()

        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        # Keep a private list so agents never share the caller's (or a
        # default) list object.
        self.hidden_layers_sizes = list(hidden_layer_sizes) if hidden_layer_sizes is not None else []

        self._init_weights()

    def _init_weights(self):
        """Draw one standard-normal weight matrix per pair of consecutive layers."""
        layer_sizes = [self.num_inputs, *self.hidden_layers_sizes, self.num_outputs]
        self.weights = [
            np.random.randn(fan_in, fan_out)
            for fan_in, fan_out in zip(layer_sizes, layer_sizes[1:])
        ]

    def copy(self):
        """Return an independent agent with the same architecture and weights."""
        agent = GeneticAgent(self.num_inputs, self.num_outputs, self.hidden_layers_sizes)
        agent.weights = [layer.copy() for layer in self.weights]

        return agent

    def get_action(self, observation=None):
        """Return the index of the largest network output for ``observation``."""
        *hidden_layers, output_layer = self.weights

        activation = observation
        for layer in hidden_layers:
            # tanh activation on the hidden layers
            activation = np.tanh(np.dot(activation, layer))

        # linear activation on the output layer
        return np.argmax(np.dot(activation, output_layer))

    def mutate(self, parents=None, mutation_rate=None):
        """Breed two children from two parents.

        The flattened weights of both parents are recombined with a
        two-point crossover, Gaussian noise scaled by ``mutation_rate`` is
        added, and the result is reshaped layer by layer (output layer
        included) into two new agents.

        Args:
            parents: list (or tuple) whose first two items are
                ``GeneticAgent`` instances with identical architecture.
            mutation_rate: standard deviation of the added noise; ``None``
                or 0 disables the noise.

        Returns:
            ``[child1, child2]``.

        Raises:
            TypeError: if ``parents`` does not hold two ``GeneticAgent``s.
            ValueError: if the parents have a different number of weights.
        """
        if (
            not isinstance(parents, (list, tuple))
            or len(parents) < 2
            or not all(isinstance(parent, GeneticAgent) for parent in parents[:2])
        ):
            raise TypeError("parents must be a list holding at least two GeneticAgent instances")

        first, second = parents[0], parents[1]

        # flatten weights
        genome_first = np.concatenate([layer.ravel() for layer in first.weights])
        genome_second = np.concatenate([layer.ravel() for layer in second.weights])
        if genome_first.size != genome_second.size:
            raise ValueError("parents must share the same network architecture")
        total_size = genome_first.size

        # crossover points; a single-weight genome has nothing to exchange
        if total_size >= 2:
            point1 = np.random.randint(0, total_size - 1)
            point2 = np.random.randint(point1 + 1, total_size)
        else:
            point1 = point2 = 0

        # crossover weights: the children swap the segment between the points
        child_genomes = [
            np.concatenate((genome_first[:point1], genome_second[point1:point2], genome_first[point2:])),
            np.concatenate((genome_second[:point1], genome_first[point1:point2], genome_second[point2:])),
        ]

        # mutation
        if mutation_rate:
            child_genomes = [
                genome + mutation_rate * np.random.randn(total_size)
                for genome in child_genomes
            ]

        # recreate shapes for every layer of the network
        children = [first.copy(), second.copy()]
        for child, genome in zip(children, child_genomes):
            offset = 0
            for index, template in enumerate(first.weights):
                end = offset + template.size
                child.weights[index] = genome[offset:end].reshape(template.shape)
                offset = end

        return children
        

In [None]:
# Define the MountainCar environment (created without a render_mode)
env = gym.make("MountainCar-v0")

# Hyperparameters forwarded unchanged to train_agent below
population_size = 100
mutation_rate = 0.4
num_generations = 100
num_episodes = 5

# RL agent backed by a neural network with one hidden layer of 8 neurons.
# The input width is taken from the observation space and the output width
# from the number of discrete actions.
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
agent = GeneticAgent(num_inputs=input_size, num_outputs=output_size, hidden_layer_sizes=[8])

# Run the evolutionary search and keep the individual train_agent returns
best_genetic_agent = train_agent(env, agent, population_size=population_size, mutation_rate=mutation_rate, num_generations=num_generations, num_episodes=num_episodes)

In [None]:
# Evaluate the best individual in an environment created with render_mode="human"
env = gym.make("MountainCar-v0", render_mode="human")

try:
    for episode in range(5):
        score = simulate_env(env, best_genetic_agent)
        print(f"Best Individual Score: {score}")
finally:
    # Always release the environment's resources, even if an episode raises
    env.close()

In [None]:
import matplotlib.pyplot as plt

def plot_action_space(agent):
    """Draw the action ``agent`` selects for every (position, velocity) pair.

    The agent is queried on a regular grid and the chosen action indices are
    shown as a grayscale image with position on the x-axis and velocity on
    the y-axis.

    Args:
        agent: object exposing ``get_action(observation)`` for a
            two-element observation ``[position, velocity]``.
    """
    # NOTE(review): the ranges presumably mirror the MountainCar-v0
    # observation bounds - confirm against the environment.
    positions = np.arange(-1.2, 0.6, 0.05)
    velocities = np.arange(-0.07, 0.07, 0.001)

    # imshow maps rows to the y-axis and columns to the x-axis, so the grid
    # is built as [velocity][position] to agree with the axis labels.
    actions = np.array([
        [agent.get_action(np.array([position, velocity])) for position in positions]
        for velocity in velocities
    ])

    plt.figure(figsize=(8, 12))
    plt.imshow(
        actions,
        cmap='gray',
        interpolation='none',
        origin='lower',   # first row (lowest velocity) at the bottom, matching extent
        aspect='auto',    # the two axes have very different scales
        extent=[positions[0], positions[-1], velocities[0], velocities[-1]],
    )

    # Add x and y ticks with labels
    plt.xticks(np.linspace(positions[0], positions[-1], num=10), rotation=45)  # Set x-axis ticks
    plt.yticks(np.linspace(velocities[0], velocities[-1], num=10))             # Set y-axis ticks

    # Label the axes
    plt.xlabel('position')
    plt.ylabel('velocity')

# Visualise the action the trained NumPy agent picks across the state grid
plot_action_space(best_genetic_agent)

## Met pytorch

Pas het genetisch algoritme uit de voorgaande code aan zodat er gebruikgemaakt wordt van PyTorch, in plaats van de berekeningen manueel met wiskundige operaties uit te voeren.

In [None]:
# Create the agent class GeneticTorchAgent (PyTorch counterpart of GeneticAgent)
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class GeneticTorchAgent(Agent):
    """Feed-forward policy backed by a PyTorch model, evolved with crossover.

    The model is an ``nn.Sequential`` of ``nn.Linear`` layers with ``ReLU``
    after every hidden layer. No gradients are used: the parameters are only
    changed by crossover and random mutation.
    """

    # Parents consumed per mutate() call (read by the training loop).
    num_parents = 2

    def __init__(self, num_inputs=1, num_outputs=1, hidden_layer_sizes=None) -> None:
        """Build the agent and its randomly initialised model.

        Args:
            num_inputs: size of an observation vector.
            num_outputs: number of discrete actions.
            hidden_layer_sizes: widths of the hidden layers; ``None`` or an
                empty sequence gives a single linear layer.
        """
        super().__init__()

        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        # Keep a private list so agents never share the caller's (or a
        # default) list object.
        self.hidden_layers_sizes = list(hidden_layer_sizes) if hidden_layer_sizes is not None else []

        self._init_weights()

    def _init_weights(self):
        """Create a fresh ``nn.Sequential`` model for the configured layer sizes."""
        layers = []
        input_size = self.num_inputs

        for hidden_size in self.hidden_layers_sizes:
            layers.append(nn.Linear(input_size, hidden_size))
            layers.append(nn.ReLU())
            input_size = hidden_size

        layers.append(nn.Linear(input_size, self.num_outputs))  # Output layer

        self.model = nn.Sequential(*layers)

    def copy(self):
        """Return an independent agent with the same architecture and parameters."""
        agent = GeneticTorchAgent(self.num_inputs, self.num_outputs, self.hidden_layers_sizes)
        agent.model.load_state_dict(self.model.state_dict())  # Copy weights
        return agent

    def get_action(self, observation=None):
        """Return the index of the largest model output for ``observation``."""
        with torch.no_grad():
            # Add a batch dimension in front of the observation
            observation_tensor = torch.as_tensor(observation, dtype=torch.float32).unsqueeze(0)
            action_values = self.model(observation_tensor)
        return torch.argmax(action_values).item()

    @staticmethod
    def _flatten_parameters(agent):
        """Return all model parameters of ``agent`` as one new 1-D float array."""
        return np.concatenate(
            [param.detach().cpu().numpy().ravel() for param in agent.model.parameters()]
        )

    def mutate(self, parents=None, mutation_rate=None):
        """Breed two children from two parents.

        The flattened parameters of both parents are recombined with a
        two-point crossover, Gaussian noise scaled by ``mutation_rate`` is
        added, and every parameter tensor of the children is filled with its
        own slice of the resulting vector.

        Args:
            parents: list (or tuple) whose first two items are
                ``GeneticTorchAgent`` instances with identical architecture.
            mutation_rate: standard deviation of the added noise; ``None``
                or 0 disables the noise.

        Returns:
            ``[child1, child2]``.

        Raises:
            TypeError: if ``parents`` does not hold two ``GeneticTorchAgent``s.
            ValueError: if the parents have a different number of parameters.
        """
        if (
            not isinstance(parents, (list, tuple))
            or len(parents) < 2
            or not all(isinstance(parent, GeneticTorchAgent) for parent in parents[:2])
        ):
            raise TypeError("parents must be a list holding at least two GeneticTorchAgent instances")

        first, second = parents[0], parents[1]

        # Flattened parameter vectors of the parents
        genome_first = self._flatten_parameters(first)
        genome_second = self._flatten_parameters(second)
        if genome_first.size != genome_second.size:
            raise ValueError("parents must share the same network architecture")
        total_size = genome_first.size

        # Two-point crossover; a single-parameter genome has nothing to exchange
        if total_size >= 2:
            point1 = np.random.randint(0, total_size - 1)
            point2 = np.random.randint(point1 + 1, total_size)
        else:
            point1 = point2 = 0

        child_genomes = [
            np.concatenate((genome_first[:point1], genome_second[point1:point2], genome_first[point2:])),
            np.concatenate((genome_second[:point1], genome_first[point1:point2], genome_second[point2:])),
        ]

        # mutation
        if mutation_rate:
            child_genomes = [
                genome + mutation_rate * np.random.randn(total_size)
                for genome in child_genomes
            ]

        # Write each slice back into the parameter tensor it belongs to.
        # Slicing by the parameters' own sizes (not by genome elements) is
        # what gives every weight its individual value.
        children = [first.copy(), second.copy()]
        with torch.no_grad():
            for child, genome in zip(children, child_genomes):
                flat = torch.from_numpy(np.ascontiguousarray(genome, dtype=np.float32))
                offset = 0
                for param in child.model.parameters():
                    count = param.numel()
                    param.copy_(flat[offset:offset + count].view_as(param))
                    offset += count

        return children
        

In [None]:
# Define the MountainCar environment (created without a render_mode)
env = gym.make("MountainCar-v0")

# Hyperparameters forwarded unchanged to train_agent below
population_size = 100
mutation_rate = 0.4
num_generations = 100
num_episodes = 5

# RL agent backed by a PyTorch network with one hidden layer of 8 neurons.
# The input width is taken from the observation space and the output width
# from the number of discrete actions.
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
agent = GeneticTorchAgent(num_inputs=input_size, num_outputs=output_size, hidden_layer_sizes=[8])

# Run the evolutionary search and keep the individual train_agent returns
best_genetictorch_agent = train_agent(env, agent, population_size=population_size, mutation_rate=mutation_rate, num_generations=num_generations, num_episodes=num_episodes)

In [None]:
# Evaluate the best individual in an environment created with render_mode="human"
env = gym.make("MountainCar-v0", render_mode="human")

try:
    for episode in range(5):
        score = simulate_env(env, best_genetictorch_agent)
        print(f"Best Individual Score: {score}")
finally:
    # Always release the environment's resources, even if an episode raises
    env.close()

In [None]:
# Visualise the action the trained PyTorch agent picks across the state grid.
# The trained agent is stored in best_genetictorch_agent by the training cell.
plot_action_space(best_genetictorch_agent)