# First test of different reinforcement learning algorithms

In [1]:
import numpy as np
import sys
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter, MaxNLocator
import ipywidgets as widgets
from ipyevents import Event
import time
import gymnasium as gym
import torch

In [2]:
# Add the directory two levels above the working directory to the import path
# (presumably where the local `andreas2048` package lives - confirm).
sys.path.insert(1, str(Path("../..")))
import andreas2048
# NOTE(review): the star import brings names such as `Action` into the notebook
# namespace; the cells below rely on it.
from andreas2048.game import *
from andreas2048 import gym2048
# Disabled alternative: create the environment through the gymnasium registry.
#env = gym.make("andreas_2048")
# Single environment instance shared by every cell below.
env = gym2048.Env2048()

### 1 Helper functions

In [3]:
class Policy:
    """Base interface for agents acting in a ``gym2048.Env2048`` environment.

    Subclasses must override :meth:`get_action`; :meth:`update` is an optional
    learning hook that does nothing by default.
    """

    def __init__(self, env: gym2048.Env2048) -> None:
        """Remember the environment the policy acts in."""
        self.env = env

    def get_action(self, obs: np.ndarray) -> Action:
        """Return the action to take for observation ``obs``.

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        raise NotImplementedError

    def update(self, obs: np.ndarray, action: Action, reward: float, terminated: bool, next_obs):
        """Learn from one transition; the base implementation is a no-op.

        Args:
            obs: Observation before the action.
            action: Action that was taken.
            reward: Reward received for the action.
            terminated: Whether the step ended the episode.
            next_obs: Observation after the action.
        """
        return None


def train_agent(policy: type[Policy], episodes: int = 10000):
    """Train a freshly built policy on the notebook-level ``env`` and plot progress.

    The policy class is instantiated once with the global ``env``. For every
    episode the agent picks actions via ``get_action`` and learns from each
    transition via ``update`` until the environment reports termination or
    truncation. After each episode the game's score, highest tile and move
    count are recorded, and all three series are plotted at the end.

    Args:
        policy: ``Policy`` subclass (the class itself, not an instance); it is
            called as ``policy(env)``.
        episodes: Number of episodes to play.

    Returns:
        None. The function only shows a matplotlib figure.
    """
    scores = []
    highest_tiles = []
    move_counts = []

    # One agent both selects the actions and learns from them. The previous
    # code created two unused instances and then referenced an undefined
    # ``agent``, which raised NameError on the first step.
    agent = policy(env)

    for _ in range(episodes):
        # Start a new game
        obs, info = env.reset()
        done = False

        # Play one complete game
        while not done:
            # Agent chooses the next move
            action = agent.get_action(obs)

            # Take action and observe result
            next_obs, reward, terminated, truncated, info = env.step(action)

            # Learn from this experience
            agent.update(obs, action, reward, terminated, next_obs)

            # Move to next state
            done = terminated or truncated
            obs = next_obs

        # Per-episode statistics read from the finished game
        scores.append(env.game.score)
        highest_tiles.append(env.game.highest_tile)
        move_counts.append(env.game.move_count)

    # Three y-axes share the episode axis: score (left), highest tile and
    # move count (both right, the latter pushed outwards so they don't overlap).
    ax1 = plt.subplot()
    ax2 = ax1.twinx()
    ax3 = ax1.twinx()
    ax3.spines.right.set_position(("axes", 1.2))
    ax1.plot(scores, label="Scores")
    # Highest tile is plotted as its base-2 logarithm ...
    p2 = ax2.plot(np.log2(highest_tiles), label="Highest tile", c="orange")
    p3 = ax3.plot(move_counts, label="Move count", c="green")
    ax1.set_xlabel("Episode")
    ax1.set_ylabel("Score")
    ax2.set_ylabel("Highest Tile")
    ax3.set_ylabel("Move count")
    ax2.yaxis.label.set_color(p2[0].get_color())
    ax3.yaxis.label.set_color(p3[0].get_color())
    # ... and the tick labels convert the exponent back to the tile value.
    ax2.yaxis.set_major_formatter(FuncFormatter(lambda x, pos: f"{2**x:n}"))
    ax2.yaxis.set_major_locator(MaxNLocator(integer=True))
    plt.show()

In [4]:
# Unconditionally abort execution here; presumably a guard so that "Run All"
# does not continue into the unfinished cells below - confirm before removing.
raise RuntimeError("STOP")

RuntimeError: STOP

### 2 DQN Policy with a simple neural network

In [None]:
from numpy import ndarray


from andreas2048.game import Action


class DQN_Agent(Policy):
    """Epsilon-greedy Q-learning policy backed by a small fully connected network.

    With probability ``epsilon`` a random action is sampled from the
    environment's action space; otherwise an action is sampled from the
    softmax of the network's four outputs. ``update`` performs one
    temporal-difference step per transition and lowers ``epsilon`` linearly
    whenever an episode terminates.

    Args:
        env: Environment the agent acts in.
        learning_rate: Step size of the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon_decay: Amount subtracted from ``epsilon`` after each
            terminated episode.
        epsilon_min: Lower bound for ``epsilon``.
    """

    def __init__(self,
                 env: gym2048.Env2048,
                 learning_rate: float = 0.001,
                 gamma: float = 0.99,
                 epsilon_decay: float = 0.005,
                 epsilon_min: float = 0.001,
            ) -> None:
        super().__init__(env)
        self.learning_rate = learning_rate
        self.gamma = gamma
        # Start fully exploratory; decay_epsilon() lowers this over time.
        self.epsilon = 1.0
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min

        self.rnd = np.random.default_rng()

        self.build_model()

    @staticmethod
    def _to_state(array) -> torch.Tensor:
        """Return ``array`` as a flat float32 tensor (always a copy)."""
        return torch.from_numpy(np.array(array, dtype=np.float32).reshape(-1))

    def get_action(self, obs: np.ndarray) -> Action:
        """Choose the next action with an epsilon-greedy / softmax scheme.

        Args:
            obs: Current observation. As before, the network input is read
                from ``self.env.game.grid_stacks`` rather than from ``obs``.
                NOTE(review): confirm whether ``obs`` is the same encoding and
                could be used directly.

        Returns:
            Whatever ``self.env.action_space.sample`` returns.
        """
        if self.rnd.random() < self.epsilon:
            return self.env.action_space.sample()
        state = self._to_state(self.env.game.grid_stacks)
        with torch.no_grad():
            q_values = self.model(state)
            # Softmax in float64 so the probabilities sum to one within
            # NumPy's tolerance. NOTE(review): assumes the action space follows
            # gymnasium's Discrete.sample(probability=...) contract, which
            # expects a float64 vector.
            p = torch.softmax(q_values.double(), dim=0).numpy()
        return self.env.action_space.sample(probability=p)

    def build_model(self):
        """Create the Q-network, its loss function and its optimizer."""
        self.model = torch.nn.Sequential(
            torch.nn.Linear(in_features=self.env.game.grid_stacks.size, out_features=256),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=256, out_features=128),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features=128, out_features=4),
        )
        self.loss_fn = torch.nn.MSELoss()
        # Honour the constructor argument instead of a hard-coded rate.
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def decay_epsilon(self) -> None:
        """Lower ``epsilon`` by ``epsilon_decay``, never below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)

    def update(self, obs: np.ndarray, action: Action, reward: float, terminated: bool, next_obs: np.ndarray):
        """Run one temporal-difference update for a single transition.

        The target is ``reward + gamma * max_a Q(next_obs, a)``, with the
        bootstrap term dropped when the transition ended the episode. The loss
        is the squared error between that target and ``Q(obs, action)``.

        Args:
            obs: Observation before the action. NOTE(review): assumed to hold
                as many elements as ``env.game.grid_stacks`` (the network's
                input size) - confirm.
            action: Action that was taken; used as an index into the four
                network outputs. Plain integers and enum members with an
                integer ``value`` are both accepted.
            reward: Reward received for the action.
            terminated: Whether the step ended the episode.
            next_obs: Observation after the action.
        """
        state = self._to_state(obs)
        next_state = self._to_state(next_obs)
        action_index = int(getattr(action, "value", action))

        # Prediction for the action actually taken (keeps the autograd graph).
        q_taken = self.model(state)[action_index]

        # The target is treated as a constant, so no gradient flows through it.
        with torch.no_grad():
            next_best = self.model(next_state).max()
            # No future reward can follow a terminal state.
            continuation = 0.0 if terminated else 1.0
            target = float(reward) + self.gamma * continuation * next_best

        loss = self.loss_fn(q_taken, target.to(q_taken.dtype))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Explore less after every finished game. Truncated episodes are not
        # visible here because only ``terminated`` is passed in.
        if terminated:
            self.decay_epsilon()
    

