In [97]:
def debug(*info):
    """Print the collected arguments as one tuple when tracing is enabled.

    Tracing is controlled by the local flag below, which is switched off, so
    a call currently prints nothing and returns None.
    """
    tracing_enabled = False
    if not tracing_enabled:
        return
    print(info)

# Wordle

In [98]:
%pip install simple_colors



In [99]:
from enum import IntEnum
from simple_colors import *
import numpy as np
import random

# Size of the reduced action set used for training.
small_num_actions = 50

# Load the word list, upper-case every entry and keep a random subset of
# `small_num_actions` entries (sampled without replacement) as the action space.
_word_list = np.loadtxt("./actions.txt", dtype=str)
actions = random.sample(
    [entry.upper() for entry in _word_list],
    small_num_actions,
)
# To use the full word list instead:
# actions = [entry.upper() for entry in _word_list]


class Color(IntEnum):
    """Feedback colour of a tile, stored as an integer code."""

    GREY = 0
    GRAY = 0  # same value as GREY, so the enum exposes it as an alias of GREY
    YELLOW = 1
    GREEN = 2


class Tile:
    """One board cell: a guessed letter together with its feedback colour."""

    def __init__(self, character: str, color: Color):
        # Stored as plain attributes; readers access .char and .color directly.
        self.char, self.color = character, color


class Board:
    """Ordered collection of the tile rows guessed so far."""

    def __init__(self):
        # Rows are kept in the order in which they were appended.
        self.board = list()

    def append_row(self, row: list[Tile]):
        """Add one row of tiles after the rows already stored."""
        self.board += [row]

    def get_row(self, i) -> list[Tile]:
        """Return the row stored at position ``i``."""
        rows = self.board
        return rows[i]

    def get_rows(self) -> list[list[Tile]]:
        """Return the underlying list of rows (not a copy)."""
        return self.board

    def get_num_rows(self) -> int:
        """Return how many rows have been stored."""
        return len(self.get_rows())


class WordleGame:
    """One game of Wordle: a hidden answer plus the board of guesses so far.

    Attributes:
        answer: the word to find.
        board: Board holding one row of Tiles per guess.
        is_complete: True once the answer was guessed or six rows are filled.
            This is a plain attribute; a method of the same name cannot
            coexist with it, because the instance attribute assigned in
            new_game() would shadow the method on every instance.
        win: True once a guess matched the answer.
    """

    def new_game(self, answer=None):
        """Reset all game state.

        Args:
            answer: word to find. When None, a random entry of the
                module-level ``actions`` list is used.
        """
        if answer is None:
            self.answer = actions[np.random.randint(0, len(actions))]
        else:
            # Guesses are upper-cased before they are compared, so the answer
            # has to be upper-case as well or it could never be matched.
            self.answer = answer.upper()
        self.board = Board()
        self.is_complete = False
        self.win = False

    def __init__(self, answer=None):
        self.new_game(answer)

    ## string representation of the Wordle board
    ## returns with color too!
    def __repr__(self):
        painted_rows = []
        for line in self.board.get_rows():
            painted = []
            for tile in line:
                if tile.color == Color.GREY:
                    painted.append(black(tile.char, "bold"))
                elif tile.color == Color.YELLOW:
                    painted.append(yellow(tile.char, "bold"))
                else:
                    painted.append(green(tile.char, "bold"))
            painted_rows.append("".join(painted) + "\n")
        return "".join(painted_rows)

    ## Takes a five-letter guess, records this guess on the game's board.
    ## Returns the array of Colors with each index corresponding to the color of the letter at that index in the guess
    ## Raises ValueError when the guess is not exactly five characters long.
    def guess(self, guess):
        debug("Guessing", guess, "on board")
        debug(self)
        if len(guess) != 5:
            raise ValueError(
                "Wordle guess must be a 5-letter word. "
                f'Could not guess with word "{guess}".'
            )
        # convert everything to upper case
        guess = guess.upper()
        # debug print
        debug("Your guess:", guess)
        debug("The answer:", self.answer)
        colors = self.get_colors(guess)
        # log guess to board
        self.board.append_row(
            [Tile(char, color) for char, color in zip(guess, colors)]
        )

        # check for game over
        # The win has to be tested first: a correct guess that also fills the
        # sixth row must still count as a win.
        if guess == self.answer:
            debug("WIN!")
            self.is_complete = self.win = True
        elif self.board.get_num_rows() >= 6:
            self.is_complete = True

        # give back list of colors
        return colors

    ## get the colors of a word guess
    ## input: string
    ## output: list of Colors, each color corresponding to the
    ##         appropriate game color of the letter at that index
    def get_colors(self, guess: str):
        ## grey by default
        colors = [Color.GREY for _ in guess]

        # count # occurrences of each of the letters in the correct answer
        occurrences_remaining = {}
        for char in self.answer:
            occurrences_remaining[char] = occurrences_remaining.get(char, 0) + 1

        ## greens first
        ## if the character is in the correct place
        for i, (char, expected) in enumerate(zip(guess, self.answer)):
            if char == expected:
                colors[i] = Color.GREEN
                occurrences_remaining[char] -= 1
                debug("Green:", char)

        ## yellows next
        ## if the character is in the word, but in the wrong place
        for i, char in enumerate(guess):
            ## skip if already colored green
            if colors[i] == Color.GREEN:
                continue
            ## skip if all occurrences of this letter have been accounted for
            if occurrences_remaining.get(char, 0) == 0:
                continue

            colors[i] = Color.YELLOW
            debug("Yellow:", char)
            # record that we have accounted for this occurrence
            occurrences_remaining[char] -= 1

        return colors

    def run_game(self):
        """Play interactively on stdin/stdout until the game is complete."""
        print("Welcome to Wordle-AI!")
        while not self.is_complete:
            try:
                self.guess(input("Guess: "))
            except ValueError as err:
                # A guess of the wrong length is reported and asked for again
                # instead of aborting the whole session.
                print(err)
                continue
            print(self)
        if self.win:
            print(
                "Congrats! You found the word in", self.board.get_num_rows(), "tries."
            )
        else:
            print("Darn! You didn't find the word. It was " + self.answer + ".")

    def is_win(self):
        """Return True when the answer has been guessed."""
        return self.win

