Import Packages

In [None]:
import time
from random import randrange, choice, random
from collections import deque, namedtuple

import tensorflow as tf
import numpy as np
import utils

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

from kaggle_environments import evaluate, make

Hyperparameters

In [None]:
# Set the random seed for TensorFlow
tf.random.set_seed(0)
MEMORY_SIZE = 100_000     # size of memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate  
NUM_STEPS_FOR_UPDATE = 4  # perform a learning update every C time steps

ConnectX - Kaggle Environment

In [None]:
env = make("connectx", debug=True)
env.render()
print(env.name, env.version)
print("Default Agents: ", *env.agents)
print('env: ', env.specification.observation)

In [None]:
state_size = (env.specification.configuration.rows.default, env.specification.configuration.columns.default)
num_actions = env.specification.configuration.columns.default

state_size = (env.configuration.rows * env.configuration.columns,)
num_actions = env.configuration.columns

print('State Shape:', state_size)
print('Number of actions:', num_actions)

Deep Q-Learning

In [None]:
# Create the Q-Network
q_network = Sequential([
    Input(shape=state_size),                      
    Dense(units=25, activation='relu'),            
    Dense(units=15, activation='relu'),            
    Dense(units=num_actions, activation='linear'),
])

# Create the target Q^-Network
target_q_network = Sequential([
    Input(shape=state_size),                      
    Dense(units=25, activation='relu'),            
    Dense(units=15, activation='relu'),            
    Dense(units=num_actions, activation='linear'),
])

optimizer = Adam(learning_rate=ALPHA)

Experience Replay

In [None]:
# Store experiences as named tuples
experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

Epsilon-greedy action function

In [None]:

def find_free_columns(flat_board):
    board = np.array(flat_board).reshape(6,7)
    free_columns = []
    for c in range(7):
        if 0 in board[:, c]:
            free_columns.append(c)
    return free_columns

In [61]:
def get_action(state, configuration, q_values, epsilon=0):
    free_columns = find_free_columns(state)
    print('free_columns: ', free_columns)
    if random() > epsilon:
        # todo: handle the situation when there is no empty space left on the board
        q_values_ndarray = q_values.numpy()[0]
        
        return q_values_ndarray[free_columns[np.argmax(q_values_ndarray[free_columns]).item()]]
    else:
        return choice(free_columns)

In [65]:
gg2 = np.arange(7)
gg2 = np.array([-3, 1, 0.5, 1.23, -2, 5.3, 0.02])
g1 = np.array([1, 0, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2, 1, 0, 0, 2, 1, 0, 2])
foo = g1.reshape(6, 7)
print(gg2)
bar = find_free_columns(g1)
print(bar)
ggg3 = gg2[bar]
print('g3: ', ggg3)
g3max = np.argmax(ggg3)
print('g3max: ', g3max)
print(bar[g3max])
print(gg2[bar[g3max]])

[-3.    1.    0.5   1.23 -2.    5.3   0.02]
[1, 2, 5]
g3:  [1.  0.5 5.3]
g3max:  2
5
5.3


In [57]:
x = np.arange(6).reshape((2, 3))
print(x)
res = np.argmax(x, axis=1, keepdims=True)
print(res)

res.shape

[[0 1 2]
 [3 4 5]]
[[2]
 [2]]


(2, 1)

In [None]:
def my_agent(observation, configuration):
    if random.random() > epsilon:
        return np.argmax(q_values.numpy()[0]).item()
    else:
        return randrange(num_actions)
    return choice([c for c in range(configuration.columns) if observation.board[c] == 0])

Compute loss function

In [None]:
def compute_loss(experiences, gamma, q_network, target_q_network):
    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, done_vals = experiences
    
    # Compute max Q^(s,a)
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)
    
    # Set y = R if episode terminates, otherwise set y = R + γ max Q^(s,a).
    y_targets = rewards+((1-done_vals)*gamma*max_qsa)
    
    # Get the q_values
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(q_values.shape[0]),
                                                tf.cast(actions, tf.int32)], axis=1))   
    # Compute the loss
    loss = MSE(y_targets, q_values) 
    
    return loss

Learn function

In [None]:
@tf.function
def agent_learn(experiences, gamma):
    
    # Calculate the loss
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences, gamma, q_network, target_q_network)

    # Get the gradients of the loss with respect to the weights.
    gradients = tape.gradient(loss, q_network.trainable_variables)
    
    # Update the weights of the q_network.
    optimizer.apply_gradients(zip(gradients, q_network.trainable_variables))

    # update the weights of target q_network
    utils.update_target_network(q_network, target_q_network)

Deep-Q Learning

In [53]:
start = time.time()

num_episodes = 2000
max_num_timesteps = 1000

total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)

# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_episodes):
    
    # Reset the environment to the initial state and get the initial state
    trainer = env.train([None, "random"])
    observation = trainer.reset()

    state = observation.board
    total_points = 0
    
    for t in range(max_num_timesteps):
        
        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)

        action = get_action(state, env.configuration, q_values, epsilon)
        
        # Take action A and receive reward R and the next state S'
        # todo: state != board
        # todo: myagent function
        # todo: observe board before choosing an action
        next_obs, reward, done, _ = trainer.step(action)
        next_state = next_obs.board
        print('action ', action)
        print('next_state ', next_state)
        print('rr ', reward)
        print('done ', done)
        # env.render(mode="ipython", width=100, height=90, header=False, controls=False)
        
        # Store experience tuple (S,A,R,S') in the memory buffer.
        # We store the done variable as well for convenience.
        memory_buffer.append(experience(state, action, reward, next_state, done))
        
        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)
        
        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from D
            experiences = utils.get_experiences(memory_buffer)
            
            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            agent_learn(experiences, GAMMA)
        
        state = next_state.copy()
        total_points += reward
        
        if done:
            break
            
    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])
    
    # Update the ε value
    epsilon = utils.get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # We will consider that the environment is solved if we get an
    # average of 200 points in the last 100 episodes.
    if av_latest_points >= 200.0:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('checkers_model.h5')
        break
        
tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")

free_columns:  [0, 1, 2, 3, 4, 5, 6]
action  4
next_state  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 0, 0]
rr  0
done  False
free_columns:  [0, 1, 2, 3, 4, 5, 6]
action  5
next_state  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 1, 1, 2]
rr  0
done  False
free_columns:  [0, 1, 2, 3, 4, 5, 6]
action  0
next_state  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 0, 0, 0, 1, 1, 2]
rr  0
done  False
free_columns:  [0, 1, 2, 3, 4, 5, 6]
action  0
next_state  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 0, 0, 0, 2, 0, 2, 1, 0, 0, 0, 1, 1, 2]
rr  0
done  False
free_columns:  [0, 1, 2, 3, 4, 5, 6]
action  5
next_state  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 1, 0, 0, 0, 2, 1, 2, 1, 0, 2, 0, 1, 1, 2

KeyboardInterrupt: 