In [None]:
import torch
from torch import nn
import matplotlib.pyplot as plt
from torch.distributions.categorical import Categorical

In [None]:
class ActorNetwork(torch.nn.Module):
    """LSTM RNN for generating words for wordle solver"""
    def __init__(self, input_size, hidden_size, projection=1, num_layers=5, dropout_probability=0):
        super().__init__()
        self.network = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout_probability, proj_size=projection)
        self.softmax = nn.Softmax(dim=1)
    def forward(self, x):
        output, (hidden, _) = self.network(x)
        probab = self.softmax(output)

        return probab

class CriticNet(nn.Module):
    """Network representing the critic"""
    def __init__(self, in_shape):
        super().__init__()
        self.v_network = nn.Sequential(
            nn.Linear(in_shape, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
)

    def forward(self, x):
        return self.v_network(x)

In [None]:
net = ActorNetwork(1, 32, 26, 5)
input = torch.tensor([[
    [1],
    [2],
    [3],
    [4],
    [5]
]]).float()
print(input.shape)
net(input)

In [None]:
import gym
import gym_wordle
wordle = gym.make("Wordle-v0")

In [None]:
wordle.reset()
obs = wordle.step([2, 17, 0, 13, 4])

In [None]:
wordle.render()
obs = wordle.step([2, 17, 0, 19, 4])
wordle.render()
print(obs)
wordle.hidden_word

In [None]:
wordle.step([17, 4, 1, 0, 1])

In [None]:
import numpy as np
rnn = ActorNetwork(5, 32, 26, 5)
o = np.array([obs[0]['board']])
o = torch.from_numpy(o).to(torch.float32)
print(o.shape)
print(rnn(o))

In [None]:
prob = Categorical(rnn(o))

prob.sample()

In [None]:
def train(replay, q_val):
    vals = torch.vstack(replay.vals)
    
    rewards_tensor = torch.tensor(np.asarray(replay.rewards, dtype=np.float32))
    is_terminal_tensor = torch.tensor(np.asarray(replay.dones, dtype=np.int32))
        
    q_vals_tensor = rewards_tensor + discount_factor * q_val * (1 - is_terminal_tensor)
        
    advantage = q_vals_tensor - vals
    
    critic_loss = (advantage ** 2).mean()
    # critic_loss.requires_grad = True
    adam_critic.zero_grad()
    critic_loss.backward(retain_graph=True)
    adam_critic.step()
    
    
    log_probabs_tensor = torch.vstack(replay.log_probs)
    actor_loss = (-log_probabs_tensor*advantage.detach()).mean()
    # actor_loss.requires_grad = True
    adam_actor.zero_grad()
    actor_loss.backward()
    adam_actor.step()

In [None]:
class Advantage_ActorCritic():
    def __init__(self, world: gym.Env, policy_net: ActorNetwork, critic_net: CriticNet,
                 encoder, policy_alpha, critic_alpha, gamma, max_reward):
        # environment info
        self.world = world
        self.encoder = encoder
        self.max_reward = max_reward

        # actor and critic
        self.actor = policy_net
        self.critic = critic_net
        self.error_buffer = list()
        self.policy_optimizer = torch.optim.Adam(policy_net.network.parameters(), lr=policy_alpha)
        self.v_optimizer = torch.optim.Adam(critic_net.v_network.parameters(), lr=critic_alpha)

        # training info
        self.gamma = gamma
        self.episodes = 0

    def train(self, iterations):
        converged = False
        rewards = list()
        recents = torch.zeros(10)
        i = 0
        while not converged:
            r = self.episode()
            rewards.append(r)

            if i < 10:
                recents[i] = r
                i += 1

            else:
                recents.roll(-1, 0)
                recents[9] = r
            
            # convergence check
            if len(rewards) > 10 and recents.mean() >= self.max_reward:
                converged = True
            converged = True if iterations == self.episodes else False
        
        plt.plot(rewards)
        plt.show()


    def episode(self, training=True):
        done = False
        state = self.world.reset().copy()
        episode_reward = 0
        self.episodes += 1
        while not done:
            # take on policy action
            encoded = self.encoder(state)
            net_out = self.actor(encoded)

            dist = Categorical(net_out)

            state_prime, reward, done, _ = self.world.step(action)

            # fill buffer (faster to do these with big tensor all at once then compute here)
            self.error_buffer.append((state, state_prime, reward))

            # prepare for next iteration
            episode_reward += reward
            state = state_prime.copy()
            if training:
                self.__net_update()

        return episode_reward

    def td_error(self, value, value_prime, reward):
        def loss_fn():
            return reward + self.gamma * value_prime - value
        return loss_fn()

    def policy_error(self, prob, error):
        def loss_fn():
            return -torch.log(prob) * error
        return loss_fn()


    def __net_update(self):
        # calculate error for policy and critic
        self.critic.v_network.zero_grad()
        self.actor.network.zero_grad()

        state, state_prime, reward, prob = self.error_buffer.pop(0)
        value = self.critic(self.encoder(state))
        value_prime = self.critic(self.encoder(state_prime))

        td_error = self.td_error(value, value_prime, reward)
        policy_error = self.policy_error(prob, td_error) 

        # backprop
        if self.episodes > 100:
            td_error.backward()
        else:
            policy_error.backward()
            self.policy_optimizer.step()
        self.v_optimizer.step()