In [2]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import numpy as np
from torch.distributions import Categorical

import gymnasium as gym
from gym import wrappers



['/Users/shayan/Desktop/GithubProjects/Imag_aug_MARL/Imagination_Aug_MARL', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/cv2', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python310.zip', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/lib-dynload', '', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/PyQt5_sip-12.11.0-py3.10-macosx-10.9-x86_64.egg', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/gym_maze-0.4-py3.10.egg', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/gym_notices-0.0.8-py3.10.egg', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/cloudpickle-2.2.1-py3.10.egg', '/Users/shayan/opt/anaconda3/envs/Env15/lib/python3.10/site-packages/keras_rl-0.4.2-py3.10.egg', '/Users/shayan/slimevolleygym']
['/Users/shayan/Desktop/GithubProjects/Imag_a

In [3]:
#defining the environment class
class MountainCarWrapper(gym.Env):
    def __init__(self):
        self.env = gym.make('MountainCar-v0')
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space


    def reset(self):
            state = self.env.reset()
            
            return np.array(state[0])

    def step(self, action):
        next_state, reward, done, _ = self.env.step(action)[0:4]  
        return np.array(next_state), reward, done




    def render(self, mode='human'):
        return self.env.render(mode)

    def close(self):
        return self.env.close()






In [None]:
#defining the imagination core + rollout enconder 

class I2A_MountainCar(nn.Module):
    def __init__(self, num_actions, rollout_len):
        super(I2A_MountainCar, self).__init__()

        # Define the imagination module
        self.imagination = nn.Sequential(
            nn.Linear(2, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(32, 3 * rollout_len)
        )

        # Define the encoder module
        self.encoder = nn.Sequential(
            nn.Linear(2, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU()
        )

        # Define the core module
        self.core = nn.Sequential(
            nn.Linear(32 + 3 * rollout_len, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU()
        )

        # Define the policy and value heads
        self.policy_head = nn.Linear(64, num_actions)
        self.value_head = nn.Linear(64, 1)

    def forward(self, state, rollout_len):
        # Imagination module
        imagined_rollout = self.imagination(state)

        # Encoder module
        encoded_state = self.encoder(state)

        # Concatenate the imagined rollout and encoded state
        x = torch.cat([imagined_rollout, encoded_state], dim=1)

        # Core module
        x = self.core(x)

        # Policy and value heads
        action_probs = F.softmax(self.policy_head(x), dim=1)
        state_value = self.value_head(x)

        return action_probs, state_value


In [4]:
#2nd approach
class EnvironmentModel(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim):
        super(EnvironmentModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3_state = nn.Linear(hidden_dim, state_dim)
        self.fc3_reward = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        next_state = self.fc3_state(x)
        reward = self.fc3_reward(x)
        return next_state, reward


class I2A_MountainCar(nn.Module):
    def __init__(self, state_dim, action_dim, rollout_len, hidden_dim=32):
        super(I2A_MountainCar, self).__init__()

        self.rollout_len = rollout_len
        self.action_dim = action_dim

        # Define the environment model
        self.env_model = EnvironmentModel(state_dim, action_dim, hidden_dim)

        # Define the imagination module (rollout encoders)
        self.imagination = nn.Sequential(
            nn.Linear(state_dim * rollout_len, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Define the model-free encoder
        self.encoder = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Define the core module
        self.core = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
        )

        # Define the policy and value heads
        self.policy_head = nn.Linear(hidden_dim, action_dim)
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, state, action_space):
        # Generate imagined rollouts
        imagined_states = []
        for _ in range(self.rollout_len):
            action = action_space.sample()
            state, _ = self.env_model(state, action)
            imagined_states.append(state)
        imagined_states = torch.cat(imagined_states, dim=-1)

        # Imagination module (rollout encoders)
        imagined_rollout = self.imagination(imagined_states)

        # Encoder module
        encoded_state = self.encoder(state)

        # Concatenate the imagined rollout and encoded state
        x = torch.cat([imagined_rollout, encoded_state], dim=-1)

        # Core module
        x = self.core(x)

        # Policy and value heads
        action_probs = F.softmax(self.policy_head(x), dim=-1)
        state_value = self.value_head(x)

        return action_probs, state_value

In [5]:
import argparse
import sys

def get_args():
    parser = argparse.ArgumentParser(description="Imagination-Augmented Agents for Deep Reinforcement Learning")

    # Training settings
    parser.add_argument("--num_episodes", type=int, default=30, help="Number of training episodes")
    parser.add_argument("--batch_size", type=int, default=20, help="Batch size for training")
    parser.add_argument("--replay_memory_size", type=int, default=10000, help="Size of the replay memory")
    parser.add_argument("--rollout_len", type=int, default=5, help="Length of the rollout for imagination")
    parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor for rewards")
    parser.add_argument("--lr", type=float, default=1e-3, help="Learning rate for the optimizer")

    if sys.argv[0].endswith("ipykernel_launcher.py"):
        args = parser.parse_args(args=[])
    else:
        args = parser.parse_args()

    return args


import random
from collections import namedtuple

Experience = namedtuple('Experience', ('state', 'action', 'reward', 'next_state', 'done'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Experience(state=state, action=action, reward=reward, next_state=next_state, done=done)
        self.position = (self.position + 1) % self.capacity


    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


In [9]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from itertools import count
from collections import namedtuple


# Define the namedtuple to store experiences
Experience = namedtuple('Experience', ('state', 'action', 'reward', 'next_state', 'done'))

# function to select an action using the current policy
def select_action(model, state):
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action_probs, _ = model(state, args.rollout_len)
    m = torch.distributions.Categorical(action_probs)
    return m.sample().item()

# Main function to train and test the I2A agent
def main(args):
    # Create the environment
    env = MountainCarWrapper()
    state = env.reset()


    # Instantiate the I2A model and optimizer
    model = I2A_MountainCar(env.observation_space.shape[0], env.action_space.n, args.rollout_len)
    optimizer = optim.Adam(model.parameters(), lr=args.lr)

    # Initialize the replay memory
    memory = ReplayMemory(args.replay_memory_size)

    # Main training loop
    for episode in range(args.num_episodes):
        state = env.reset()
        episode_reward = 0

        for t in count():
            # Select an action based on the current policy
            action = select_action(model, state)

            # Execute the action and store the experience
            next_state, reward, done= env.step(action)[0:3]
            memory.push(state, action, reward, next_state, done)



            # Update the state and episode reward
            state = next_state
            episode_reward += reward

            # If enough experiences are collected, perform a training step
            if len(memory) >= args.batch_size:
                experiences = memory.sample(args.batch_size)
                batch = Experience(*zip(*experiences))

               
                # Prepare the data for training
                states = torch.tensor(np.array(batch.state), dtype=torch.float32)
                actions = torch.tensor(np.array(batch.action), dtype=torch.long).unsqueeze(1)
                rewards = torch.tensor(np.array(batch.reward), dtype=torch.float32).unsqueeze(1)
                next_states = torch.tensor(np.array(batch.next_state), dtype=torch.float32)
                dones = torch.tensor(np.array(batch.done), dtype=torch.float32).unsqueeze(1)


                # Compute the current Q values
                action_probs, state_values = model(states, args.rollout_len)
                action_values = action_probs.gather(1, actions)

                # Compute the target Q values
                _, next_state_values = model(next_states, args.rollout_len)
                target_action_values = rewards + (args.gamma * next_state_values * (1 - dones))

                # Compute the loss and perform a training step
                loss = (action_values - target_action_values.detach()).pow(2).mean()
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Check if the episode is finished
            if done:
                print("Episode: {}, Reward: {}, Timesteps: {}".format(episode, episode_reward, t + 1))
                break

    # Testing the trained agent
    print("Testing the trained agent...")
    test_episodes = 10
    test_rewards = []

    for episode in range(test_episodes):
        state = env.reset()
        episode_reward = 0

        for t in count():
            action = select_action(model, state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state

            if done:
                print("Test Episode: {}, Reward: {}, Timesteps: {}".format(episode, episode_reward, t + 1))
                test_rewards.append(episode_reward)
                break

    print("Average test reward: {:.2f}".format(sum(test_rewards) / test_episodes))
    env.close()
if __name__ == "__main__":

    args = get_args()
    main(args)


AttributeError: 'int' object has no attribute 'sample'