# DQN

In [100]:
# create the environment
# Single module-level game instance: step(), reset() and getReward() all read
# or mutate this object rather than receiving a game as an argument.
wordleGame = WordleGame()

## wordle wrappers

In [101]:
## convert a row on a board to one-hot encoded format for alphabet letters
## input: up to 5 upper-case letters "A".."Z"; None marks a cell with no letter
## output: (5, 26) array with a 1 at [cell, letter] for every filled cell
def rowToOneHot(row: list[str]):
    oneHot = np.zeros((5, 26))
    for i, char in enumerate(row):
        # identity test: None is the only marker for an empty cell
        if char is not None:
            oneHot[i][ord(char) - ord("A")] = 1
    return oneHot


## encode a board as a flat state vector made of two parts:
## letters: (6 x 5) board cells x 26 one-hot letter slots   -> 780 values
## colors:  (6 x 5) board cells x 2 flags [yellow, green]   ->  60 values
##          (a grey tile leaves both flags at 0)
def boardToState(board: Board):
    letter_planes = np.zeros((6, 5, 26))
    color_planes = np.zeros((6, 5, 2))

    for row_idx, tiles in enumerate(board.get_rows()):
        for col_idx, tile in enumerate(tiles):
            letter_planes[row_idx, col_idx, ord(tile.char) - ord("A")] = 1
            if tile.color == Color.GREEN:
                color_planes[row_idx, col_idx, 1] = 1
            elif tile.color == Color.YELLOW:
                color_planes[row_idx, col_idx, 0] = 1

    # 780 letter values followed by 60 colour values: 840 in total
    return np.concatenate((letter_planes.ravel(), color_planes.ravel()))


## score a guess from its colors:
## 1.5 per green, 0.5 per yellow, 0.2 for anything else; when no tile is
## non-green a bonus of 16 minus the rows on the global game's board is added
def getReward(colors: list[Color]):
    total = 0
    all_green = True
    for tile_color in colors:
        if tile_color == Color.GREEN:
            total += 1.5
            continue
        # any non-green tile rules out the win bonus
        all_green = False
        if tile_color == Color.YELLOW:
            total += 0.5
        else:
            total += 0.2
    if not all_green:
        return total
    return total + (16 - wordleGame.board.get_num_rows())


