In [None]:
import numpy as np
from tic_env import TictactoeEnv, OptimalPlayer

# Tic Toc Toe environment

Our 1st game is the famous Tic Toc Toe. You can read about the game and its rules here: https://en.wikipedia.org/wiki/Tic-tac-toe

We implemented the game as an environment in the style of games in the [Python GYM library](https://gym.openai.com/). The commented source code is available in the file "tic_env.py". Here, we give a brief introduction to the environment and how it can be used.

### Initialization and attributes

You can initialize the environment / game as following:

In [None]:
env = TictactoeEnv()

Which then has the following attributes with the corresponding initial values:

In [None]:
env.__dict__

The game is played by two players: player 'X' and player 'O'. The attribute 'current_player' shows whose turn it is. We assume that player 'X' always plays first.

The attribute 'grid' is a 3x3 numpy array and presents the board in the real game and the state $s_t$ in the reinfocement learning language. Each elements can take a value in {0, 1, -1}:
     0 : place unmarked
     1 : place marked with X 
    -1 : place marked with O 
        
The attribute 'end' shows if the game is over or not, and the attribute 'winner' shows the winner of the game: either "X", "O", or None.  

You can use function 'render' to visualize the current position of the board:

In [None]:
env.render()

### Taking actions

The game environment will recieve action from two players in turn and update the grid. At each time, one player can take the action $a_t$, where $a_t$ can either be an integer between 0 to 8 or a touple, corresponding to the 9 possible.

Function 'step' is used to recieve the action of the player, update the grid:

In [None]:
env.step(2)

In [None]:
env.render()

In [None]:
env.__dict__

In [None]:
env.step((1,1))

In [None]:
env.render()

In [None]:
env.__dict__

But not all actions are available at each time: One cannot choose a place which has been taken before. There is an error if an unavailable action is taken:

In [None]:
env.step((0,2))

### Reward

Reward is always 0 until the end of the game. When the game is over, the reward is 1 if you win the game, -1 if you lose, and 0 besides. Function 'observe' can be used after each step to recieve the new state $s_t$, whether the game is over, and the winner, and function 'reward' to get the reward value $r_t$:

In [None]:
env.observe()

In [None]:
env.reward(player='X')

In [None]:
env.reward(player='O')

An example of finishing the game:

In [None]:
env.step(0)
env.step(3)
env.step(1)

In [None]:
env.render()

In [None]:
env.observe()

In [None]:
env.reward(player='X')

In [None]:
env.reward(player='O')

# Optimal policy for Tic Toc Toe environment

Fortunately, we know the exact optimal policy for Tic Toc Toe. We have implemented and $\epsilon$-greedy version of optimal polciy which you can use for the project.

In [None]:
env.reset();

In [None]:
opt_player = OptimalPlayer(epsilon = 0., player = 'X')

In [None]:
opt_player.act(env.grid)

In [None]:
opt_player.player

### An example of optimal player playing against random player

In [None]:
Turns = np.array(['X','O'])
for i in range(5):
    env.reset()
    grid, _, __ = env.observe()
    Turns = Turns[np.random.permutation(2)]
    player_opt = OptimalPlayer(epsilon=0., player=Turns[0])
    player_rnd = OptimalPlayer(epsilon=1., player=Turns[1])
    for j in range(9):
        if env.current_player == player_opt.player:
            move = player_opt.act(grid)
        else:
            move = player_rnd.act(grid)

        grid, end, winner = env.step(move, print_grid=False)

        if end:
            print('-------------------------------------------')
            print('Game end, winner is player ' + str(winner))
            print('Optimal player = ' +  Turns[0])
            print('Random player = ' +  Turns[1])
            env.render()
            env.reset()
            break


### An example of optimal player playing against optimal player

In [None]:
Turns = np.array(['X','O'])
for i in range(5):
    env.reset()
    grid, _, __ = env.observe()
    Turns = Turns[np.random.permutation(2)]
    player_opt_1 = OptimalPlayer(epsilon=0., player=Turns[0])
    player_opt_2 = OptimalPlayer(epsilon=0., player=Turns[1])
    for j in range(9):
        if env.current_player == player_opt.player:
            move = player_opt_1.act(grid)
        else:
            move = player_opt_2.act(grid)

        grid, end, winner = env.step(move, print_grid=False)

        if end:
            print('-------------------------------------------')
            print('Game end, winner is player ' + str(winner))
            print('Optimal player 1 = ' +  Turns[0])
            print('Optimal player 2 = ' +  Turns[1])
            env.render()
            env.reset()
            break


## Q-learning 

In [None]:
from collections import defaultdict
from tqdm import tqdm 

coords = lambda pos: (int(pos / 3), pos % 3) 

def eps_greedy(Q, state, grid, epsilon):
    possible_moves = np.where(np.ravel(grid) == 0)[0]
    if np.random.random() < epsilon:
        position = np.random.choice(possible_moves)
    else:
        possible = [int(elem) for elem in np.argsort(Q[state]) if elem in possible_moves]
        position = possible[-1] if possible is not [] else np.random.choice(possible_moves) 
    return (int(position / 3), position % 3) 

#Q learning with greedy
#Dont necessariliy need to train with player_epsilon = 0.5
def q_learning(env, num_episodes, discount_factor=0.99, alpha=0.05, epsilon=0.3, player_epsilon=0.5):
    Q = defaultdict(lambda: np.zeros(9))
    episode_lengths = np.zeros(num_episodes)
    episode_rewards = np.zeros(num_episodes)
    Turns = np.array(['X','O'])
    
    for i_episode in range(num_episodes):
        env.reset()
        grid, end, winner = env.observe()
        Turns = Turns[np.random.permutation(2)]
        player = OptimalPlayer(epsilon=player_epsilon, player=Turns[1])
        while not end:
            episode_lengths[i_episode] += 1
            if env.current_player == Turns[0]:
                #move = eps_greedy(Q, str(grid), grid, epsilon=epsilon - epsilon/num_episodes*i_episode)
                
                #Remark: for question 2.1 epsilon needs to be constant and not decreasing
                move = eps_greedy(Q, str(grid), grid, epsilon = epsilon)
            else:  
                move = player.act(grid)
            
            new_grid, end, winner = env.step(move, print_grid=False)
            episode_rewards[i_episode] += env.reward(env.current_player)
#             if end: 
#                 Q[str(grid)][move[0] * 3 + move[1]] = env.reward(Turns[0])
#             else: 
#             td_target = (discount_factor * np.max(Q[str(new_grid)])) * alpha 
#             td = (1 - alpha) * Q[str(grid)][move[0] * 3 + move[1]]
#             Q[str(grid)][move[0] * 3 + move[1]] = td + td_target 
            if env.current_player == Turns[0]:
                value = Q[str(grid)][move[0] * 3 + move[1]]
                Q[str(grid)][move[0] * 3 + move[1]] += alpha * (env.reward(Turns[0]) + discount_factor * np.max(Q[str(new_grid)]) - value)
            grid = new_grid 
    
    return Q, episode_lengths, episode_rewards

Q, episode_lengths, episode_rewards = q_learning(env, 20000)

We uodate Q values at every move (optimal player and q-learning player)

In [None]:
from matplotlib import pyplot as plt

def q_eval(Q, other_player_epsilon=0.5, games=20000):
    env.reset()
    total_reward = []
    Turns = np.array(['X','O'])
    count_win = 0 
    for i in tqdm(range(games)):
        grid, end, winner = env.observe()
        Turns = Turns[np.random.permutation(2)]
        player = OptimalPlayer(epsilon=other_player_epsilon, player=Turns[1])
        while not end:
            if env.current_player == Turns[0]:
                possible_moves = np.where(np.ravel(grid) == 0)[0]
                possible = [int(elem) for elem in np.argsort(Q[str(grid)]) if elem in possible_moves]
                position = possible[-1] if possible is not [] else np.random.choice(possible_moves)
                move = (int(position / 3), position % 3) 
            else:  
                move = player.act(grid)
            grid, end, winner = env.step(move, print_grid=False)
            total_reward.append(env.reward(env.current_player))
            if winner == Turns[0]: 
                count_win += 1 
            if end:
#                 print('-------------------------------------------')
#                 print('Game end, winner is player ' + str(winner))
#                 print('Q-Learning player = ' +  Turns[0])
#                 print('Optimal player = ' +  Turns[1])
#                 env.render()
                env.reset()
                break
        
    return np.mean(total_reward), count_win / games 


reward, score = q_eval(Q, other_player_epsilon = 1., games = 2000)
print(score, reward) 

In [None]:
plt.plot(episode_rewards); 

## DQN 

In [None]:
# https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple
from itertools import count
import random
from tic_env import TictactoeEnv, OptimalPlayer
import math
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from tqdm import tqdm

env = TictactoeEnv()


In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

#This seems better because its limited by a max_len=capacity, but dont know it changes much
class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [None]:
#replay memory
class DataSampler(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = {'state':[], 'action':[], 'next_state':[], 'reward':[]}
        self.position = 0

    def push(self, state, action, next_state, reward):
        self.memory['state'].append(state)
        self.memory['action'].append(action)
        self.memory['next_state'].append(next_state)
        self.memory['reward'].append(reward)

    def sample(self, batch_size):
        return {key: random.sample(self.memory[key], batch_size) for key in self.memory.keys()}

    def __len__(self):
        return len(self.memory)

In [153]:
#Neural net
class DQN(nn.Module):

    def __init__(self, states, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=1, stride=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=1, stride=1)
        self.conv3 = nn.Conv1d(32, 32, kernel_size=1, stride=1)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 1, stride = 1):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        conv_s = conv2d_size_out(conv2d_size_out(conv2d_size_out(states)))
        linear_input_size = conv_s #* 32
        self.head = nn.Linear(32, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = x.reshape(-1, 1, 1)
        x = x.type(torch.float32)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return self.head(x.view(x.size(0), -1))

In [234]:
#Neural net from description 
# (fully connected, 2 hidden layers w 128 neurons each and relu, 3x3x2 inputs and 9 outputs)
class DQN_FNN(nn.Module):

    def __init__(self, n_actions):
        super().__init__()
        #From description states are 3 ×3 × 2 tensor where
        #where [:, :, 0] are moves taken by me, [: ,:, 1] moves taken by opponent
        self.fc1 = nn.Linear(3*3*2, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, n_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x.view(-1, 3*3*2)))
        x = F.relu(self.fc2(x.view(-1, 128)))
        x = self.fc3(x.view(-1, 128)) #should i do (-1, 128) or (128, -1)?
        return x

In [235]:
#Training setup

BATCH_SIZE = 64 #Description sets batch size to 64
GAMMA = 0.99 #discout factor Description sets to 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
FIXED_EPS_THRESHOLD = 0.3
TARGET_UPDATE = 500 #Description: update target_net every 500 games

# Get number of actions from gym action space
#n_states = 5478
n_actions = 9

#policy_net = DQN(n_states, n_actions) #.to(device)
#target_net = DQN(n_states, n_actions) #.to(device)

policy_net = DQN_FNN(n_actions) #.to(device)
target_net = DQN_FNN(n_actions) #.to(device)

target_net.load_state_dict(policy_net.state_dict()) #copy state from policy to target
target_net.eval() #put into eval state(BN, Dropout ect.. are now turned off)

#Description: says to use Adam optimizer with lr = 5*10^-4
#optimizer = optim.RMSprop(policy_net.parameters())
optimizer = optim.Adam(policy_net.parameters(), lr=5e-4) 
#Description: gives buffer of 10'000
#memory = ReplayMemory(10000)
memory = DataSampler(10000) # why use datasample instead of replay memory?

steps_done = 0

#Select action according to epsilon greedy policy.
def select_action(state, flag = False): #What does this flag mean ? i set it to false
    global steps_done
    sample = random.random()
    eps_threshold = EPS_START - EPS_START/num_episodes*(steps_done + 1)
    #eps_threshold = FIXED_EPS_THRESHOLD #Some questions ask for fixed threshold
    steps_done += 1
    if flag:
        with torch.no_grad():
            possible_moves = np.where(np.ravel(state) == 0)[0]
            possible = [int(elem) for elem in np.argsort(np.ravel(policy_net(state).max(1)[1])) 
                        if elem in possible_moves]
            position = possible[-1] if possible is not [] else np.random.choice(possible_moves)
            return position
    # greedy 
    if sample > eps_threshold:
        with torch.no_grad():
            #Description: Allow choice of invalid moves but they cause reward = -1 and end game
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            output = policy_net(state)
            #print("output: ")
            #print(output)
            return policy_net(state).max(1)[1].view(1, 1)
            #feed state to policy net and pick possible move with highest q-val
            #pos = policy_net(state).detach().numpy() #detach maybe not necessary
            #possible_moves = np.where(np.ravel(state) == 0)[0]
            #possible = [int(elem) for elem in np.argsort(pos) if elem in possible_moves]
            #position = possible[-1] if possible is not [] else np.random.choice(possible_moves) 
            #return position
    else:
        return torch.tensor([[random.randrange(n_actions)]],dtype=torch.long)


episode_durations = []

In [236]:

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    
    batch = memory.sample(BATCH_SIZE) 

    #transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    #batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          np.ravel(batch["next_state"][-1]))), dtype=torch.bool)
    data = [s for s in batch["next_state"][-1] if s != [None] * 9 and s is not None and s[0] is not None]
    if len(data) > 0:
        #print(batch["next_state"][-1])
        non_final_next_states = torch.stack(data)
    else:
        non_final_next_states = torch.tensor([0])
    state_batch = torch.tensor(batch["state"][-1])
    action_batch = torch.tensor(batch["action"][-1])
    reward_batch = torch.tensor(batch["reward"][-1])

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch)[1, action_batch]

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(9) #, device=device)
    next_state_values[non_final_mask] = max(target_net(non_final_next_states).max(1)[0].detach())
    #target_net(non_final_next_states).max(1)[0].detach() 
    # Compute the expected Q values

    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss() #Huber loss with delta set to 1
