In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

from models.model2 import BicycleModel
from models.model4 import FourWheelModel
from models.utils import get_csv_row_count
from models.utils import get_folder_path
from models.utils import fit_circle
from models.utils import plot_column_histograms

In [27]:

# 1 particle contains the values of T_peak and T_front
class Particle:
    def __init__(self, position, velocity):
        self.position = position
        self.velocity = velocity
        self.best_position = position # position =  parameters we want to infer
        
        self.position_cost_value = self.compute_fitness_function()
        self.best_position_cost_value =  self.position_cost_value



    def update_velocity(self, inertia, cognitive_rate, social_rate, global_best_position):
        r1, r2 = np.random.rand(2)
        cognitive_component = cognitive_rate * r1 * (self.best_position - self.position)
        social_component = social_rate * r2 * (global_best_position - self.position)
        self.velocity = inertia * self.velocity + cognitive_component + social_component

    def update_position(self, lower_bounds, upper_bounds):
        self.position += self.velocity
        self.position = np.clip(self.position, lower_bounds, upper_bounds)

        self.position_cost_value = self.compute_fitness_function(self.position)
    
    # This function is called after update_position
    def update_best_position(self, objective_function):
        if self.position_cost_value < self.best_position_cost_value:
            self.best_position = self.position
            self.best_position_cost_value = self.compute_fitness_function(self.position)

    def compute_fitness_function(self):
        return float(self.position[0]**2 + self.position[1]**2)
        


