In [6]:
import gym
from gym import spaces
import numpy as np

class ThreeQueueEnvironment(gym.Env):
    def __init__(self):
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(3,), dtype=float)
        
        self.queue_rate = [0.3, 0.25, 0.4]
        
        self.queue_packet_progress = [0,0,0]
        
        self.total_packets = [0,0,0]
        
        self.mean_packet_delay = [0,0,0]
        
        self.time = 1000
    
    def step(self, action):
        prev_queue3_delay = self.queue3_delay
        
        for i in range(self.queue_packet_progress):
            self.queue_packet_progress[i] += self.queue_rate[i]
        
        
        
        if action == 0:
            if self.queue1_delay >= 1 :
                self.queue1_delay -= 1
        elif action == 1:
            if self.queue2_delay >= 1:
                self.queue2_delay -= 1
        elif action == 2:
            if self.queue3_delay >= 1:
                self.queue3_delay -= 1
        
        
        observation = np.array([self.queue1_delay, self.queue2_delay, self.queue3_delay])
        reward = 0
        if self.queue1_delay > 6 or self.queue2_delay > 4:
            reward = -1
        if self.queue3_delay < prev_queue3_delay:
            reward = 1
            
        self.time -= 1
        done = self.time <= 0  # You can define the termination condition based on your specific criteria

        return observation, reward, done, {}
    
    def reset(self):
        self.queue1_delay = 0
        self.queue2_delay = 0
        self.queue3_delay = 0
        
        observation = np.array([self.queue1_delay, self.queue2_delay, self.queue3_delay])
        return observation
    
    def render(self):
        print("Queue 1 Delay:", self.queue1_delay)
        print("Queue 2 Delay:", self.queue2_delay)
        print("Queue 3 Delay:", self.queue3_delay)

# Test the ThreeQueueEnvironment
env = ThreeQueueEnvironment()
observation = env.reset()

for _ in range(100):
    action = env.action_space.sample()
    print(action)
    next_observation, reward, done, _ = env.step(action)
    env.render()
    print("Reward:", reward)
    print("Done:", done)
    print("--------------------")


0
Queue 1 Delay: 0.3
Queue 2 Delay: 0.4
Queue 3 Delay: 0.4
Reward: 0
Done: False
--------------------
0
Queue 1 Delay: 0.6
Queue 2 Delay: 0.8
Queue 3 Delay: 0.8
Reward: 0
Done: False
--------------------
1
Queue 1 Delay: 0.8999999999999999
Queue 2 Delay: 0.20000000000000018
Queue 3 Delay: 1.2000000000000002
Reward: 0
Done: False
--------------------
0
Queue 1 Delay: 0.19999999999999996
Queue 2 Delay: 0.6000000000000002
Queue 3 Delay: 1.6
Reward: 0
Done: False
--------------------
1
Queue 1 Delay: 0.49999999999999994
Queue 2 Delay: 2.220446049250313e-16
Queue 3 Delay: 2.0
Reward: 0
Done: False
--------------------
0
Queue 1 Delay: 0.7999999999999999
Queue 2 Delay: 0.40000000000000024
Queue 3 Delay: 2.4
Reward: 0
Done: False
--------------------
2
Queue 1 Delay: 1.0999999999999999
Queue 2 Delay: 0.8000000000000003
Queue 3 Delay: 1.7999999999999998
Reward: 1
Done: False
--------------------
0
Queue 1 Delay: 0.3999999999999999
Queue 2 Delay: 1.2000000000000002
Queue 3 Delay: 2.199999999999

In [7]:
import gym
import numpy as np

class DiscretizedObservationWrapper(gym.ObservationWrapper):
    def __init__(self, env, bins):
        super().__init__(env)
        self.bins = bins
        self.observation_space = gym.spaces.Discrete(np.prod(bins))

    def observation(self, observation):
        # Discretize the continuous observation
        discrete_obs = np.zeros_like(observation, dtype=np.int)
        for i, obs in enumerate(observation):
            discrete_obs[i] = obs
        return tuple(discrete_obs)

# Define the continuous observation space
continuous_obs_space = env.observation_space

# Define the number of bins for each dimension of the observation space
bins = [3] * len(continuous_obs_space.high)  # Adjust the number of bins as needed

# Create the discretized observation wrapper
env = DiscretizedObservationWrapper(env, bins)

# Now you can use the discretized observation space in a Q-learning model
# ...
env.observation_space


Discrete(27)

In [8]:
observation = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    next_observation, reward, done, _ = env.step(action)
    env.render()
    print("Observation:", next_observation)
    print("Reward:", reward)
    print("Done:", done)
    print("--------------------")

env.close()

ValueError: too many values to unpack (expected 2)

In [None]:
def encode(delay_1, delay_2, delay_3)