In [1]:
from __future__ import annotations

from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from matplotlib.patches import Patch
import matplotlib.patches as patches
from tqdm import tqdm
# import pygame
import torch
import gym
from gym import spaces
from gym.envs.registration import register
import numpy as np
import random

In [2]:
# create a class for the action
from pyclbr import Class


class Action:
    def __init__(self, name, delta):
        self.name = name
        self.delta = delta

class Agent:
    def __init__(self, index, coordinates, status):
        self.index = index
        self.coordinates = coordinates
        self.status = status
        self.terminate = False
        self.truncated = False
        self.reward = 0
        self.volume = 0  # initialize volume to 0
        self.sawlog_volume = 0  # initialize sawlog volume to 0
        self.pulplog_volume = 0  # initialize pulplog volume to 0
        
    
    def move(self, delta, grid_size):
        # Calculate the new position
        new_coordinates = tuple(sum(x) for x in zip(self.coordinates, delta))

        # Check if the new position is within the grid
        if 0 <= new_coordinates[0] < grid_size and 0 <= new_coordinates[1] < grid_size:
            # If it is, move the agent to the new position
            self.coordinates = new_coordinates
        else:
            # If it's not, move the agent to the edge of the grid
            self.coordinates = (max(0, min(grid_size - 1, new_coordinates[0])),
                                max(0, min(grid_size - 1, new_coordinates[1])))
    def agent_load(self):
        self.volume += 10  # increase volume by 10 when loading

    
    def logging_loading(self):
        # create a random loading function, if the random number is greater than 0.25 (75% to load sawlog), the agent will load sawlog, otherwise, it will load pulplog
        if random.random() > 0.75:
            self.sawlog_volume += 10
            self.volume += 10
        else:
            self.pulplog_volume += 10
            self.volume += 10
    
    def agent_unload(self):
        self.volume -= 10 # decrease volume by 10 when unloading

    def update_status(self, new_status):
        self.status = new_status

    def __repr__(self):
        return f"Agent(location={self.location}, stats={self.status})"

# create a class of location - 3 types of location, log site, sort site, and mill, each location has a reward


class Location:
    def __init__(self, location_type, reward, coordinates, location_id):
        self.location_type = location_type
        self.reward = reward
        self.coordinates = coordinates  # tuple of (x, y)
        self.location_id = location_id  # unique numerical identifier
        self.volume = 0  # initialize volume to 0


    def __repr__(self):
        return f"Location(location_type={self.location_type}, reward={self.reward}, coordinates={self.coordinates}, location_id={self.location_id})"
    
    def logging(self):
        self.volume += 10  # increase volume by 10 when logging

    def site_loading(self):
        self.volume += 10  # increase volume by 10 when loading

    def site_unloading(self):
        self.volume -= 10  # decrease volume by 10 when unloading

# create a class for the status, if the is status 0, it's just started, if the truck in status 1, it's loaded from log site, if the truck in status 2, it loaded sorted logs
# from the sort site and unloaded raw logs to the sort site, if the truck in status 3, it's unloaded to the mill
class Status():
    STARTED = 0
    LOADED_FROM_LOG_SITE = 1
    LOADED_SORTED_LOGS = 2
    UNLOADED_TO_MILL = 3

    def __str__(self):
        # Convert status to a more readable string.
        if self == Status.STARTED:
            return "Just started"
        elif self == Status.LOADED_FROM_LOG_SITE:
            return "Loaded from log site"
        elif self == Status.LOADED_SORTED_LOGS:
            return "Loaded sorted logs from sort site and unloaded raw logs"
        elif self == Status.UNLOADED_TO_MILL:
            return "Unloaded to mill"
        else:
            return "Unknown status"