#     print(state_action_values, expected_state_action_values)
    loss = criterion(state_action_values, expected_state_action_values)

#     loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.data.clamp_(-1, 1)
    optimizer.step()

In [237]:
#Takes a grid and returns a 3x3x2 tensor which contains only values 0 or 1
# such that [:, :, 0] are moves taken by me, [: ,:, 1] moves taken by opponent
def modify_state_to_description(grid, my_turn_sign): 
    if('X' == my_turn_sign): #if i am first player to play
        my_array =  np.where(grid == -1, 0, grid) #replace -1's with 0(remove other players moves)
        other_array = np.where(grid == 1, 0, grid)  #replace 1's with 0
        other_array = np.where(grid == -1, 1, grid) 
    else: #-1 = my_moves
        my_array = np.where(grid == 1, 0, grid) #remove other players moves
        my_array = np.where(grid == -1, 1, grid) #change -1 to 1's 
        other_array = np.where(grid == -1, 0, grid)
    #print("grid")#print(grid)
    state = np.stack((my_array, other_array))
    #print("state")print(state)
    return state

In [238]:
#Main Training loop

other_player_epsilon = 0.5
num_episodes = 10000
Turns = np.array(['X','O'])

#Train over num_episodes number of games
for i_episode in range(num_episodes):
    steps_done = i_episode
   
    # Initialize the environment and state
    env.reset()
    grid, end, winner = env.observe()
   
    if(i_episode > 0):
        Turns = np.roll(Turns, 1) #first player alternates each game
    my_turn = Turns[0]

    other_player = OptimalPlayer(epsilon=other_player_epsilon, player=Turns[1])
    
    #Loop until game ends
    for t in count():
        state = modify_state_to_description(np.array([[grid]], dtype=np.int32), Turns[0])
        #print(state)
        #state = np.ravel(grid)
        state = torch.from_numpy(state)
        # Resize, and add a batch dimension (BCHW)
        state = state.unsqueeze(0).type(torch.float32) #mmh suspicious

        # Select and perform an action
        if env.current_player == my_turn:
                position = select_action(state)
                #print("selected action ")
                #print(position)
                move = (int(position / 3), int(position) % 3)
        else:  
                move = other_player.act(grid)

        #TODO 
        #From description: if move puts us in an illegal state, we end and give reward = -1.
        #(instead of forcing player to make legal moves)
        possible_moves = np.where(np.ravel(grid.ravel()) == 0)[0]   
        if move[0]*3 + move[1] not in possible_moves:
            #print("not possible move")
            reward = -1
            end = True
        else:
            grid, end, winner = env.step(move, print_grid=False)
            reward = torch.tensor([[env.reward(Turns[0])]])

        # Observe new state
        if not end:
            next_state = np.ravel(grid)
            next_state = torch.from_numpy(next_state) #TODO figure out what exactly is done here
            next_state = next_state.unsqueeze(0).type(torch.float32)
        else:
            next_state = [None] * 9 #TODO check if ok, probably fine instead of [None] * 9 idk
       
        # Store the transition in memory
        action = torch.tensor([[move[0] * 3 + move[1]]]) #move
        #next_state = torch.tensor(next_state)
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()
        
        if end:
            episode_durations.append(t + 1)
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