## encode the word behind an action index as a flat one-hot vector
## (26 slots per letter, letters in word order)
def actionIndToInput(action_ind: int):
    one_hot_rows = []
    for ch in actions[action_ind]:
        slots = [0] * 26
        slots[ord(ch) - ord("A")] = 1
        one_hot_rows.append(slots)
    stacked = np.array(one_hot_rows)
    return np.concatenate(stacked)

In [102]:
def inputToLegible(input):
    """Decode a flat model input back into readable board rows and a guess.

    Args:
        input: flat vector whose first 780 values are the (6, 5, 26) one-hot
            board letters and whose last 130 values are the (5, 26) one-hot
            guessed word.

    Returns:
        (board, word): board is a list of 6 strings, where a cell without a
        letter contributes nothing, and word is the decoded guess.
    """

    def decode_cell(one_hot):
        # Scan left to right; if several slots equal 1 the last one wins.
        letter = ""
        for offset, flag in enumerate(one_hot):
            if flag == 1:
                letter = chr(offset + ord("A"))
        return letter

    # letters are first 6 x 5 x 26 = 780
    letter_grid = np.reshape(input[0:780], (6, 5, 26))
    board = ["".join(decode_cell(cell) for cell in cells) for cells in letter_grid]

    # the guessed word occupies the final 5 x 26 = 130 values
    guess_grid = np.reshape(input[-130:], (5, 26))
    word = "".join(decode_cell(cell) for cell in guess_grid)

    return (board, word)


# inputToLegible(np.concatenate((boardToState(wordleGame.board), actionIndToInput(1))))

In [103]:
## advance the environment by one guess
## input: int action, the index into `actions` of the word to guess
## output: (next_state vector, float reward, bool done, None) -- the last
##         slot is an unused info placeholder
def step(action: int):
    ## reject indices outside the action list
    if not (0 <= action < len(actions)):
        raise ValueError("Action out of bounds")

    ## play the guess on the global game and collect its colors
    feedback = wordleGame.guess(actions[action])

    reward = getReward(feedback)
    next_state = boardToState(wordleGame.board)
    return next_state, reward, wordleGame.is_complete, None


## start a new game on the global instance and return its encoded (empty) board
def reset():
    debug("Resetting game")
    # None asks new_game to pick the answer itself
    wordleGame.new_game(None)
    initial_state = boardToState(wordleGame.board)
    return initial_state

## replay buffer

In [104]:
class ReplayBuffer:
    """Experience replay buffer that samples uniformly."""

    def __init__(self, size):
        # A deque with maxlen discards the oldest transition once it is full.
        self.buffer = deque(maxlen=size)

    def add(self, state, action, reward, next_state, done: bool):
        """Store one transition as a (state, action, reward, next_state, done) tuple."""
        self.buffer.append((state, action, reward, next_state, done))

    def __len__(self):
        return len(self.buffer)

    ## random sample of the buffer, each sample paired with one random action
    ## input: int num_samples
    ## output: tuple of np.arrays (states, actions, rewards, next_states, dones, potential_actions)
    ## of length num_samples
    def sample(self, num_samples):
        # Both draws are uniform and with replacement (np.random.choice default).
        indexes = np.random.choice(len(self.buffer), num_samples)
        potential_action_indices = np.random.choice(len(actions), num_samples)

        states, selected_actions, rewards, next_states, dones = [], [], [], [], []
        potential_actions = []

        for buffer_index, action_index in zip(indexes, potential_action_indices):
            # Read the randomly drawn transition, not simply the first
            # num_samples entries of the buffer.
            state, action, reward, next_state, done = self.buffer[int(buffer_index)]
            states.append(state)
            selected_actions.append(action)
            rewards.append(reward)
            next_states.append(next_state)
            dones.append(done)
            potential_actions.append(actionIndToInput(action_index))

        return (
            np.array(states),  # vector array
            np.array(selected_actions),  # int array: actions stored with the samples
            np.array(rewards),  # float array
            np.array(next_states),  # vector array
            np.array(dones, dtype=float),  # float array, 1.0 = true, 0.0 = false
            np.array(potential_actions),  # vector array: encoded random actions
        )

## models

In [105]:
## model input size:
## wordle letters:             6*5*26
## colors for each tile:     + 6*5*2
## action to predict reward: + 5*26
##                           = 970

In [106]:
import tensorflow as tf
import numpy as np
from collections import deque

