## Register gym environment

In [1]:
import gym
import sys
from gym.envs.registration import register
sys.path.append('./Base2048Env')
import Base2048Env
register(
    id='2048-v0',
    entry_point='Base2048Env:Base2048Env'
)

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

In [3]:
device = torch.device("cuda")

In [4]:
# torch.__version__
torch.cuda.is_available() 

True

In [5]:
class PolicyValueNetwork(nn.Module):
    def __init__(self, input_dim, action_dim):
        super(PolicyValueNetwork, self).__init__()
        self.shared_layers = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
        )
        self.policy_head = nn.Linear(256, action_dim)
        self.value_head = nn.Linear(256, 1)

    def forward(self, x):
        shared_output = self.shared_layers(x)
        policy_logits = self.policy_head(shared_output)
        value = self.value_head(shared_output)
        return policy_logits, value


In [6]:
class PPOAgent:
    def __init__(self, input_dim, action_dim, lr=3e-4, gamma=0.99, eps_clip=0.2, entropy_coeff=0.01, update_epochs=4):
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.entropy_coeff = entropy_coeff
        self.update_epochs = update_epochs

        self.policy_value_net = PolicyValueNetwork(input_dim, action_dim).to(device)
        self.optimizer = optim.Adam(self.policy_value_net.parameters(), lr=lr)

        # Memory buffers for training
        self.states = []
        self.actions = []
        self.rewards = []
        self.log_probs = []
        self.dones = []

    def select_action(self, state):
        """
        Select an action based on the current state using the policy network.
        Returns the action and related log probability for optimization later.
        """
        state = torch.FloatTensor(state).to(device)
        logits, _ = self.policy_value_net(state)
        probs = Categorical(logits=logits)
        action = probs.sample()
        return action.item(), probs.log_prob(action), probs.entropy()

    def store_transition(self, state, action, reward, log_prob, done):
        """
        Store the transition in memory for batch optimization.
        """
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)
        self.log_probs.append(log_prob)
        self.dones.append(done)

    def compute_advantages_and_returns(self, next_state_value):
        """
        Compute advantages using Generalized Advantage Estimation (GAE) and returns.
        """
        advantages = []
        returns = []
        gae = 0
        values = self.get_values()
        for t in reversed(range(len(self.rewards))):
            delta = self.rewards[t] + self.gamma * next_state_value * (1 - self.dones[t]) - values[t]
            gae = delta + self.gamma * gae * (1 - self.dones[t])
            advantages.insert(0, gae)
            next_state_value = values[t]
            returns.insert(0, gae + values[t])
        return advantages, returns

    def get_values(self):
        """
        Compute the value function estimates for all stored states.
        """
        states_tensor = torch.FloatTensor(self.states).to(device)
        with torch.no_grad():
            _, values = self.policy_value_net(states_tensor)
        return values.squeeze().cpu().numpy()

    def optimize(self, advantages, returns):
        """
        Optimize the policy and value networks based on the stored transitions.
        """
        states_tensor = torch.FloatTensor(self.states).to(device)
        actions_tensor = torch.LongTensor(self.actions).to(device)
        old_log_probs_tensor = torch.FloatTensor(self.log_probs).to(device)
        returns_tensor = torch.FloatTensor(returns).to(device)
        advantages_tensor = torch.FloatTensor(advantages).to(device)

        for _ in range(self.update_epochs):
            logits, values = self.policy_value_net(states_tensor)
            probs = Categorical(logits=logits)
            log_probs = probs.log_prob(actions_tensor)
            entropy = probs.entropy()

            # Compute ratio for clipped loss
            ratios = torch.exp(log_probs - old_log_probs_tensor)

            # PPO clipping objective
            surr1 = ratios * advantages_tensor
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages_tensor
            policy_loss = -torch.min(surr1, surr2).mean()

            # Value loss
            value_loss = nn.MSELoss()(values.squeeze(), returns_tensor)

            # Combined loss
            loss = policy_loss + 0.5 * value_loss - self.entropy_coeff * entropy.mean()

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Clear stored transitions
        self.states = []
        self.actions = []
        self.rewards = []
        self.log_probs = []
        self.dones = []


In [9]:
def train_ppo(agent, env, max_episodes=1000, max_steps=200):
    """
    Train the PPO agent in the environment.
    
    Parameters:
        agent: PPOAgent
            The agent to train.
        env: Environment
            The environment the agent interacts with.
        max_episodes: int
            Maximum number of episodes to train.
        max_steps: int
            Maximum number of steps per episode.
    """
    for episode in range(max_episodes):
        print("training episode {}".format(episode))
        # Reset environment and initialize variables
        state = env.reset()
        state = state.flatten()  # Ensure state is a 1D array
        done = False
        score = 0

        while not done:
            # Interact with the environment
            state_tensor = torch.FloatTensor(state)
            action, log_prob, _ = agent.select_action(state_tensor)
            next_state, reward, done, _ = env.step(action)
            next_state = next_state.flatten()  # Flatten next state
            score += reward

            # Store the transition in agent's memory
            agent.store_transition(state, action, reward, log_prob.item(), done)
            state = next_state

        # After the episode ends, compute advantages and returns
        next_state_value = 0
        if not done:  # Use value of last state if not terminal
            state_tensor = torch.FloatTensor(state).to(device)
            _, next_state_value = agent.policy_value_net(state_tensor)
            print("next state: {}".format(next_state_value))

        advantages, returns = agent.compute_advantages_and_returns(next_state_value)

        # Optimize policy and value networks
        agent.optimize(advantages, returns)

        print(f"Episode {episode + 1}/{max_episodes}, Score: {score}")


In [10]:
device = torch.device("cuda")

env = gym.make('2048-v0')
env.seed(42)
env.reset()

agent = PPOAgent(16, 4)

train_ppo(agent, env, max_episodes=1000, max_steps=300)

  logger.warn(
  logger.warn(
  logger.warn(
  logger.warn(


training episode 0


  logger.deprecation(
  logger.warn(f"{pre} is not within the observation space.")
  states_tensor = torch.FloatTensor(self.states).to(device)


Episode 1/1000, Score: 768
training episode 1
Episode 2/1000, Score: 784
training episode 2
Episode 3/1000, Score: 448
training episode 3
Episode 4/1000, Score: 2624
training episode 4
Episode 5/1000, Score: 1124
training episode 5
Episode 6/1000, Score: 1168
training episode 6
Episode 7/1000, Score: 640
training episode 7
Episode 8/1000, Score: 728
training episode 8
Episode 9/1000, Score: 232
training episode 9
Episode 10/1000, Score: 524
training episode 10
Episode 11/1000, Score: 652
training episode 11
Episode 12/1000, Score: 852
training episode 12
Episode 13/1000, Score: 772
training episode 13
Episode 14/1000, Score: 796
training episode 14
Episode 15/1000, Score: 612
training episode 15
Episode 16/1000, Score: 1492
training episode 16
Episode 17/1000, Score: 664
training episode 17
Episode 18/1000, Score: 756
training episode 18
Episode 19/1000, Score: 564
training episode 19
Episode 20/1000, Score: 748
training episode 20
Episode 21/1000, Score: 1248
training episode 21
Episo

KeyboardInterrupt: 