#     print(f'Completed stage {i_episode+1}')

print('Complete')

Complete


In [239]:
def dqn_eval(other_player_epsilon=0.5, games=20000):
    total_reward = []
    env.reset()
    Turns = np.array(['X','O'])
    count_win, none_win, loose = 0, 0, 0 
    winner_list = [] 
    for i in tqdm(range(games)):
        grid, end, winner = env.observe()
        
        player = OptimalPlayer(epsilon=other_player_epsilon, player=Turns[1])
        while not end:
            if env.current_player == Turns[0]:
                state = modify_state_to_description(np.array([[grid]], dtype=np.int32), Turns[0])
                state = torch.from_numpy(state)
                # Resize, and add a batch dimension (BCHW)
                state = state.unsqueeze(0).type(torch.float32) #mmh suspicious
                position = select_action(state)
                move = (int(position / 3), int(position) % 3)
            else:  
                move = player.act(grid)
            #at this point should not be making invalid moves... because player has trained
            grid, end, winner = env.step(move, print_grid=False)
            total_reward.append(env.reward(env.current_player))
            
            if end:
                if winner == Turns[0]: 
                    count_win += 1 
                elif str(winner) == "None":
                    none_win += 1
                else:
                    loose += 1
#                 print('-------------------------------------------')
#                 print('Game end, winner is player ' + str(winner))
#                 print('Q-Learning player = ', Turns[0])
#                 print(f'Another player with epsilon {other_player_epsilon} = ', Turns[1])
        #             env.render()
                env.reset()
                break
    return count_win / games, none_win / games, loose / games, np.mean(total_reward)


In [281]:
score = dqn_eval(0.5, 1000)
print(f"Wins of DQN: {score[0]}, wins None: {score[1]}, wins Another player: {score[2]}") 

  0%|          | 0/1 [00:00<?, ?it/s]


ValueError: There is already a chess on position (0, 1).