class PSO:
    def __init__(self, num_particles, max_iterations, lower_bounds, upper_bounds, dimension):
        self.num_particles = num_particles
        self.max_iterations = max_iterations
        self.lower_bounds = lower_bounds
        self.upper_bounds = upper_bounds
        self.dimension = dimension
        self.param_to_infer = 2
        self.particles = []
        self.global_best_position = None
        self.particle_positions_history = []
        self.particle_fitness_history = []

        # Related to vehicle dynamics
        self.model = None
        self.dt = 0.025
        self.commands = get_folder_path()+'/open_loop_inputs/open_loop_commands.csv'
        self.N = get_csv_row_count(self.commands)
        self.open_loop_tf = self.N*self.dt
        # T_peak and T_slope is not specified since we infer those
        self.x_vect_ref = self.get_x_vect_ref()
        self.y_likelihood = 0


    def get_x_vect_ref(self):
        T_peak_ref = 0.37
        T_slope_ref = 0.4
        self.model = FourWheelModel(self.dt,self.open_loop_tf, T_peak_ref, T_slope_ref)
        t,x = self.get_open_loop_data()
        return x
    
    # Example usage
    def get_kpi(self, particle_position):
        self.generate_new_model(particle_position[:self.param_to_infer])
        t,x_vect = self.get_open_loop_data()
        
        l2_norm_position = np.linalg.norm(self.x_vect_ref[:,:2] - x_vect[:,:2], axis=1).sum()
        l2_norm_velocity = np.linalg.norm(self.x_vect_ref[:,2:] - x_vect[:,2:], axis=1).sum()

        w1 = 0.5
        w2 = 0.5
        kpi = w1*l2_norm_position+w2*l2_norm_velocity
        return kpi


    def get_open_loop_data(self):
        
        t,x_vect = self.model.do_open_loop_sim_from_csv(self.commands)
        
        x = x_vect[:,0]
        y = x_vect[:,1]

        #plt.plot(x,y)
        #plt.axis('equal')
        
        return t,x_vect


    def get_circle_radius(self, particle):
        '''
            Gets the radius of  the circle from the open loop simulation
            @param name_of-param
        '''
        # Generate the right tyre1000.yaml file (Let's call it 1000 by convention)
        self.generate_new_model(particle[:self.param_to_infer])
        
        # Generate the new data
        t0 = 0
        N = self.model.open_loop_tf/self.model.dt
        steering = 0.2
        torque = 100
        inputs = [steering, torque, torque, torque, torque]
        t,x_vect = self.model.do_open_loop_sim_cst_inputs(t0, inputs)
        
        # Fetch the radius from the data
        x = x_vect[int(N/2):,0]
        y = x_vect[int(N/2):,1]
        radius = fit_circle(x, y) 
        
        return radius
    
    def generate_new_model(self, particle):
        T_peak = particle[0]
        T_slope = particle[1]
        self.model = FourWheelModel( self.dt, self.open_loop_tf,float(T_peak), float(T_slope))

    def optimize(self, inertia=0.5, cognitive_rate=0.5, social_rate=0.5):
        self.initialize_particles()

        for i in range(self.max_iterations):
            self.particle_positions_history.append([particle.position for particle in self.particles])
            self.particle_fitness_history.append([self.objective_function(particle.position) for particle in self.particles])

            for particle in self.particles:
                particle.update_velocity(inertia, cognitive_rate, social_rate, self.global_best_position)
                particle.update_position(self.lower_bounds, self.upper_bounds)
                particle.update_best_position(self.objective_function)

            self.update_global_best_position()
            #if(i%10 == 0):
            print('iteration', i)

    def initialize_particles(self):
        self.particles = []
        for _ in range(self.num_particles):
            position = np.random.uniform(self.lower_bounds, self.upper_bounds, size=self.dimension)
            velocity = np.zeros_like(position)
            particle = Particle(position, velocity)
            self.particles.append(particle)

        self.global_best_position = self.particles[np.argmin([self.objective_function(particle.position) for particle in self.particles])].position

    def update_global_best_position(self):
        for particle in self.particles:
            if self.objective_function(particle.position) < self.objective_function(self.global_best_position):
                self.global_best_position = particle.position

    def plot_particle_positions(self, frame):
        plt.cla()
        positions = np.array(self.particle_positions_history[frame])
        for i in range(self.dimension):
            plt.plot(positions[:, i], np.zeros_like(positions[:, i]), 'go', markersize=6)
        plt.xlabel('x')
        plt.ylim([-0.5, 0.5])

    def plot_particle_fitness_history(self):
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(12, 4))

        # Plot histogram at the start
        ax1.hist(self.particle_fitness_history[0], bins=20, color='blue', edgecolor='black')
        ax1.set_xlabel('Fitness')
        ax1.set_ylabel('Count')
        ax1.set_title('Start of Optimization')

        # Plot histogram at the middle
        middle_index = len(self.particle_fitness_history) // 2
        ax2.hist(self.particle_fitness_history[middle_index], bins=20, color='green', edgecolor='black')
        ax2.set_xlabel('Fitness')
        ax2.set_ylabel('Count')
        ax2.set_title('Middle of Optimization')

        # Plot histogram at the end
        ax3.hist(self.particle_fitness_history[-1], bins=20, color='red', edgecolor='black')
        ax3.set_xlabel('Fitness')
        ax3.set_ylabel('Count')
        ax3.set_title('End of Optimization')

        plt.tight_layout()
        plt.show()

    def plot_particle_history_2d(self):
        if self.dimension == 2:
            self.particle_positions_history = np.array(self.particle_positions_history)
            fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(12, 4))
            # Plot histogram at the start
            ax1.hist2d(self.particle_positions_history[0][:, 0], self.particle_positions_history[0][:, 1], bins=20, cmap='viridis')
            ax1.set_xlabel('x')
            ax1.set_ylabel('y')
            ax1.set_title('Start of Optimization')

            # Plot histogram at the middle
            middle_index = len(self.particle_positions_history) // 2
            ax2.hist2d(self.particle_positions_history[middle_index][:, 0],self.particle_positions_history[middle_index][:, 1], bins=20, cmap='viridis')
            ax2.set_xlabel('x')
            ax2.set_ylabel('y')
            ax2.set_title('Middle of Optimization')

            # Plot histogram at the end
            ax3.hist2d(self.particle_positions_history[-1][:, 0], self.particle_positions_history[-1][:, 1], bins=20, cmap='viridis')
            ax3.set_xlabel('x')
            ax3.set_ylabel('y')
            ax3.set_title('End of Optimization')

            plt.tight_layout()
            plt.show()

    def plot_particle_positions_animate(self):
        fig = plt.figure()
        ani = FuncAnimation(fig, self.plot_particle_positions, frames=len(self.particle_positions_history), interval=200)
        plt.show()

    def objective_function(self, position):
        print('objective function called')
        if self.dimension == 1:
            return(self.objective_function_1d(position))
        elif self.dimension == 2:
            return(self.get_kpi(position))
        


    def objective_function_1d(self, particle_position):
        return float(particle_position**2)
    
    def objective_function_2d(self, particle_position):
        return float(particle_position[0]**2 + particle_position[1]**2)


In [28]:
# Example usage
pso = PSO(num_particles=1, max_iterations=100, lower_bounds=0, upper_bounds=1, dimension=2)
pso.optimize()
#pso.plot_particle_positions_animate()
pso.plot_particle_fitness_history()
pso.plot_particle_history_2d()

objective function called
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 0
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 1
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 2
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 3
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 4
objective function called
objective function called
objective function called
objective function called
objective function called
iteration 5
objective function called
objective function called
objective function called
objective function called
objective function

KeyboardInterrupt: 