# create a class for the environment
class ForestTruckEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}

    def __init__(self, render_mode="human", size = 8, reward =60, num_agents=4):
        super(ForestTruckEnv, self).__init__()
        self.reward = reward
        self.num_agents = num_agents
        self.size = size  # The size of the square grid
        
        self.render_mode = render_mode
        self.done = False
        self.truncated = False
        self.t = 0
        
        # set up the random agent starting location
        self.agents = [Agent(index, (random.randint(0, self.size-1), random.randint(0, self.size-1)), 0) for index in range(self.num_agents)]
        self.available_actions = [
            Action('up', (0, 1)),
            Action('down', (0, -1)),
            Action('left', (-1, 0)),
            Action('right', (1, 0)),
        ]
        # set up the log sites, sort sites, and mills
        locations = [(type, (x, y)) for type, coords in [('log_site', [(0, 0), (0, 3), (0, 7)]), 
                                                 ('sort_site', [(2, 2), (5, 2)]), 
                                                 ('sawmill', [(7, 7)]), 
                                                 ('pulpmill', [(7, 0)])] 
             for x, y in coords]

        self.log_sites, self.sort_sites, self.sawmill, self.pulpmill = [], [], [], []

        for i, (type, coord) in enumerate(locations):
            location = Location(type, reward, coord, i)
            if type == 'log_site':
                self.log_sites.append(location)
            elif type == 'sort_site':
                self.sort_sites.append(location)
            elif type == 'sawmill':
                self.sawmill.append(location)
            elif type == 'pulpmill':
                self.pulpmill.append(location)

        # define the action and observation space
        # self.action_space = spaces.Discrete(len(self.available_actions))
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Box(low=0, high=1, shape=(self.size, self.size, 2), dtype=np.uint8)

    #define the boundary function
    def _check_boundary(self, x, y):
        return -1 < x < self.size + 1 and -1 < y < self.size + 1
    
    # define the step function
    def step(self, action):
        # check if the action is valid
        assert self.action_space.contains(action)
        self.t += 1
        self.reward = 0
        reward = 60
        # get the delta of the action
        # get the delta of the action
        for agent in self.agents:
            # Sample a different action for each agent
            action = self.action_space.sample()

            # Get the delta of the action
            delta = self.available_actions[action].delta

            # Limit the delta to one position at a time
            delta = (max(-1, min(1, delta[0])), max(-1, min(1, delta[1])))

            # Move the agent
            agent.move(delta, self.size)


        # update the location volume
        for i in range(len(self.log_sites)):
            self.log_sites[i].logging()

        for i in range(len(self.log_sites)):
            self.log_sites[i].site_loading()
        
        # check if the agent is at the boundary
        for agent in self.agents:
            if not self._check_boundary(*agent.coordinates):
                agent.reward += -10
        
        # location overload check and apply penalty to the environment
        site_penalty = -2
        for log_site in self.log_sites:
            if log_site.volume > 100:
                log_site.reward += site_penalty
                # self.reward += log_site.reward
        for sort_site in self.sort_sites:
            if sort_site.volume > 100:
                sort_site.reward += site_penalty
                # self.reward += log_site.reward
        for mill in self.sawmill:
            if mill.volume > 100:
                sort_site.reward += site_penalty
                # self.reward += log_site.reward
        for mill in self.pulpmill:
            if mill.volume > 100:
                sort_site.reward += site_penalty
                # self.reward += log_site.reward


        # check if the agent is at the log site, sort site, or mill, and make interactions
        for agent in self.agents:
            if agent.status == 0:  # Looking for log site
                for i, log_site in enumerate(self.log_sites):
                    if agent.coordinates == log_site.coordinates:
                        agent.reward += reward
                        agent.update_status(Status.LOADED_FROM_LOG_SITE)
                        agent.logging_loading()
                        self.log_sites[i].site_unloading()
                        break

            elif agent.status == 1:  # Looking for sort site
                for i, sort_site in enumerate(self.sort_sites):
                    if agent.coordinates == sort_site.coordinates:
                        agent.reward += reward
                        agent.update_status(Status.LOADED_SORTED_LOGS)
                        agent.agent_unload()
                        agent.agent_load()
                        self.sort_sites[i].site_loading()
                        break

            elif agent.status == 2:  # Looking for mill
                for i, mill in enumerate(self.sawmill):
                    if agent.coordinates == mill.coordinates:
                        agent.reward += reward
                        agent.update_status(Status.UNLOADED_TO_MILL)
                        agent.agent_unload()
                        self.sawmill[i].site_loading()
                        break
                for i, mill in enumerate(self.pulpmill):
                    if agent.coordinates == mill.coordinates:
                        agent.reward += reward
                        agent.update_status(Status.UNLOADED_TO_MILL)
                        agent.agent_unload()
                        self.pulpmill[i].site_loading()
                        break

            # check if the agent is done by the status and step exceed the limit 
            elif agent.status == 3:  # Agent status indicating termination
                agent.terminate = True
                agent.truncated = True

        for agent in self.agents:
            self.reward += agent.reward

        # check if the agent is done
        if all([agent.terminate for agent in self.agents]):
            self.done = True
        # check if the agent is truncated
        if all([agent.truncated for agent in self.agents]):
            self.truncated = True
        # get the observation
        obs = self._get_obs()
        # get the info. the info includes the distance between the agent and the mill and the reward
        info = {"distance": 0, "reward": self.reward}
        for agent in self.agents:
            for mill in self.sawmill:
                info["distance"] = np.linalg.norm(np.array(agent.coordinates) - np.array(mill.coordinates))
                info["reward"] = self.reward

        return obs, self.reward, self.done, info

    def _get_obs(self):
        obs = np.zeros((self.size, self.size, 2), dtype=np.uint8)
        for agent in self.agents:
            if isinstance(agent.coordinates, tuple):  # Check if agent.coordinates is an instance of tuple
                x, y = agent.coordinates
                if 0 <= x < self.size and 0 <= y < self.size:
                    obs[x, y, 0] = 1
        return obs



    # define the reset function
    def reset(self):
    # Create a list of (0, 0) tuples of the same length as the number of agents
        initial_coordinates = [(0, 0) for _ in range(self.num_agents)]

        # Create agents at the initial coordinates
        self.agents = [Agent(index, coordinates, 0) for index, coordinates in enumerate(initial_coordinates)]

        self.done = False
        self.truncated = False
        self.reward = 0
        self.t = 0
        return self._get_obs()

    # define the render function
    def render(self, mode="human"):
        if mode == "human":
            # Create a grid of the same size as the environment
            grid = np.zeros((self.size, self.size, 3), dtype=np.uint8)
            # Color the grid based on the location of the agent
            # for agent in self.agents:
            #     if isinstance(agent.coordinates, tuple):
            #         x, y = agent.coordinates
            #         grid[x, y] = [0, 0, 0]  # Green color for agent   
            # Color the grid based on the location of the log sites
            for log_site in self.log_sites:
                x, y = log_site.coordinates
                grid[x, y] = [0, 0, 255]    # Blue color for log site
            # Color the grid based on the location of the sort sites
            for sort_site in self.sort_sites:
                x, y = sort_site.coordinates
                grid[x, y] = [255, 0, 0] # Red color for sort site
            # Color the grid based on the location of the sawmill
            for mill in self.sawmill:
                x, y = mill.coordinates
                grid[x, y] = [255, 255, 0]  # Yellow color for mill
            # Color the grid based on the location of the pulpmill
            for mill in self.pulpmill:
                x, y = mill.coordinates
                grid[x, y] = [255, 100, 0] # Orange color for mill
             
            # Display the grid
            plt.imshow(grid)

            for agent in self.agents:
                if isinstance(agent.coordinates, tuple):
                    x, y = agent.coordinates
                    grid[x, y] = [0, 0, 0]  # Green color for agent  
                    circle = patches.Circle((y, x), radius=0.5, color='green')  # Create a green circle at the agent's coordinates
                    plt.gca().add_patch(circle)  # Add the circle to the plot
                    plt.text(y, x, str(agent.volume), color='white', 
                            horizontalalignment='center', verticalalignment='center')
                    
            for log_site in self.log_sites:
                x, y = log_site.coordinates
                plt.text(y, x, str(log_site.volume), color='white', 
                        horizontalalignment='center', verticalalignment='center')
                
            for sort_site in self.sort_sites:   
                x, y = sort_site.coordinates
                plt.text(y, x, str(sort_site.volume), color='white', 
                        horizontalalignment='center', verticalalignment='center')
                
            for mill in self.sawmill:
                x, y = mill.coordinates
                plt.text(y, x, str(mill.volume), color='white', 
                        horizontalalignment='center', verticalalignment='center')
                
            for mill in self.pulpmill:
                x, y = mill.coordinates
                plt.text(y, x, str(mill.volume), color='white', 
                        horizontalalignment='center', verticalalignment='center')   


            plt.axis("off")
            plt.show()



        elif mode == "rgb_array":
            obs = self._get_obs()
            return obs
        else:
            raise NotImplementedError
        
    # define the close function
    def close(self):
        pass

    # define the seed function
    def seed(self, seed=None):
        pass

    # define the get action function
    def get_action(self, action):
        return self.available_actions[action]
    
    