In [107]:
def _build_q_network():
    """Return a new Sequential network: ReLU Dense layers of 970, 128, 32 and
    16 units followed by a single-unit Dense output without activation.

    No input shape is declared here.
    """
    hidden_units = (970, 128, 32, 16)
    stack = [tf.keras.layers.Dense(units, activation="relu") for units in hidden_units]
    stack.append(tf.keras.layers.Dense(1))
    return tf.keras.models.Sequential(stack)


# Two separately constructed networks with the same architecture.
model_policy = _build_q_network()

model_target = _build_q_network()

## training setup

In [108]:
def get_model_input(state, action_ind):
    """Build one model input row: the encoded board followed by the encoded action.

    NOTE(review): `state` is handed straight to boardToState, which calls
    get_rows() on it, so despite its name it has to be a Board rather than an
    already-encoded state vector.
    """
    board_part = boardToState(state)
    action_part = actionIndToInput(action_ind)
    return np.concatenate((board_part, action_part))

In [109]:
# select_epsilon_greedy_action moved below

In [110]:
# Training hyperparameters.
# num_episodes: the training loop runs num_episodes + 1 games.
num_episodes = 1000  # @param {type: "integer"}
# epsilon: probability of taking a random action; lowered during training.
epsilon = 1.0  # @param {type: "number"}
# batch_size: transitions drawn from the replay buffer per training step;
# training starts once the buffer holds at least this many.
batch_size = 32  # @param {type: "integer"}
# action_size = 32 # @param {type: "integer"}
# discount: factor applied to the target network's value of the next state.
discount = 0.9  # @param {type: "number"}
# replay_size: maximum number of transitions kept in the replay buffer.
replay_size = 100000  # @param {type: "integer"}

## training

In [111]:
"yes" if (np.array([0, 0]) == np.array([0, 0])).all() else "no"

'yes'

In [None]:
def select_epsilon_greedy_action(epsilon: float):
    """Pick the next guess for the global game with an epsilon-greedy rule.

    Args:
        epsilon: probability of returning a uniformly random action instead
            of the action the network scores highest.

    Returns:
        int index into ``actions``.
    """
    if np.random.uniform(0, 1) < epsilon:
        # get random action from actions
        action_ind = np.random.randint(0, len(actions))
        debug("Selected action:", actions[action_ind], "(index", str(action_ind) + ")")
        return action_ind

    # run all possible guesses through the model and select the best one
    # The board encoding is identical for every candidate, so build it once.
    board_state = boardToState(wordleGame.board)
    all_actions = np.array(
        [
            np.concatenate((board_state, actionIndToInput(i)))
            # len(actions) keeps this in step with the random branch above
            for i in range(len(actions))
        ]
    )

    # NOTE(review): candidates are scored with model_target, not model_policy;
    # confirm that acting on the lagging network is intended.
    # `use_multiprocessing` is not passed: the input is an in-memory array,
    # and newer Keras versions no longer accept that argument.
    preds = model_target.predict(all_actions, verbose=0)
    # preds has one column; take the row index of its largest value
    return int(np.argmax(preds, axis=0)[0])


# Replay memory filled and sampled by the training loop.
buffer = ReplayBuffer(replay_size)
# Total number of environment steps taken so far; the training loop uses it
# to decide when to copy the policy weights into the target network.
cur_frame = 0

# Per-episode reward and win flag; the training loop trims both lists to the
# most recent 100 entries.
last_100_ep_rewards = []
last_100_ep_wins = []

# Optimizer (learning rate 1e-4) and loss object used by train_step.
optimizer = tf.keras.optimizers.Adam(1e-4)
mse = tf.keras.losses.MeanSquaredError()


