Imports and warning supression

In [1]:
import pygame
import numpy as np
import tensorflow as tf  # Import TensorFlow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from collections import deque
import random
import os
from datetime import datetime

# Suppress TensorFlow INFO and WARNING messages
tf.get_logger().setLevel('ERROR')

pygame 2.5.2 (SDL 2.28.2, Python 3.10.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


2023-11-14 21:26:51.600568: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-11-14 21:26:51.602598: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-11-14 21:26:51.628191: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-11-14 21:26:51.628221: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-11-14 21:26:51.628246: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to regi

Initial setup and parameter setting

This handles preliminary display overhead as well as RL hyperparameters for tuning

In [2]:
# Screen dimensions
WIDTH, HEIGHT = 800, 600

visualize = True

if visualize:
    # Colors
    WHITE = (255, 255, 255)
    RED = (255, 0, 0)
    GREEN = (0, 255, 0)
    BLUE = (0, 0, 255)
    BLACK = (0, 0, 0)

    # Initialize pygame
    pygame.init()
    # Create the screen and clock objects
    screen = pygame.display.set_mode((WIDTH, HEIGHT))
    pygame.display.set_caption('Simulation Interation: 0')
    clock = pygame.time.Clock()

# Object and target settings
object_radius = 15
target_radius = 10
contact_distance = object_radius

# Initial positions
object_pos = np.array([WIDTH // 2, HEIGHT // 2], dtype=float)
target_pos = np.array([WIDTH // 2, HEIGHT //4])

# Exploration parameters
epsilon = 1.0  # Exploration rate
epsilon_min = 0.01  # Minimum exploration probability
epsilon_decay = 0.995  # Exponential decay rate for exploration prob

# Hyperparameters
n_particles = 10
friction_coefficient = -0.05
state_size = 4 + 4 + 2  # position and velocity for each particle + object position and velocity + target position
action_size = 2  # 2D force vector for each particle
learning_rate = 0.005
gamma = 0.99  # Discount factor for future rewards
action_selection_frequency = 2  # Number of frames to wait before selecting a new action
frame_counter = 0  # Counter to keep track of frames
collision_occurred = False
initial_force_magnitude = 10.0  # Adjust the magnitude of the initial force as needed

# RL batch training
batch_size = 16
max_success_frames = 600 #frames

RL methods used to train the model

In [3]:
# Define the neural network for RL
for filename in os.listdir(os.getcwd()):
    if filename.endswith(".keras"):
        model = tf.keras.models.load_model(f'{filename}')
        print(f'Using model: {filename}')
    else:
        model = Sequential([
            Dense(64, activation='relu', input_shape=(state_size,)),
            Dense(64, activation='relu'),
            Dense(action_size, activation='tanh')  # Force vector in range [-1, 1]
        ])
        model.compile(loss='mse', optimizer=Adam(learning_rate))

# Function to extract the current state
def get_state(particle, object, target_pos):
    particle_state = np.concatenate([particle.position, particle.velocity])
    object_state = np.concatenate([object.position, object.velocity])
    state = np.concatenate([particle_state, object_state, target_pos])
    return state

def apply_action(action, particle):
    particle.force = action  # Apply force to the particle

def calculate_individual_reward(particle, object, target_pos, collision_occurred_with_object, starting_distance_to_target):
    # Current distance between object and target
    distance_from_object_to_target = np.linalg.norm(object.position - target_pos)
    #change in disctnace between object and target
    delta_distance_to_target = starting_distance_to_target - distance_from_object_to_target
    #setting the new distance as the old distance to recalculate for the next loop
 
    #if delta_distance_to_target is negative, we are closer to the target and want to reward our model
    reward = (delta_distance_to_target)*100 * collision_occurred_with_object

    # Penalty for wall collisions
    if particle.hit_wall:
        reward -= 50  # Adjust the penalty value as needed

    # Reward for collision with object
    if collision_occurred_with_object:
        reward += 100  # Adjust the reward value as needed

    return reward

def reset_simulation(particle_list, object, sim_iter):
    object.position = np.random.rand(2) * [WIDTH, HEIGHT]
    object.velocity = np.zeros_like(object.velocity)
    for particle in particle_list:
        particle.position = np.random.rand(2) * [WIDTH, HEIGHT]
        particle.velocity = np.zeros_like(particle.velocity)

    if visualize:
        pygame.display.set_caption(f'Simulation Interation: {sim_iter}')
        # Clear the Pygame event queue to avoid processing stale events
        pygame.event.clear()

    steps = 0
    print(f'--- Simulation Interation #{sim_iter} ---')
    sim_iter+=1
    starting_distance_to_target = np.linalg.norm(object.position - target_pos)

    return steps, sim_iter, starting_distance_to_target

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)
    
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def size(self):
        return len(self.buffer)
    
    def __str__(self):
        buffer_contents = ', '.join([str(item) for item in list(self.buffer)])
        return f"ReplayBuffer (Size: {self.size()}/{self.capacity}) Contents: [{buffer_contents}]"

def train_model(model, replay_buffers, batch_size, gamma):
    # Train only if all buffers have enough samples
    if all([buffer.size() >= batch_size for buffer in replay_buffers]):
        # Sample from each buffer and train
        now = datetime.now()
        current_time = now.strftime("%H:%M:%S")
        print("Training @ =", current_time)
        for buffer in replay_buffers:
            minibatch = buffer.sample(batch_size)
            for state, action, reward, next_state, done in minibatch:
                # print('model.predit and model.fit')
                target = reward
                if not done:
                    target = (reward + gamma * np.amax(model.predict(next_state.reshape(1, -1), verbose = 0)[0]))
                target_f = model.predict(state.reshape(1, -1), verbose = 0)
                target_f[0][np.argmax(action)] = target
                model.fit(state.reshape(1, -1), target_f, epochs=1, verbose = 0)  

def train_model_simple(model, current_state, action, reward, next_state, done, gamma):
    # Predict the future reward from the next state
    target = reward
    if not done:
        target = reward + gamma * np.amax(model.predict(next_state.reshape(1, -1))[0])

    # Get the current prediction for all actions in the current state
    target_f = model.predict(current_state.reshape(1, -1))

    # Update the target for the action taken
    target_f[0][np.argmax(action)] = target

    # Train the model with the new target
    model.fit(current_state.reshape(1, -1), target_f, epochs=1, verbose=0)


Using model: model_p10.keras


Physics and simulation setup

In [4]:

# Class definition for particles
class particle:
    def __init__(self, mass=1.0, position=np.array([0.0, 0.0]), radius=5.0, velocity=np.array([0.0, 0.0]), force=np.array([0.0, 0.0])):
        self.position = position.astype(float)
        self.force = force.astype(float)
        self.radius = float(radius)
        self.velocity = velocity.astype(float)
        self.mass = float(mass)
        self.hit_wall = False
        
    def physics_move(self):
        self.hit_wall = False
        # Collision with boundaries and physics updates...
        # Collision with left or right boundary
        if self.position[0] - self.radius < 0 or self.position[0] + self.radius > WIDTH:
            self.velocity[0] = -self.velocity[0]
            self.position[0] = np.clip(self.position[0], self.radius, WIDTH - self.radius)
            self.hit_wall = True
        if self.position[1] - self.radius < 0 or self.position[1] + self.radius > HEIGHT:
            self.velocity[1] = -self.velocity[1]
            self.position[1] = np.clip(self.position[1], self.radius, HEIGHT - self.radius)
            self.hit_wall = True
            
        # Calculate acceleration from force
        acceleration = self.force / self.mass

        # Update velocity with acceleration
        self.velocity += acceleration

        # Apply friction to the velocity
        self.velocity += friction_coefficient * self.velocity

        if np.linalg.norm(self.velocity) < 0.05:
            self.velocity = np.zeros_like(self.velocity)

        # Update position with velocity
        self.position += self.velocity

# Helper function to check if a collision occurs between two objects
def is_collision(particle1, particle2):
    distance = np.linalg.norm(particle1.position - particle2.position)
    return distance < (particle1.radius + particle2.radius)

def handle_collisions(particles, restitution_coefficient=1):
    n = len(particles)
    for i in range(n):
        for j in range(i + 1, n):
            particle1, particle2 = particles[i], particles[j]
            distance_vector = particle1.position - particle2.position
            distance = np.linalg.norm(distance_vector).astype(float)
            if distance < (particle1.radius + particle2.radius):
                # Normalize distance_vector to get collision direction
                collision_direction = (distance_vector / distance)
                total_mass = float(particle1.mass + particle2.mass)
               
                overlap = float((particle1.radius + particle2.radius) - distance)
                particle1.position += (overlap * (particle2.mass / total_mass)) * collision_direction
                particle2.position -= (overlap * (particle1.mass / total_mass)) * collision_direction

                # Calculate relative velocity
                if distance_vector[0] > 0 or distance_vector[0] > 0:
                    relative_velocity = particle2.velocity - particle1.velocity
                else:
                    relative_velocity = particle1.velocity - particle2.velocity

                # Calculate velocity along the direction of collision
                velocity_along_collision = np.dot(relative_velocity, collision_direction)
                
                # Only proceed to update velocities if particles are moving towards each other
                if velocity_along_collision > 0:
                    # Apply the collision impulse
                    mass_factor = (2 * restitution_coefficient) / total_mass
                    impulse = velocity_along_collision * collision_direction * mass_factor
                    particle1.velocity += impulse * particle2.mass
                    particle2.velocity -= impulse * particle1.mass

Initialization of necessary objects

In [5]:

# Initialize particle list and object
# Initialize particle list with initial force towards the object
particle_list = []
for _ in range(n_particles):
    # Random position for each particle
    position = np.random.rand(2) * [WIDTH, HEIGHT]

    # Direction from particle to object
    direction_to_object = object_pos - position
    direction_to_object /= np.linalg.norm(direction_to_object)  # Normalize the direction

    # Set initial force towards the object
    initial_force = direction_to_object * initial_force_magnitude

    # Create particle with initial force
    new_particle = particle(mass=10, position=position, velocity=np.random.rand(2), force=initial_force)
    particle_list.append(new_particle)
object = particle(position=object_pos, radius=object_radius, mass=5)

collision_occurred_with_object = False

#Initialize replay buffer
replay_buffers = [ReplayBuffer(capacity=50000) for _ in range(n_particles)]

# Initialize last chosen action
last_action = np.zeros(action_size)

# Define the maximum duration for a successful run (in milliseconds)
consecutive_successes = 0

# Initialize previous_distance_to_target
starting_distance_to_target = np.linalg.norm(object.position - target_pos)

# Main simulation loop
running = True
frames = 0
sim_iter = 1

Training/simulation loop

In [6]:
while running:
    frames += 1
    if visualize:
    # Clear the screen and render the simulation
        screen.fill(WHITE)

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

    # Handle collisions and move particles
    handle_collisions(particle_list + [object])

    # Gather all current states
    current_states = np.array([get_state(particle, object, target_pos) for particle in particle_list])

    # Batch prediction
    actions = model.predict(current_states, verbose=0)

    for particle_index, particle in enumerate(particle_list):
        action = actions[particle_index][:2]  # Assuming the model returns the correct shape
        apply_action(action, particle)
        particle.physics_move()  # Update physics of this particle

        # Calculate reward for this particle
        state = get_state(particle, object, target_pos)
        done = np.linalg.norm(object.position - target_pos) < (object_radius + target_radius)
        reward = calculate_individual_reward(particle, object, target_pos, collision_occurred_with_object, starting_distance_to_target)

        # Add experience to the respective particle's replay buffer
        replay_buffers[particle_index].add(current_states[particle_index], actions[particle_index], reward, state, done)

    # Update object's physics
    object.physics_move()

    # Decay epsilon
    epsilon = max(epsilon_min, epsilon_decay * epsilon)

    if visualize:
        pygame.display.set_caption(f'Simulation Interation: {sim_iter}    Frame: {frames}')
        pygame.draw.rect(screen, BLACK, (0, 0, WIDTH, HEIGHT), 2)
        pygame.draw.circle(screen, BLUE, center=(object.position[0], object.position[1]), radius=object.radius)
        pygame.draw.circle(screen, GREEN, target_pos.astype(int), target_radius)

        for particle in particle_list:
            pygame.draw.circle(screen, RED, center=(particle.position[0], particle.position[1]), radius=particle.radius)
        
        pygame.display.flip()
        clock.tick(60)

    
    if done:
        consecutive_successes += 1
        if consecutive_successes >= 3:
            print("Model training completed.")
            model.save('particle_swarm_model.h5')
            running = False
        
        #Train model with accumulated experiences
        train_model(model, replay_buffers, batch_size, gamma)
        #Reset for new session
        print("Hey! That worked! Let's do it again!!")
        frames, sim_iter, starting_distance_to_target = reset_simulation(particle_list, object, sim_iter)
        
    elif frames >= max_success_frames:
        print('That didnt quite work... lets try again.')
        consecutive_successes = 0  # Reset if the task was not completed in time
        #Train model with accumulated experiences
        train_model(model, replay_buffers, batch_size, gamma)
        #Reset for new session
        frames, sim_iter, starting_distance_to_target = reset_simulation(particle_list, object, sim_iter)

    if sim_iter > 10:
        model.save('model_p10.keras')
        print("Model training completed.")
        running = False

if visualize:
    pygame.quit()


That didnt quite work... lets try again.
ReplayBuffer (Size: 600/50000) Contents: [(array([ 77.02756932, 185.07494474,   0.86462632,   0.88231288,
       400.        , 300.        ,   0.        ,   0.        ,
       400.        , 150.        ]), array([-1.,  1.], dtype=float32), 0.0, array([ 77.75396432, 186.00814198,   0.726395  ,   0.93319724,
       400.        , 300.        ,   0.        ,   0.        ,
       400.        , 150.        ]), False), (array([ 77.75396432, 186.00814198,   0.726395  ,   0.93319724,
       400.        , 300.        ,   0.        ,   0.        ,
       400.        , 150.        ]), array([-1.,  1.], dtype=float32), 0.0, array([ 78.34903957, 186.98967936,   0.59507525,   0.98153738,
       400.        , 300.        ,   0.        ,   0.        ,
       400.        , 150.        ]), False), (array([ 78.34903957, 186.98967936,   0.59507525,   0.98153738,
       400.        , 300.        ,   0.        ,   0.        ,
       400.        , 150.        ]), array