In [7]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from abc import ABCMeta, abstractmethod

# ====== Base Agent (Abstract Class) ======
class BaseAgent:
    __metaclass__ = ABCMeta

    def __init__(self):
        pass

    @abstractmethod
    def agent_init(self, agent_info={}):
        pass

    @abstractmethod
    def agent_start(self, observation):
        pass

    @abstractmethod
    def agent_step(self, reward, observation):
        pass

    @abstractmethod
    def agent_end(self, reward):
        pass

    def agent_cleanup(self):
        pass

    def agent_message(self, message):
        pass


# ====== Policy Network (Neural Network for Policy) ======
class PolicyNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=16):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return torch.softmax(self.fc2(x), dim=-1)  # Softmax for action probabilities


# ====== REINFORCE Agent (Using Policy Gradient) ======
class REINFORCEAgent(BaseAgent):
    def __init__(self):
        super().__init__()

    def agent_init(self, agent_info={}):
        self.gamma = agent_info.get("gamma", 0.99)  # Discount factor
        self.learning_rate = agent_info.get("learning_rate", 0.01)
        self.state_size = agent_info.get("state_size", 2)  # GridWorld uses (x, y) coordinates
        self.action_size = agent_info.get("action_size", 4)  # Up, Down, Left, Right

        # Initialize Policy Network
        self.policy_network = PolicyNetwork(self.state_size, self.action_size)
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=self.learning_rate)

        # Stores (state, action, reward) tuples per episode
        self.trajectory = []

    def agent_start(self, observation):
        state = torch.tensor(observation, dtype=torch.float32)
        action_probs = self.policy_network(state)
        action = np.random.choice(self.action_size, p=action_probs.detach().numpy())  # Sample action
        self.trajectory.append((state, action, 0))  # Reward is not given at the start
        return action

    def agent_step(self, reward, observation):
        state = torch.tensor(observation, dtype=torch.float32)
        action_probs = self.policy_network(state)
        action = np.random.choice(self.action_size, p=action_probs.detach().numpy())

        self.trajectory[-1] = (self.trajectory[-1][0], self.trajectory[-1][1], reward)  # Update last reward
        self.trajectory.append((state, action, 0))  # New state-action pair

        return action

    def agent_end(self, reward):
        self.trajectory[-1] = (self.trajectory[-1][0], self.trajectory[-1][1], reward)  # Final reward
        self._update_policy()
        self.trajectory = []  # Clear trajectory for next episode

    def _update_policy(self):
        """Computes returns and updates policy using REINFORCE algorithm."""
        returns = []
        G = 0
        for _, _, reward in reversed(self.trajectory):
            G = reward + self.gamma * G
            returns.insert(0, G)  # Compute return-to-go

        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)  # Normalize for stability

        policy_loss = []
        for (state, action, _), G in zip(self.trajectory, returns):
            action_probs = self.policy_network(state)
            log_prob = torch.log(action_probs[action])
            policy_loss.append(-log_prob * G)  # REINFORCE update rule

        loss = torch.stack(policy_loss).sum()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def agent_message(self, message):
        if message == "get_policy":
            return self.policy_network.state_dict()
        return None
    
class BaseEnvironment:
    __metaclass__ = ABCMeta
    
    def __init__(self):
        reward = None
        state = None 
        termination = None
        self.reward_state_term = (reward, state, termination)
    
    @abstractmethod
    def env_init(self, env_info={}):
        pass
    
    @abstractmethod
    def env_start(self):
        pass
    
    @abstractmethod
    def env_step(self, action):
        pass
    
    def env_cleanup(self):
        pass
    
    def env_message(self, message):
        pass


# ====== GridWorld Environment (Same as Before) ======
class GridWorldEnvironment(BaseEnvironment):
    def __init__(self):
        super().__init__()
        self.grid_size = -1
        self.agent_position = (-1, -1)
        self.goal_position = (-1, -1)

    def env_init(self, env_info={}):
        self.grid_size = env_info.get("grid_size")
        self.goal_position = tuple(env_info.get("goal_position"))

    def env_start(self):
        self.agent_position = (0, 0)
        return self.agent_position

    def env_step(self, action):
        x, y = self.agent_position
        if action == 0:  # Up
            y = max(0, y - 1)
        elif action == 1:  # Down
            y = min(self.grid_size - 1, y + 1)
        elif action == 2:  # Left
            x = max(0, x - 1)
        elif action == 3:  # Right
            x = min(self.grid_size - 1, x + 1)

        self.agent_position = (x, y)

        if self.agent_position == self.goal_position:
            terminated = True
            reward = 1.0
        else:
            terminated = False
            reward = -0.1

        return reward, self.agent_position, terminated
    
    def print_grid(self):
        """Prints the grid showing the agent's movement."""
        grid = [["." for _ in range(self.grid_size)] for _ in range(self.grid_size)]
        grid[self.goal_position[1]][self.goal_position[0]] = "G"
        grid[self.agent_position[1]][self.agent_position[0]] = "A"

        print("\n".join([" ".join(row) for row in grid]))
        print("-" * (self.grid_size * 2))