env.reset()

# Per-episode statistics collected by the loop below.
scores = []
highest_tiles = []
move_counts = []

episodes = 100
# NOTE(review): the hyper-parameters below are not passed to the agents, which
# are constructed with DQN_Agent's own defaults. They are kept for the planned
# DQN setup and are currently unused.
gamma = 0.99
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01
target_update_freq = 10

q_policy = DQN_Agent(env)
# Second network initialised with the same weights as the online network.
# NOTE(review): nothing reads `target_policy` or `optimizer` after this point.
target_policy = DQN_Agent(env)
target_policy.model.load_state_dict(q_policy.model.state_dict())
optimizer = torch.optim.Adam(q_policy.model.parameters(), lr=1e-3)


for episode in range(episodes):
    obs, info = env.reset()
    done = False

    while not done:
        action = q_policy.get_action(obs)

        next_obs, reward, terminated, truncated, info = env.step(action)

        # Learn from this experience. The acting policy is the one that
        # learns; the previous `agent.update(...)` referenced an undefined name.
        q_policy.update(obs, action, reward, terminated, next_obs)

        # Move to next state
        done = terminated or truncated
        obs = next_obs
    scores.append(env.game.score)
    highest_tiles.append(env.game.highest_tile)
    move_counts.append(env.game.move_count)



In [None]:
# The constructor requires the environment; calling it without arguments
# raises TypeError.
DQN_Agent(env)

In [None]:
# Flatten the current game's `grid_stacks` array into a 1-D float32 tensor;
# the bare `t` on the last line displays it as the cell output.
t = torch.from_numpy(env.game.grid_stacks.flatten()).float()
t

In [None]:
import torch.nn as nn
class QNetwork(nn.Module):
    """Fully connected network mapping a flat state vector to one value per action.

    The network has two hidden layers of equal width with ReLU activations and
    a linear output layer.

    Args:
        state_dim: Number of input features.
        action_dim: Number of outputs (one per action).
        hidden_dim: Width of both hidden layers. Defaults to 128, the value
            that used to be hard-coded.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )

    def forward(self, x):
        """Return the network output for input ``x`` (last dimension ``state_dim``)."""
        return self.net(x)


# 256 inputs and 4 outputs; presumably the length of the flattened
# `grid_stacks` tensor and the number of moves - confirm against the env.
n = QNetwork(state_dim=256, action_dim=4)

In [None]:
# Show the raw network outputs for `t` next to their softmax over dimension 0.
n(t), nn.functional.softmax(n(t), dim=0)