In [1]:
import tensorflow as tf
import tensorflow.keras as keras
from dict import WORD_BANK
import random
from tqdm import tqdm
import numpy as np

keys = list(WORD_BANK.keys()) # shorten list for validation purposed
random.shuffle(keys)
keys = keys[:500]
WORD_BANK = {key: tf.constant(i, dtype = tf.int32) for i, key in enumerate(keys)}
INDEX_TO_WORD = [key for key in keys]

VOCAB_SIZE = len(WORD_BANK)

2022-02-01 23:08:27.562623: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2022-02-01 23:08:27.563070: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-02-01 23:08:27.564737: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


In [2]:
ALPHABET = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
LETTER_TO_INDEX = {i: letter for letter, i in enumerate(ALPHABET)}

In [3]:
class Qnet(keras.Model):
    def __init__(self, dropout_rate = .2, reg = None):
        super().__init__()
        """
        A deep Q net for wordle 

        state: 
            grey: 1, 26, locality does not matter
            yellow: word_len, 26, locality does matter
            green: word_len, 26, locality does matter
            
            (N, 11, 26)

        action:
            (N, VOCAB_SIZE) choice a letter bases on state 
        """

        self.flatten = keras.layers.Flatten()

        self.dense0 = keras.layers.Dense(512, activation = 'relu', kernel_regularizer = reg) # 26 inputs for each color; grey, yellow, green
        self.dense1 = keras.layers.Dense(1028, activation = 'relu', kernel_regularizer = reg)
        self.dense2 = keras.layers.Dense(2024, activation = 'relu', kernel_regularizer = reg)
        self.dense3 = keras.layers.Dense(VOCAB_SIZE, activation  = 'softmax')

        self.dropout0 = keras.layers.Dropout(dropout_rate)
        self.dropout1 = keras.layers.Dropout(dropout_rate)
        self.dropout2 = keras.layers.Dropout(dropout_rate)

        self.dropout = bool(dropout_rate)

    def call(self, input):
        x = self.flatten(input)
        if self.dropout:
            x = self.dense0(x)
            x = self.dropout0(x)
            x = self.dense1(x)
            x = self.dropout1(x)
            x = self.dense2(x)
            x = self.dropout2(x)
        else:
            x = self.dense0(x)
            x = self.dense1(x)
            x = self.dense2(x)

        output = self.dense3(x)
        return output

class DoubleDeepQ():
    def __init__(self, 
        loss = tf.keras.losses.huber,
        opt = tf.keras.optimizers.Adam(0.0001),
        gamma = .,5 
        reg = tf.keras.regularizers.l2(),
        dropout_rate = 0
        ):
        super().__init__()

        self.Q = Qnet(reg = reg, dropout_rate = dropout_rate)
        self.Q_target = Qnet()

        self.transfer_weights()

        self.gamma = gamma
        self.opt = opt
        self.loss = loss
    
    def __call__(self, state):
        return self.Q_target(state)

    def update_model(self, state, action, reward, next_state):
        """
        bellman eq: Q(s, a) = Q(s, a) + alpha * (r + gamma * max(Q(s', a) - Q(s, a)))

        state: tensor (N, 15, 27) 
        action: tensor int (N, 1) with axis 1: index
        reward: tensor (N, 1)
        """
        mask = tf.one_hot(action, VOCAB_SIZE) # action is a one hot vector

        future_reward = self.Q_target(next_state)
        updated_q = reward + self.gamma * tf.reduce_max(future_reward) # update q value

        with tf.GradientTape() as tape:
            q_value = self.Q(state) # get q values of each action in the state
            q_of_action = tf.reduce_sum(q_value * mask) # 0 out other action values and sum to get 
            loss = self.loss(updated_q, q_of_action) # calc loss

        grads = tape.gradient(loss, self.Q.trainable_variables) 
        self.opt.apply_gradients(zip(grads, self.Q.trainable_variables))

        return loss

    def best_action(self, state):
        return tf.argmax(self.Q_target(state), axis = 1)
        
    def transfer_weights(self):
        self.Q_target.set_weights(self.Q.get_weights()) 

In [4]:
import random