# ====== Running the Environment & Training the Agent ======
if __name__ == "__main__":
    env = GridWorldEnvironment()
    env_info = {"grid_size": 7, "goal_position": (6, 6)}
    env.env_init(env_info)

    agent = REINFORCEAgent()
    agent_info = {"gamma": 0.99, "learning_rate": 0.01, "state_size": 2, "action_size": 4}
    agent.agent_init(agent_info)

    for episode in range(1):  # Train for 1000 episodes
        state = env.env_start()
        action = agent.agent_start(state)

        env.print_grid()
        while True:
            reward, next_state, terminated = env.env_step(action)
            env.print_grid()
            if terminated:
                agent.agent_end(reward)
                break
            action = agent.agent_step(reward, next_state)

A . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
. A . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
. A . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
. A . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
A . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
A . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
A . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
. A . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . G
--------------
. . . . . . .
. A . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . .
. . . . . . 

In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Mạng chính sách (Actor) và mạng giá trị (Critic)
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=64):
        super(ActorCritic, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_dim),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
    
    def forward(self, state):
        action_probs = self.actor(state)
        state_value = self.critic(state)
        return action_probs, state_value

# Hàm tính toán Advantage (GAE)
def compute_advantages(rewards, values, gamma=0.99, lambda_=0.95):
    advantages = np.zeros_like(rewards)
    last_advantage = 0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        advantages[t] = last_advantage = delta + gamma * lambda_ * last_advantage
    return advantages

# PPO Training Loop
def train_ppo(env, model, optimizer, epochs=10, batch_size=32, clip_epsilon=0.2):
    for episode in range(1000):
        state = env.reset()
        states, actions, rewards, values, log_probs = [], [], [], [], []

        # Thu thập dữ liệu từ môi trường
        for _ in range(200):  # Tối đa 200 bước mỗi tập
            state_tensor = torch.FloatTensor(state)
            action_probs, value = model(state_tensor)
            action = np.random.choice(len(action_probs.detach().numpy()), p=action_probs.detach().numpy())

            next_state, reward, done, _ = env.step(action)

            states.append(state_tensor)
            actions.append(action)
            rewards.append(reward)
            values.append(value.item())
            log_probs.append(torch.log(action_probs[action]))

            state = next_state
            if done:
                break

        # Thêm phần tử cuối cùng của values (0 nếu kết thúc)
        values.append(0)

        # Tính Advantage
        advantages = compute_advantages(rewards, values)
        advantages = torch.FloatTensor(advantages)

        # PPO Update
        for _ in range(epochs):
            new_action_probs, new_values = model(torch.stack(states))
            new_log_probs = torch.log(new_action_probs.gather(1, torch.LongTensor(actions).unsqueeze(1)).squeeze())

            # Tính toán ratio
            ratio = torch.exp(new_log_probs - torch.stack(log_probs))

            # PPO Clipping
            clipped_ratio = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon)
            loss_clip = torch.min(ratio * advantages, clipped_ratio * advantages)

            # Tổng hợp loss
            value_loss = nn.MSELoss()(new_values.squeeze(), torch.FloatTensor(rewards))
            loss = -loss_clip.mean() + 0.5 * value_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if episode % 50 == 0:
            print(f"Episode {episode}, Reward: {sum(rewards)}")

# Tạo môi trường
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Khởi tạo mô hình PPO
model = ActorCritic(state_dim, action_dim)
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Huấn luyện
train_ppo(env, model, optimizer)

In [1]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Mạng chính sách (Actor) và mạng giá trị (Critic)
class ActorCritic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=64):
        super(ActorCritic, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_dim),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
    
    def forward(self, state):
        action_probs = self.actor(state)
        state_value = self.critic(state)
        return action_probs, state_value

# Hàm tính toán Advantage (GAE)
def compute_advantages(rewards, values, gamma=0.99, lambda_=0.95):
    advantages = np.zeros_like(rewards)
    last_advantage = 0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * values[t + 1] - values[t]
        advantages[t] = last_advantage = delta + gamma * lambda_ * last_advantage
    return advantages

