In [3]:
### Deep RL Hands-On book

# The anatomy of the agent

In [17]:
import random


class Environment:
    def __init__(self):
        self.steps_left = 10

    def get_observation(self):
        return [0.0, 0.0, 0.0]

    def get_actions(self):
        return [0, 1]

    def is_done(self):
        return self.steps_left == 0

    def action(self, action):
        if self.is_done():
            raise Exception("Game is over")
        self.steps_left -= 1
        return random.random()


class Agent:
    def __init__(self):
        self.total_reward = 0.0

    def step(self, env):
        current_obs = env.get_observation()
        actions = env.get_actions()
        reward = env.action(random.choice(actions))
        self.total_reward += reward


if __name__ == "__main__":
    env = Environment()
    agent = Agent()

    while not env.is_done():
        agent.step(env)

    print("Total reward got: %.4f" % agent.total_reward)

Total reward got: 6.0743


# Creating an environment GYM

In [10]:
import gymnasium as gym
e = gym.make('CartPole-v0',render_mode='human')

In [14]:
print(e.spec)

EnvSpec(id='CartPole-v0', entry_point='gymnasium.envs.classic_control.cartpole:CartPoleEnv', reward_threshold=195.0, nondeterministic=False, max_episode_steps=200, order_enforce=True, autoreset=False, disable_env_checker=False, apply_api_compatibility=False, kwargs={'render_mode': 'human'}, namespace=None, name='CartPole', version=0, additional_wrappers=(), vector_entry_point='gymnasium.envs.classic_control.cartpole:CartPoleVectorEnv')


In [28]:
obs = e.reset()
print(obs)

(array([-0.01428897, -0.00339545, -0.03387735, -0.0170701 ], dtype=float32), {})


In [31]:
print(e.observation_space)
print()
print(e.action_space)

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)

Discrete(2)


In [34]:
e.step(0)

(array([-0.02616985, -0.5873367 , -0.01799543,  0.8298624 ], dtype=float32),
 1.0,
 False,
 False,
 {})

In [35]:
e.action_space.sample()

0

In [37]:
e.observation_space.sample()

array([-3.1235516e+00, -1.3034942e+38,  3.9547995e-01, -2.8787100e+38],
      dtype=float32)

In [17]:
EPOHS = 100
TOTAL_REWARD = 0.0

In [65]:
### Random action applied

EPOHS = 500
TOTAL_REWARD = 0.0
e = gym.make('CartPole-v0')
for _ in range(EPOHS):
    total_reward_epoh = 0.0
    total_steps = 0
    obs = e.reset()
    while True:
        action = e.action_space.sample()
        obs, reward, terminated, truncated, info = e.step(action)
        # print(f"Observations: {obs}")
        # print(f"Info: {info}")
        total_steps += 1
        total_reward_epoh += reward
        if (terminated or truncated):
            #print(f"\nAgent has been terminated({terminated}) or truncated({truncated})")
            break
    TOTAL_REWARD += total_reward_epoh
    #print(f"\nEpisodes was done: {total_steps} \nended with total reward: {total_reward_epoh}")
print(f"\n{EPOHS} Epohs was done \nended with total mean reward: {TOTAL_REWARD/EPOHS}")
e.close()


500 Epohs was done 
ended with total mean reward: 22.578


In [51]:
e.close()

# Custom wrapper

In [19]:
import random


class RandomActionWrapper(gym.ActionWrapper):
    def __init__(self, env, epsilon=0.1):
        super(RandomActionWrapper, self).__init__(env)
        self.epsilon = epsilon

    def action(self, action):
        if random.random() < self.epsilon:
            print("Random!")
            return self.env.action_space.sample()
        return action


if __name__ == "__main__":
    env = RandomActionWrapper(gym.make("CartPole-v0"))

    obs = env.reset()
    total_reward = 0.0

    while True:
        obs, reward, terminated, truncated, info = env.step(0)
        total_reward += reward
        if terminated or truncated:
            break

    print("Reward got: %.2f" % total_reward)

Random!
Random!
Random!
Reward got: 11.00


# Video Recording

In [6]:
import gymnasium as gym
from gymnasium.wrappers import RecordEpisodeStatistics, RecordVideo

num_eval_episodes = 1

env = gym.make("CartPole-v1", render_mode="rgb_array")  # replace with your environment
env = RecordVideo(env, video_folder="./videos", name_prefix="eval",
                  episode_trigger=lambda x: True)
env = RecordEpisodeStatistics(env)

for episode_num in range(num_eval_episodes):
    obs, info = env.reset()

    episode_over = False
    for _ in range(1000):
        action = env.action_space.sample()  # replace with actual agent
        obs, reward, terminated, truncated, info = env.step(action)

        episode_over = terminated or truncated
env.close()

#print(f'Episode time taken: {env.time}')
print(f'Episode total rewards: {env.return_queue}')
print()
print(f'Episode lengths: {env.length_queue}')

Moviepy - Building video /home/dmitriy/ITMO/DISS/chapter_2/videos/eval-episode-0.mp4.
Moviepy - Writing video /home/dmitriy/ITMO/DISS/chapter_2/videos/eval-episode-0.mp4



                                                   

Moviepy - Done !
Moviepy - video ready /home/dmitriy/ITMO/DISS/chapter_2/videos/eval-episode-0.mp4
Episode total rewards: deque([array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), array([0.], dtype=float32), arr