In [3]:
from gym.envs.registration import register

register(
    id='ForestTruck-v0',
    entry_point='__main__:ForestTruckEnv'  # Replace your_module_path with the actual path
)

import gym
from __main__ import ForestTruckEnv

env = gym.make('ForestTruck-v0')


  logger.warn(


In [4]:
# initial_observation = env.reset()
# # print("Initial Observation:", initial_observation)


# for _ in range(100):  # Take 5 steps as an example
#     action = env.action_space.sample()  # Randomly sample an action
#     observation, reward, terminated, info = env.step(action)  # Take a step in the environment
#     # print(f"Action: {action}, Observation: {observation}, Reward: {reward}, Done: {terminated}, Info: {info}")
#     print(f"Action: {action}, Reward: {reward}, Done: {terminated}, Info: {info}")
#     for i, agent in enumerate(env.agents):  # Assuming env.agents is a list of agents
#         print(f'Agent {i} location: {agent.coordinates}, status: {agent.status}, volume: {agent.volume}')
#     env.render()  # Render the environment
#     if terminated:
#         print("Episode finished after these many steps.")
#         break




In [5]:
from stable_baselines3 import DQN

# Create the environment
env = ForestTruckEnv(render_mode='human', size=8, reward=20, num_agents=1)

# Instantiate the agent
model = DQN("MlpPolicy", env, verbose=1, tensorboard_log="./tensorboard_logs/")

# Train the agent
model.learn(total_timesteps=100000)


# Save the trained agent
model.save("dqn_mining_truck")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




Logging to ./tensorboard_logs/DQN_18
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 149      |
|    ep_rew_mean      | 1.63e+04 |
|    exploration_rate | 0.943    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 1705     |
|    time_elapsed     | 0        |
|    total_timesteps  | 595      |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 106      |
|    n_updates        | 123      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 221      |
|    ep_rew_mean      | 2.5e+04  |
|    exploration_rate | 0.832    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 1872     |
|    time_elapsed     | 0        |
|    total_timesteps  | 1770     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss         

In [6]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor

# Create the environment
env = ForestTruckEnv(render_mode='human', size=8, reward=20, num_agents=1)

# Wrap the environment with the Monitor wrapper
env = Monitor(env, "./tensorboard_logs/")

# Instantiate the agent with TensorBoard logging enabled
model = DQN("MlpPolicy", env, verbose=1, tensorboard_log="./tensorboard_logs/")

# Train the agent
model.learn(total_timesteps=100000)

# Save the trained agent
model.save("dqn_mining_truck")

Using cuda device
Wrapping the env in a DummyVecEnv.


AssertionError: The algorithm only supports (<class 'gymnasium.spaces.discrete.Discrete'>,) as action spaces but Discrete(4) was provided

: 

In [None]:
%load_ext tensorboard
%tensorboard --logdir ./tensorboard_logs/


Launching TensorBoard...

In [None]:
import matplotlib.pyplot as plt
from stable_baselines3.common import results_plotter

# Load the results
results = results_plotter.load_results('./tensorboard_logs/')

# Plot the results
plt.figure(figsize=(10, 5))
results_plotter.plot_results(results, total_timesteps=100000, title='DQN Mining Truck')
plt.show()

LoadMonitorResultsError: No monitor files of the form *monitor.csv found in ./tensorboard_logs/