# PPO Training Loop
def train_ppo(env, model, optimizer, epochs=10, batch_size=32, clip_epsilon=0.2):
    for episode in range(1000):
        state = env.reset()
        states, actions, rewards, values, log_probs = [], [], [], [], []

        # Thu thập dữ liệu từ môi trường
        for _ in range(200):  # Tối đa 200 bước mỗi tập
            state_tensor = torch.FloatTensor(state)
            action_probs, value = model(state_tensor)
            action = np.random.choice(len(action_probs.detach().numpy()), p=action_probs.detach().numpy())

            next_state, reward, done, _ = env.step(action)

            states.append(state_tensor)
            actions.append(action)
            rewards.append(reward)
            values.append(value.item())
            log_probs.append(torch.log(action_probs[action]))

            state = next_state
            if done:
                break

        # Thêm phần tử cuối cùng của values (0 nếu kết thúc)
        values.append(0)

        # Tính Advantage
        advantages = compute_advantages(rewards, values)
        advantages = torch.FloatTensor(advantages)

        # PPO Update
        for _ in range(epochs):
            new_action_probs, new_values = model(torch.stack(states))
            new_log_probs = torch.log(new_action_probs.gather(1, torch.LongTensor(actions).unsqueeze(1)).squeeze())

            # Tính toán ratio
            ratio = torch.exp(new_log_probs - torch.stack(log_probs))

            # PPO Clipping
            clipped_ratio = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon)
            loss_clip = torch.min(ratio * advantages, clipped_ratio * advantages)

            # Tổng hợp loss
            value_loss = nn.MSELoss()(new_values.squeeze(), torch.FloatTensor(rewards))
            loss = -loss_clip.mean() + 0.5 * value_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if episode % 50 == 0:
            print(f"Episode {episode}, Reward: {sum(rewards)}")

# Tạo môi trường
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

# Khởi tạo mô hình PPO
model = ActorCritic(state_dim, action_dim)
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Huấn luyện
train_ppo(env, model, optimizer)

  state_tensor = torch.FloatTensor(state)


ValueError: expected sequence of length 4 at dim 1 (got 0)

In [2]:
from torch.distributions import Categorical
import torch
import numpy as np
import gym
import torch.nn as nn
import torch.optim as optim

# Environment for the grid world
class GridWorldEnv(gym.Env):
    def __init__(self, grid_size, goal_position):
        '''
        This function is used to initialize the first state of the grid world.
        Args:
            grid_size: Size of the grid
            goal_position: Position of the goal
        '''
        # Init the super class
        super(GridWorldEnv, self).__init__()
        
        ''' 
        Attributes of the class: 
            + grid_size
            + agent_position
            + goal_position
            + action_space
            + state_space
        '''
        self.grid_size = grid_size
        self.agent_position = [0, 0]
        self.goal_position = goal_position
        
        self.action_space = gym.spaces.Discrete(4)  # 0: up, 1: down, 2: left, 3: right
        self.state_space = gym.spaces.Box(
            low=0, high=self.grid_size-1, shape=(2,), dtype=np.int32
        )
    
    def reset(self):
        '''
        This function is used to reset the agent_position to start position.
        Returns:
            The start position
        '''
        self.agent_position = [0, 0]
        return np.array(self.agent_position, dtype=np.int32)
    
    def step(self, action):
        '''
        This function is used to change the agent_position to the next position, with taking the action 'action'.
        Args:
            action: The action to take
        Returns:
            An array of: agent_position, reward, termination, info
        '''
        if action == 0:  # up
            self.agent_position[1] = max(0, self.agent_position[1] - 1)
        elif action == 1:  # down
            self.agent_position[1] = min(self.grid_size - 1, self.agent_position[1] + 1)
        elif action == 2:  # left
            self.agent_position[0] = max(0, self.agent_position[0] - 1)
        elif action == 3:  # right
            self.agent_position[0] = min(self.grid_size - 1, self.agent_position[0] + 1)
        
        # Check if the agent reached the goal
        terminated = (self.agent_position == self.goal_position)
        reward = 1.0 if terminated else -0.1  # Reward for reaching the goal
        
        return np.array(self.agent_position, dtype=np.int32), reward, terminated, {}
    
    def render(self):
        '''
        This function is used to print the current state of the grid world.
        '''
        grid = [["." for _ in range(self.grid_size)] for _ in range(self.grid_size)]
        grid[self.goal_position[1]][self.goal_position[0]] = "G"
        grid[self.agent_position[1]][self.agent_position[0]] = "A"
        
        print("\n".join([" ".join(row) for row in grid]))
        print("-" * (self.grid_size * 2))


class PPONetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size):
        '''
        This function is used to initialize a MLP as PPONetwork.
        Args:
            state_size: Size of the state
            hidden_size: Size of the hidden layers
            action_size: Size of the action space
        '''
        super(PPONetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.actor = nn.Linear(hidden_size, action_size)
        self.critic = nn.Linear(hidden_size, 1)

    def forward(self, x):
        '''
        This function is used to pass the input state x through the PPONetwork,
        and then output a vector of action probabilities, and state value.
        Args:
            x: Input state
        Returns:
            action_probs: Probabilities of actions
            state_value: Value of the state
        '''
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.actor(x), dim=-1)
        state_value = self.critic(x)
        
        return action_probs, state_value


class PPOAgent:
    def __init__(self, gamma, clip_ratio, ppo_epochs, batch_size, state_size, action_size, lr=3e-4):
        '''
        This function is used to initialize some parameters of the model.
        Args: 
            gamma: Discount factor
            clip_ratio: Clipping ratio for PPO
            ppo_epochs: Number of PPO epochs
            batch_size: Batch size
            state_size: Size of the state
            action_size: Size of the action space
            lr: Learning rate
        '''
        self.gamma = gamma
        self.clip_ratio = clip_ratio
        self.ppo_epochs = ppo_epochs
        self.batch_size = batch_size
        
        self.policy = PPONetwork(state_size, action_size, hidden_size=64)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        self.memory = []

    def get_action(self, state):
        '''
        This function is used to pass an input state through the PPONetwork,
        and then receive an action, and the probability to take this action.
        Args:
            state: Input state
        Returns:
            action: Selected action
            log_prob: Log probability of the action
        '''
        state = torch.FloatTensor(state)
        with torch.no_grad():
            action_probs, _ = self.policy(state)
        dist = Categorical(action_probs)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        
        return action.item(), log_prob.item()

    def remember(self, state, action, log_prob, reward, done):
        '''
        This function is used to store experiences in memory.
        Args:
            state: Current state
            action: Action taken
            log_prob: Log probability of the action
            reward: Reward received
            done: Whether the episode is done
        '''
        self.memory.append((state, action, log_prob, reward, done))

    def update(self):
        '''
        This function is used to calculate the returns and update the policy.
        '''
        states, actions, log_probs, rewards, dones = zip(*self.memory)
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        log_probs = torch.FloatTensor(log_probs)
        rewards = torch.FloatTensor(rewards)
        dones = torch.FloatTensor(dones)

        # Compute discounted returns
        returns = []
        R = 0
        for reward, done in zip(reversed(rewards), reversed(dones)):
            if done:
                R = 0
            R = reward + self.gamma * R
            returns.insert(0, R)
        returns = torch.FloatTensor(returns)

        # Normalize returns
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        # Update policy
        for _ in range(self.ppo_epochs):
            action_probs, state_values = self.policy(states)
            dist = Categorical(action_probs)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy()

            # Calculate advantages
            advantages = returns - state_values.detach().squeeze()

            # Calculate ratio
            ratio = (new_log_probs - log_probs).exp()

            # Surrogate loss
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()

            # Critic loss
            critic_loss = (returns - state_values.squeeze()).pow(2).mean()

            # Total loss
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy.mean()

            # Backpropagation
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Clear memory
        self.memory = []


def train():
    env = GridWorldEnv(grid_size=5, goal_position=[4, 4])
    agent = PPOAgent(gamma=0.99, clip_ratio=0.2, ppo_epochs=4, batch_size=64, state_size=2, action_size=4)

    num_episodes = 1000
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        episode_reward = 0

        print(f"Episode {episode + 1} starts:")
        while not done:
            env.render()  # Render the grid world to show the agent's movement
            action, log_prob = agent.get_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, log_prob, reward, done)
            state = next_state
            episode_reward += reward

        print(f"Episode {episode + 1} ends with reward: {episode_reward}\n")
        agent.update()

if __name__ == "__main__":
    train()

Episode 1 starts:
A . . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. A . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. . . . .
. A . . .
. . . . .
. . . . .
. . . . G
----------
. . . . .
. . A . .
. . . . .
. . . . .
. . . . G
----------
. . A . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. . A . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. . A . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. A . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
A . . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. A . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. A . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. . A . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. A . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
A . . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
A . . . .
. . . . .
. . . . .
. . . . .
. . . . G
----------
. . . . .
A . . . .
. . . . .
. . . . .
. . . . G
----------
. . . 