In [1]:
from gym import Env
import gymnasium as gym
from gymnasium import spaces
from gym.spaces import Discrete, Box
import numpy as np
import random
import pygame

Environment

In [11]:
class TrainEnv(Env):
    metadata = {'render.modes': ['human']}
    
    def __init__(self):
        # Actions: accelerate, slow down, keep speed
        self.action_space = Discrete(3)
        # Store current speed, position x, position y, distance left
        self.observation_space = Box(low=np.array([0.0, 0.0, 0.0, 0.0]), high=np.array([1.0, 1000.0, 768.0, 20000.0]))

        # Initial state: [speed, x, y, distance_left]
        self.state = [0.0, 118.0, 110.0, 767.0]
        self.target = np.array([885, 110])

        # Initialize variables for Pygame
        self.screen = None
        self.clock = None
        self.is_pygame_initialized = False  # Flag to check if Pygame is initialized

        self.init_pygame()

    def init_pygame(self):
        print("Initializing Pygame...")
        pygame.init() 
        self.screen = pygame.display.set_mode((1000, 768))
        self.clock = pygame.time.Clock()
        self.is_pygame_initialized = True

        # Initialize font
        pygame.font.init()
        self.font = pygame.font.Font(None, 36)

    def step(self, action):
        reward = 0
        speed, x, y, distance_left = self.state
        current_position = np.array([x, y])
        direction = self.target - current_position
        distance_to_target = np.linalg.norm(direction)
        
        # Constants
        max_speed = 1.0  # Maximum speed
        max_acceleration = 0.005  # Maximum speed increase per step
        max_deceleration = 0.005  # Maximum speed decrease per step

        # Action: 0 = accelerate, 1 = decelerate, 2 = maintain speed
        if action == 0:  # Accelerate
            speed = min(max_speed, speed + max_acceleration)
        elif action == 1:  # Decelerate
            speed = max(0.0, speed - max_deceleration)
        # maintain speed requires no change

        # Move the train
        direction_unit = direction / distance_to_target if distance_to_target > 0 else np.array([0, 0])
        new_position = current_position + speed * direction_unit
        self.state = [speed, new_position[0], new_position[1], distance_to_target]

        if speed > 0.9 * max_speed:
            reward += 0.1  # Reward for maintaining high speed
        else:
            reward -= 0.1  # Time penalty for taking too long
        if distance_to_target < 5:
            if speed < 0.1:
                reward += 100
            else:
                reward -= 100
            done = True
        else:
            done = False

        return self.state, reward, done, {}

    def reset(self):
        self.state = [0.0, 118.0, 110.0, 767.0]
        return self.state

    def render(self, mode='human'):
        # Clear the screen
        self.screen.fill((0, 0, 0))

        # Draw the target
        pygame.draw.circle(self.screen, (255, 0, 0), (int(self.target[0]), int(self.target[1])), 10)

        # Draw the train (as a small circle)
        train_position = (int(self.state[1]), int(self.state[2]))
        pygame.draw.circle(self.screen, (0, 255, 0), train_position, 10)

        speed_text = self.font.render(f"Speed: {self.state[0]:.2f}", True, (255, 255, 255))
        # Blit the text onto the screen
        self.screen.blit(speed_text, (10, 10))  

        # Update the display
        pygame.display.flip()

        # Cap the frame rate
        self.clock.tick(60)


    def close(self):
        if self.is_pygame_initialized:
            print("Closing Pygame...")
            pygame.quit()
            self.is_pygame_initialized = False



Model

In [3]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from keras.optimizers import Adam  # Change made here
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory

# Environment test
env = TrainEnv()

states = env.observation_space.shape[0]
actions = env.action_space.n

model = Sequential()
model.add(Flatten(input_shape=(1, states)))
model.add(Dense(24, activation="relu"))
model.add(Dense(24, activation="relu"))
model.add(Dense(actions, activation="linear"))

agent = DQNAgent(
    model=model,
    memory=SequentialMemory(limit=50000, window_length=1),
    nb_actions=actions,
    nb_steps_warmup=10,
    target_model_update=0.01,
    enable_double_dqn=True
)
agent.compile(Adam(learning_rate=0.001), metrics=["mae"]) 

  import sre_constants
  from jax import xla_computation as _xla_computation
  import cgi
  from urllib3.contrib.pyopenssl import orig_util_SSLContext as SSLContext
  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Initializing Pygame...


Run only for training

In [None]:
agent.fit(env, nb_steps=100000, visualize=False, verbose=1)
# results = agent.test(env, nb_episodes=10, visualize=False)
# print(np.mean(results.history))

# Save the model weights after training
agent.save_weights('dqn_trainenv_weights.h5f', overwrite=True)

Render and Visualization

In [13]:
# Assuming the agent has been trained and saved
agent.load_weights('dqn_trainenv_weights.h5f')
env = TrainEnv()
# Visualize one episode with the trained agent
state = env.reset()
done = False
total_reward = 0

while not done:

    for event in pygame.event.get():
        if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE):
            print("Quitting Pygame...")
            env.close()  # Call close method to quit Pygame and stop the simulation
            done = True
            break

    if done:  # Break the loop if quitting was triggered
        break
                
    action = agent.forward(state)
    state, reward, done, _ = env.step(action)

    # Render the environment with Pygame
    env.render()

    total_reward += reward

print(f"Total reward: {total_reward}")
env.close()  # Ensure to close Pygame when done


Initializing Pygame...
Quitting Pygame...
Closing Pygame...
Total reward: -7.499999999999989