def train_step(states, actions, rewards, next_states, dones, potential_actions):
    """Run one gradient step on model_policy from a sampled batch.

    Args:
        states: encoded boards before the move, one row per sample.
        actions: not used by this function; kept so existing calls keep working.
        rewards: reward per sample.
        next_states: encoded boards after the move, one row per sample.
        dones: 1.0 where the sample ended its game, else 0.0.
        potential_actions: encoded actions, one row per sample; appended to
            both `states` and `next_states` to form the network inputs.

    Returns:
        The loss value computed for this batch.

    NOTE(review): Q(s, a) and Q'(s', a') are both evaluated on the sampled
    `potential_actions` rather than on the action actually taken and on the
    best next action; confirm this is the intended update rule.
    """
    # length of states/actions/etc. = batch_size parameter

    # Target values come from the target network, computed outside the
    # gradient tape so that no gradient flows through them.
    q_prime_inputs = np.concatenate((next_states, potential_actions), axis=1)
    q_primes = model_target.predict(q_prime_inputs, verbose=0)

    # (1 - dones) removes the bootstrap term for samples that ended a game
    target = rewards + (1 - dones) * discount * np.squeeze(q_primes, axis=1)

    full_inputs = np.concatenate((states, potential_actions), axis=1)

    with tf.GradientTape() as tape:
        q_values = model_policy(full_inputs)  ## shape: (batch_size, 1)
        q_values = tf.squeeze(q_values, axis=-1)  ## shape: (batch_size, )

        ## compare every sample's prediction with that sample's own target;
        ## reducing q_values to one batch-wide maximum would mix samples
        loss = mse(target, q_values)
    grads = tape.gradient(loss, model_policy.trainable_variables)
    optimizer.apply_gradients(zip(grads, model_policy.trainable_variables))
    return loss


## main training loop: play num_episodes + 1 games, store every transition in
## the replay buffer and fit the policy network after each step
for episode in range(num_episodes + 1):
    state = reset()
    ep_reward = 0
    done = False

    ## play one game
    while not done:
        ## select and perform an action
        action = select_epsilon_greedy_action(epsilon)
        next_state, reward, done, info = step(action)
        ep_reward += reward

        ## save outcome to buffer
        buffer.add(state, action, reward, next_state, done)
        assert not (
            (state == next_state).all()
        )  ## confirm state is being updated properly
        state = next_state
        cur_frame += 1

        # every so often, copy Q weights to Q'
        # if cur_frame % 2000 == 0:
        if cur_frame % 200 == 0:
            model_target.set_weights(model_policy.get_weights())

        ## train the Q neural network (policy network)
        if len(buffer) >= batch_size:
            states, taken_actions, rewards, next_states, dones, potential_actions = (
                buffer.sample(batch_size)
            )
            # hand over the action array returned by buffer.sample(), not the
            # module-level `actions` word list
            loss = train_step(
                states, taken_actions, rewards, next_states, dones, potential_actions
            )

    ## lower epsilon by 0.001 per episode for the first 950 episodes
    # if episode < 9500:
    if episode < 950:
        epsilon -= 0.001
        # epsilon -= 0.01

    ## keep only the 100 most recent episodes in the rolling statistics
    if len(last_100_ep_rewards) == 100:
        last_100_ep_rewards = last_100_ep_rewards[1:]
        last_100_ep_wins = last_100_ep_wins[1:]
    last_100_ep_rewards.append(ep_reward)
    last_100_ep_wins.append(wordleGame.is_win())

    if episode % 5 == 0:
        print(
            f"Episode {episode}/{num_episodes}. Epsilon: {epsilon:.3f}. "
            f"Reward in last 100 episodes: {np.mean(last_100_ep_rewards):.3f} ({np.array(last_100_ep_wins).sum()} wins)"
        )

# Evaluation

In [None]:
def select_greedy_action():
    """Return the index of the action the network scores highest for the
    current board of the global game."""
    # The board encoding is identical for every candidate, so build it once.
    board_state = boardToState(wordleGame.board)
    all_actions = np.array(
        [
            np.concatenate((board_state, actionIndToInput(i)))
            for i in range(len(actions))
        ]
    )

    # NOTE(review): no `model_qp` is defined in this file. model_target is used
    # here to match select_epsilon_greedy_action; switch to model_policy if the
    # online network is the one that should be evaluated.
    preds = model_target.predict(all_actions, verbose=0)
    # preds has one column; take the row index of its largest value
    return int(np.argmax(preds, axis=0)[0])

In [None]:
## play 100 games greedily and print the number of guesses and outcome of each

for i in range(100):
    state = reset()
    done = False

    while not done:
        # select_greedy_action takes no arguments
        action = select_greedy_action()
        _, _, done, _ = step(action)

    if wordleGame.is_complete:
        print(
            "Completed game",
            i,
            "in",
            wordleGame.board.get_num_rows(),
            "guesses;",
            ("WIN" if wordleGame.is_win() else "LOSE"),
        )