class Buffer():
    def __init__(self, max_size = 10_000):
        self.max_size = max_size
        self.list = ([], [], [], [])

    def append(self, state, action, reward, next_state):
        if len(self.list) >= self.max_size - 1:
            pop_index = random.randint(0, self.max_size - 2)
            for i in range(4): self.list[i].pop(pop_index)
    
        for i, item in enumerate((state, action, reward, next_state)):
            self.list[i].append(item)

    def unpack_random(self, batch_size):
        length = len(self)
        indices = random.sample(range(length), batch_size)

        state = list(map(lambda i: self.list[0][i], indices))
        action = list(map(lambda i: self.list[1][i], indices))
        reward = list(map(lambda i: self.list[2][i], indices))
        next_state = list(map(lambda i: self.list[3][i], indices))

        state = tf.stack(state)
        action = tf.stack(action)
        reward = tf.stack(reward)
        next_state = tf.stack(next_state)

        return state, action, reward, next_state

    def __len__(self):
        return len(self.list[0])

In [5]:
# this functions provide nice flexible abstrations, they make refactoring much easier 

def action_to_word(action): 
    """action: a (5, 1) value"""
    return INDEX_TO_WORD[int(action)]

def word_to_action(word):
    action = WORD_BANK[word]
    return tf.constant(action)

In [6]:
reward_lose = tf.constant(0, dtype = tf.float32)
reward_win = tf.constant(20, dtype = tf.float32)
reward_valid = tf.constant(0, dtype = tf.float32) # valid word reward 
reward_green = tf.constant(4, dtype = tf.float32) # green letter reward
reward_yellow = tf.constant(2, dtype = tf.float32) # yellow letter reward

games_per_train = 20# how many games to train on 
batch_per_epoch = 30
games_per_test = 20

epsilon = .3

model = DoubleDeepQ()

green_offset = 0
yellow_offset = 5
grey_offset = 10

buffer = Buffer()
batch_size = 64

STATE_SHAPE = (11, 26)

In [7]:
epochs = 300

random_action = lambda : word_to_action(random.choice(list(WORD_BANK)))
loss_avg = 0
reward_avg = 0

exp_avg = lambda avg, r: avg * .99 + r * .01

pbar = tqdm(range(epochs))
for epoch in pbar:
    pbar.set_description(f"Epoch: {epoch:4d}, Loss: {loss_avg:.5f}, Avg reward: {reward_avg:.5f}, epsilon: {epsilon:.3f}")
    for _ in range(batch_per_epoch):
        for game in range(games_per_train):
            correct_word = random.choice(list(WORD_BANK)) # randomly select correct work
            state = tf.zeros((1, *STATE_SHAPE)) # init state
            row = 0 # init the row that the game is on
            while row < 20:
                actions = model(state)
                action = tf.argmax(actions, axis = 1, output_type = tf.int32)[0] if random.random() > epsilon else random_action()

                word = action_to_word(action)

                if word == correct_word: # of the corrct word is guessed
                    reward = reward_win
                    reward_avg = exp_avg(reward_avg, reward)
                    buffer.append(state, action, reward, state)
                    break # break if correct word was guessed 

                reward = tf.constant(0, dtype = tf.float32)
                
                indices = [] # indices where words match in correct possition
                # check to see if letters match in word in the correct position
                for i in range(5):
                    in_word = False
                    for j in range(5):
                        if word[i] == correct_word[j]:
                            if i == j:
                                indices.append([0, green_offset + i, LETTER_TO_INDEX[word[i]]])
                                in_word = True
                                reward += reward_green
                                break
                            else:
                                indices.append([0, yellow_offset + i, LETTER_TO_INDEX[word[i]]])
                                in_word = True
                                reward += reward_yellow
                                break
                    if not in_word:
                        indices.append([0, grey_offset, LETTER_TO_INDEX[word[i]]])

                values = [1] * len(indices) # create a list of ones the same length as indices 
                next_state = tf.tensor_scatter_nd_update(state, indices, values) # update new state 

                buffer.append(state, action, reward, next_state)
                state = next_state # set state to next state

                reward_avg = exp_avg(reward_avg, reward)

                row += 1

        # train model 
        if len(buffer) >= batch_size:
            state, action, reward, next_state = buffer.unpack_random(batch_size = batch_size)
            loss = model.update_model(state, action, reward, next_state)
            loss_avg = loss_avg * 0.995 + loss * 0.005

Epoch:  159, Loss: 1.93725, Avg reward: 4.08933, epsilon: 0.300:  53%|█████▎    | 159/300 [1:06:48<59:15, 25.21s/it]  


KeyboardInterrupt: 

<tf.Tensor: shape=(), dtype=int32, numpy=901>