In [14]:
import pygame
import random
import time
import pickle
from collections import defaultdict
from itertools import count
import numpy as np
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributions
from torch.autograd import Variable

In [1]:
# Environment setup: install pygame (the recorded output below shows pygame-1.9.6).
# NOTE(review): this cell is listed after the cell that imports pygame; on a
# fresh kernel it has to be executed before that import cell.
pip install pygame

Collecting pygame
[?25l  Downloading https://files.pythonhosted.org/packages/8e/24/ede6428359f913ed9cd1643dd5533aefeb5a2699cc95bea089de50ead586/pygame-1.9.6-cp36-cp36m-manylinux1_x86_64.whl (11.4MB)
[K     |████████████████████████████████| 11.4MB 2.8MB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-1.9.6


In [15]:
# Select the legacy typed-tensor constructors that match the available device.
# The CUDA variants are only touched when a GPU is actually present.
use_cuda = torch.cuda.is_available()
if use_cuda:
    FloatTensor = torch.cuda.FloatTensor
    LongTensor = torch.cuda.LongTensor
    ByteTensor = torch.cuda.ByteTensor
else:
    FloatTensor = torch.FloatTensor
    LongTensor = torch.LongTensor
    ByteTensor = torch.ByteTensor
# Default tensor alias: floating point on the selected device.
Tensor = FloatTensor

In [4]:
# Colab-only: mount Google Drive under /content/gdrive. As the recorded output
# shows, this prompts for an OAuth authorization code.
# NOTE(review): the code in this notebook reads/writes relative paths
# ("./RL/...", "player1states", "player2states"), not /content/gdrive —
# confirm whether the mount is still required.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [16]:
class Qlearning:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    The Q-table maps ``(state, action)`` pairs to values, where ``state`` is
    the board converted to a tuple and ``action`` is one of the moves handed in
    by the caller. Pairs that were never seen are initialised to 1.0 the first
    time they are looked up.
    """

    def __init__(self,epsilon=0.2, alpha=0.3, gamma=0.9):
        """
        @param epsilon: probability of picking a random move instead of the greedy one
        @param alpha: learning rate used by ``updateQ``
        @param gamma: discount factor applied to the best next-state value
        """
        self.epsilon=epsilon
        self.alpha=alpha
        self.gamma=gamma
        self.Q = {}  # Q-table: (state tuple, action) -> value
        self.last_board=None
        self.q_last=0.0
        self.state_action_last=None

    def game_begin(self):
        """Forget the previously chosen move; call this at the start of every game."""
        self.last_board = None
        self.q_last = 0.0
        self.state_action_last = None

    def epslion_greedy(self, state, possible_moves):
        """Choose a move epsilon-greedily and remember it for the next ``updateQ``.

        @param state: the current board (any sequence; stored as a tuple)
        @param possible_moves: non-empty list of candidate moves
        @returns the chosen entry of ``possible_moves``
        """
        self.last_board = tuple(state)
        if random.random() < self.epsilon:
            # Explore: uniformly random move.
            move = random.choice(possible_moves)
        else:
            # Exploit: best known move, ties broken at random.
            q_values = [self.getQ(self.last_board, action) for action in possible_moves]
            best = max(q_values)
            candidates = [action for action, q in zip(possible_moves, q_values) if q == best]
            move = random.choice(candidates) if len(candidates) > 1 else candidates[0]

        self.state_action_last = (self.last_board, move)
        self.q_last = self.getQ(self.last_board, move)
        return move

    def getQ(self, state, action):
        """Return Q(state, action), creating the entry with value 1.0 if it is missing."""
        return self.Q.setdefault((state, action), 1.0)

    def updateQ(self, reward, state, possible_moves):
        """Apply the Q-learning update to the most recently chosen (state, action).

        Q(s, a) <- Q(s, a) + alpha * (reward + gamma * max_a' Q(s', a') - Q(s, a))

        @param reward: reward observed after the last move
        @param state: the resulting board s'
        @param possible_moves: moves available in s'; an empty list means the
                               next-state value is taken as 0.0
        """
        if self.state_action_last is None:
            # No move has been chosen since game_begin(): there is nothing to
            # credit, and writing would create a bogus Q[None] entry.
            return
        next_state = tuple(state)
        next_values = [self.getQ(next_state, move) for move in possible_moves]
        max_q_next = max(next_values) if next_values else 0.0
        self.Q[self.state_action_last] = self.q_last + self.alpha * ((reward + self.gamma*max_q_next) - self.q_last)

    def saveQtable(self,file_name):
        """Pickle the Q-table to ``file_name``."""
        with open(file_name, 'wb') as handle:
            pickle.dump(self.Q, handle, protocol=pickle.HIGHEST_PROTOCOL)

    def loadQtable(self,file_name):
        """Replace the Q-table with the one pickled in ``file_name``.

        SECURITY: ``pickle.load`` executes arbitrary code embedded in the file;
        only load Q-tables that were produced by ``saveQtable`` from a trusted source.
        """
        with open(file_name, 'rb') as handle:
            self.Q = pickle.load(handle)


In [17]:
class Policy(nn.Module):
    """
    The Tic-Tac-Toe Policy

    A two-layer fully connected network: ``input_size`` features -> ReLU hidden
    layer of ``hidden_size`` units -> probabilities over ``output_size`` actions.
    """
    def __init__(self, input_size=27, hidden_size=64, output_size=9):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(input_size,hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return action probabilities for ``x``.

        The softmax is taken over the last (action) dimension. Naming the
        dimension explicitly avoids PyTorch's deprecated implicit choice, which
        emits a warning and selects dimension 0 for 3-D inputs.

        @param x: float tensor whose last dimension has ``input_size`` entries
        @returns tensor shaped like ``x`` except that the last dimension has
                 ``output_size`` entries, which sum to 1
        """
        hidden = F.relu(self.fc1(x))
        return F.softmax(self.fc2(hidden), dim=-1)

In [18]:
def select_action(policy, state):
    """Samples an action from the policy at the state.

    @param policy: callable mapping a (1, 27) float tensor to action
                   probabilities (e.g. a ``Policy`` instance)
    @param state: the board, a sequence of up to 9 cells holding ' ' (empty),
                  '0', or any other mark (treated as the 'X' player)
    @returns (action, log_prob): ``action`` is the sampled 0-based index as a
             plain ``int`` (safe to use as a list index or dictionary key);
             ``log_prob`` is a scalar tensor that stays attached to the
             policy's graph so it can be back-propagated.
    """
    # Per-cell class index: 0 = empty, 1 = '0', 2 = anything else.
    codes = torch.zeros(9, dtype=torch.long)
    for i, cell in enumerate(state):
        if cell == ' ':
            codes[i] = 0
        elif cell == '0':
            codes[i] = 1
        else:
            codes[i] = 2

    # One-hot encode into three 9-cell planes, flattened plane-major:
    # feature index = class * 9 + cell.
    features = torch.zeros(3, 9).scatter_(0, codes.unsqueeze(0), 1).view(1, 27)

    probs = policy(features)
    dist = torch.distributions.Categorical(probs)
    action = dist.sample()
    log_prob = torch.sum(dist.log_prob(action))

    return int(action.item()), log_prob
    

In [24]:
def finish_episode(saved_rewards, saved_logprobs, gamma=1.0):
    """Back-propagate the policy-gradient (REINFORCE) loss of one finished episode.

    The discounted returns of ``saved_rewards`` are standardised (zero mean,
    unit standard deviation) and paired position by position with
    ``saved_logprobs``; the loss is ``sum(-log_prob * return)``. Gradients are
    accumulated into the parameters that produced the log-probabilities; the
    caller decides when to run the optimizer step.

    @param saved_rewards: list of per-step rewards of the episode
    @param saved_logprobs: list of scalar log-probability tensors
    @param gamma: discount factor handed to ``compute_returns``
    """
    returns = torch.tensor(compute_returns(saved_rewards, gamma), dtype=torch.float32)
    if returns.numel() > 1:
        returns = (returns - returns.mean()) / (returns.std() +
                                                np.finfo(np.float32).eps)
    # With a single return the sample standard deviation is NaN, which would
    # poison every gradient; in that case the raw return is used unchanged.

    # zip() stops at the shorter sequence, so surplus returns are ignored.
    policy_loss = [-log_prob * ret for log_prob, ret in zip(saved_logprobs, returns)]
    if not policy_loss:
        # Nothing was sampled from the policy this episode: no gradient to add.
        return

    policy_loss = torch.stack(policy_loss).sum()
    policy_loss.backward(retain_graph=True)
    # note: retain_graph=True allows for multiple calls to .backward()
    # in a single step


In [20]:
def compute_returns(rewards, gamma=1.0):
    """
    Compute returns for each time step, given the rewards
      @param rewards: list of floats, where rewards[t] is the reward
                      obtained at time step t
      @param gamma: the discount factor
      @returns list of floats representing the episode's returns
          G_t = r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...

    Examples (values shown up to floating-point rounding):
      compute_returns([0,0,0,1], 1.0)          -> [1.0, 1.0, 1.0, 1.0]
      compute_returns([0,0,0,1], 0.9)          -> [0.729, 0.81, 0.9, 1.0]
      compute_returns([0,-0.5,5,0.5,-10], 0.9) -> [-2.5965, -2.885, -2.65, -8.5, -10.0]
    """
    returns = [0.0] * len(rewards)
    running = 0.0
    # Walk backwards using G_t = r_t + gamma * G_{t+1}: one pass instead of
    # re-summing the whole tail for every t. Starting from 0.0 also makes every
    # entry a float, even when the rewards are ints.
    for t in range(len(rewards) - 1, -1, -1):
        running = rewards[t] + gamma * running
        returns[t] = running
    return returns

In [21]:
def load_weights(policy, episode = 170000, path_template="./RL/policy-%d.pkl"):
    """Load saved weights into ``policy``.

    @param policy: module whose parameters are overwritten in place
    @param episode: episode number substituted into ``path_template``
    @param path_template: %-style template of the checkpoint path. The default
        is the historical "./RL/policy-<episode>.pkl"; ``TicTacToe.train`` in
        this notebook writes "./RL/player1-%d.pkl" / "./RL/player2-%d.pkl", so
        pass one of those templates to load a checkpoint it produced.
    """
    weights = torch.load(path_template % episode)
    # Previously the state dict was read and then discarded, leaving the
    # policy with its random initial weights.
    policy.load_state_dict(weights)

In [26]:
class Humanplayer:
    """Marker type for the human participant.

    It carries no state or behaviour: the game only checks for it with
    ``isinstance`` and takes the human's moves from mouse clicks.
    """

#randomplayer player
class Randomplayer:
    """Baseline opponent that picks uniformly among the moves it is offered."""

    def move(self,possiblemoves):
        """Return one entry of ``possiblemoves`` chosen with ``random.choice``."""
        chosen = random.choice(possiblemoves)
        return chosen

class TicTacToe:
    def __init__(self,traning=False):
        self.board = [' ']*9

        self.done = False
        self.humman=None
        self.computer=None
        self.humanTurn=None
        self.training=traning
        self.player1 = None
        self.player2 = None
        self.aiplayer=None
        self.isAI=False
        # if not training display
        if(not self.training):
            pygame.init()
            self.ttt = pygame.display.set_mode((225,250))
            pygame.display.set_caption('Tic-Tac-Toe')

    #reset the game
    def reset(self):
        if(self.training):
            self.board = [' '] * 9
            return 

        self.board = [' '] * 9
        self.humanTurn=random.choice([True,False])

        self.surface = pygame.Surface(self.ttt.get_size())
        self.surface = self.surface.convert()
        self.surface.fill((250, 250, 250))
        #horizontal line
        pygame.draw.line(self.surface, (0, 0, 0), (75, 0), (75, 225), 2)
        pygame.draw.line(self.surface, (0, 0, 0), (150, 0), (150, 225), 2)
        # veritical line
        pygame.draw.line(self.surface, (0, 0, 0), (0,75), (225, 75), 2)
        pygame.draw.line(self.surface, (0, 0, 0), (0,150), (225, 150), 2)

   #evaluate function
    def evaluate(self, ch):
        # "rows checking"
        for i in range(3):
            if (ch == self.board[i * 3] == self.board[i * 3 + 1] and self.board[i * 3 + 1] == self.board[i * 3 + 2]):
                return 1.0, True
        # "col checking"
        for i in range(3):
            if (ch == self.board[i + 0] == self.board[i + 3] and self.board[i + 3] == self.board[i + 6]):
                return 1.0, True
        # diagonal checking
        if (ch == self.board[0] == self.board[4] and self.board[4] == self.board[8]):
            return 1.0, True

        if (ch == self.board[2] == self.board[4] and self.board[4] == self.board[6]):
            return 1.0, True
        # "if filled draw"
        if not any(c == ' ' for c in self.board):
            return 0.5, True

        return 0.0, False

    #return remaining possible moves
    def possible_moves(self):
        return [moves + 1 for moves, v in enumerate(self.board) if v == ' ']

    #take next step and return reward
    def step(self, isX, move):
        if(isX):
             ch = 'X'
        else:
            ch = '0'
        if(self.board[move-1]!=' '): # try to over write
            return  -5.0, True

        self.board[move-1]= ch
        reward,done = self.evaluate(ch)
        return reward, done


    #draw move on window
    def drawMove(self, pos,isX):
        """Play a move via ``step`` and paint its outcome on ``self.surface``.

        @param pos: 1-based board position (1..9)
        @param isX: True when the mover plays 'X', False for 'O'
        @returns the (reward, done) pair produced by ``step``

        An illegal move only shows 'Invalid move!'. Otherwise the mark is drawn
        in its cell and, when the game ends, a result line is written in the
        status strip below the grid.
        """
        reward, done = self.step(isX, pos)  # step() already updates self.board
        if reward == -5:  # target cell was occupied
            self._show_status('Invalid move!')
            return reward, done

        # int() also accepts a 0-dim tensor position coming from the policy.
        row, col = divmod(int(pos) - 1, 3)
        glyph = pygame.font.Font(None, 24).render('X' if isX else 'O', 1, (10, 10, 10))
        self.surface.blit(glyph, (col * 75 + 32, row * 75 + 32))

        if reward == 1:
            # self.humman is True when the human plays X, self.computer when
            # the computer plays X; the winner is whoever owns the mover's mark.
            if isX:
                if self.humman:
                    self._show_status('Humman won!')
                elif self.computer:
                    self._show_status('computer won!')
            else:
                if not self.humman:
                    self._show_status('Humman won!')
                elif not self.computer:
                    self._show_status('computer won!')
        elif reward == 0.5:
            self._show_status('Draw Game!')

        return reward,done

    def _show_status(self, message):
        """Write ``message`` into the status strip below the 225-pixel grid."""
        text = pygame.font.Font(None, 24).render(message, 1, (10, 10, 10))
        # Clear the strip first. The previous rectangle (0, 300, 300, 25) lay
        # outside the 225x250 surface, so nothing was ever erased.
        self.surface.fill((250, 250, 250), (0, 226, 225, 24))
        self.surface.blit(text, (10, 230))

    # mouseClick position
    def mouseClick(self):
        (mouseX, mouseY) = pygame.mouse.get_pos()
        if (mouseY < 75):
            row = 0
        elif (mouseY < 150):
            row = 1
        else:
            row = 2

        if (mouseX < 75):
            col = 0
        elif (mouseX < 150):
            col = 1
        else:
            col = 2
        return row * 3 + col + 1


     #update state
    def updateState(self,isX):
        pos=self.mouseClick()
        reward,done = self.drawMove(pos,isX)
        return reward, done

    #show display
    def showboard(self):
        """Copy the off-screen board surface onto the window and flip the display."""
        window = self.ttt
        window.blit(self.surface, (0, 0))
        pygame.display.flip()


    #begin training
    def startTraining(self,player1,player2):
        if(isinstance(player1,Qlearning) and isinstance(player2, Qlearning)):   
        #if(isinstance(player1,Qlearning) and isinstance(player2, Qlearning)):
            self.training = True
            self.player1=player1
            self.player2=player2

        elif(isinstance(player1,Qlearning) and isinstance(player2, Policy)):   
        #if(isinstance(player1,Qlearning) and isinstance(player2, Qlearning)):
            self.training = True
            self.player1=player1
            self.player2=player2
        elif(isinstance(player1,Policy) and isinstance(player2, Policy)):   
        #if(isinstance(player1,Qlearning) and isinstance(player2, Qlearning)):
            self.training = True
            self.player1=player1
            self.player2=player2
        elif(isinstance(player1,Policy) and isinstance(player2, Qlearning)):   
        #if(isinstance(player1,Qlearning) and isinstance(player2, Qlearning)):
            self.training = True
            self.player1=player1
            self.player2=player2
   
    #tarin function
    def train(self,iterations, log_interval = 10000):
        if(self.training):
           
            if isinstance(self.player1, Policy):
                    optimizer1 = optim.Adam(self.player1.parameters(), lr=0.001)
                    scheduler1 = torch.optim.lr_scheduler.StepLR(optimizer1, step_size=10000, gamma=0.95)
                    running_reward1 = 0
                  
            if isinstance(self.player2, Policy):
                    optimizer2 = optim.Adam(self.player2.parameters(), lr=0.001)
                    scheduler2 = torch.optim.lr_scheduler.StepLR(optimizer2, step_size=10000, gamma=0.95)
                    running_reward2 = 0
                    
            
            for i in range(1,iterations):
                if i % 5000 == 0:
                    print("trainining", i)
                
                if isinstance(self.player1, Policy):
                    saved_rewards1 = [0]
                    saved_logprobs1 = []
                    optimizer1.zero_grad()
                
                else:
                    self.player1.game_begin()
                  
                if isinstance(self.player2, Policy):
                    saved_rewards2 = [0]
                    saved_logprobs2 = []
                    optimizer2.zero_grad()
                
                else:
                    
                    self.player2.game_begin()
                
                self.reset()
                done = False
                isX = random.choice([True, False])
                while not done:
                    if isX:
                        if isinstance(self.player1, Policy):
                            state = self.board
                            move, logprob = select_action(Policy, state)
                            saved_logprobs1.append(logprob)
                            move = move+1
                        else:
                            move = self.player1.epslion_greedy(self.board, self.possible_moves())
                                             
                    else:
                        if isinstance(self.player2, Policy):
                            state = self.board
                            move, logprob = select_action(self.player2, state)
                            saved_logprobs2.append(logprob)
                            move = move+1
                           
                        else:
                            move = self.player2.epslion_greedy(self.board, self.possible_moves())


                    reward, done = self.step(isX, move)
                   
                    if (reward == 1):  # won
                        if (isX):
                            if isinstance(self.player1, Policy):
                                saved_rewards1.append(reward)
                            else:          
                                self.player1.updateQ(reward, self.board, self.possible_moves())
                            if isinstance(self.player2, Policy):
                                saved_rewards2.append(-1*reward)
                            else:
                                self.player2.updateQ(-1 * reward, self.board, self.possible_moves())
                        else:
                            if isinstance(self.player1, Policy):
                                saved_rewards1.append(-1*reward)
                            else:          
                                self.player1.updateQ(-1*reward, self.board, self.possible_moves())
                            if isinstance(self.player2, Policy):
                                saved_rewards2.append(reward)
                            else:
                                self.player2.updateQ(reward, self.board, self.possible_moves())
                            
                    elif (reward == 0.5):  # draw
                            if isinstance(self.player1, Policy):
                                saved_rewards1.append(reward)
                            else:          
                                self.player1.updateQ(reward, self.board, self.possible_moves())
                            if isinstance(self.player2, Policy):
                                saved_rewards2.append(reward)
                            else:
                                self.player2.updateQ(reward, self.board, self.possible_moves())
                        


                    elif (reward == -5):  # illegal move
                        if (isX):
                            if isinstance(self.player1, Policy):
                                saved_rewards1.append(reward)
                            else:          
                                self.player1.updateQ(reward, self.board, self.possible_moves())
                            
                            
                        else:
                            if isinstance(self.player2, Policy):
                                saved_rewards2.append(reward)
                            else:          
                                self.player2.updateQ(reward, self.board, self.possible_moves())
                            
                    elif (reward == 0):
                        if (isX):  # update opposite
                            if isinstance(self.player1, Policy):
                                saved_rewards1.append(reward)
                            else:          
                                self.player1.updateQ(reward, self.board, self.possible_moves())
                            
                        else:
                            if isinstance(self.player2, Policy):
                                saved_rewards2.append(reward)
                            else:          
                                self.player2.updateQ(reward, self.board, self.possible_moves())
                           
                   
                    isX = not isX  #
                    
                if isinstance(self.player1, Policy):
                    R = compute_returns(saved_rewards1)[0]
                    running_reward1 += R
                    
                    finish_episode(saved_rewards1, saved_logprobs1, gamma=0.98)
        
        
                    if i % log_interval == 0:
                       print('Episode {}\tAverage return player1: {:.2f}'.format(
                                i,
                                running_reward1 / log_interval))
                       running_reward1 = 0

                    if i % (log_interval) == 0:
                        torch.save(player1.state_dict(),
                                 "./RL/player1-%d.pkl" % i)

                    if i % 16 == 0: # batch_size
                        
                        optimizer1.step()
                        scheduler1.step()
                        optimizer1.zero_grad()
                    
                if isinstance(self.player2, Policy):
                    R = compute_returns(saved_rewards2)[0]
                    running_reward2 += R
             
                    finish_episode(saved_rewards2, saved_logprobs2, gamma=0.98)
        
        
                    if i % log_interval == 0:
                       print('Episode {}\tAverage return player2: {:.2f}'.format(
                            i,
                            running_reward2 / log_interval))
                       running_reward2 = 0
                       
                    if i % (log_interval) == 0:
                        torch.save(player2.state_dict(),
                                 "./RL/player2-%d.pkl" % i)
                      
                    if i % 16 == 0: # batch_size
                        
                        optimizer2.step()
                        scheduler2.step()
                        optimizer2.zero_grad()
                        
    #save Qtables
    def saveStates(self):
        if isinstance(self.player1, Qlearning):
            self.player1.saveQtable("player1states")
        if isinstance(self.player2, Qlearning):
            self.player2.saveQtable("player2states")


    #start game human vs AI or human vs random
    def startGame(self, playerX, playerO):
        if isinstance(playerX, Humanplayer):
            self.humman, self.computer = True, False
            if isinstance(playerO, Qlearning): #if AI
                self.ai = playerO
                self.ai.loadQtable("player2states") # load saved Q table
                self.ai.epsilon = 0 #set eps to 0 so always choose greedy step
                self.isAI = True
            elif isinstance(playerO, Randomplayer): #if random
                self.ai = playerO
                self.isAI = False
            elif isinstance(playerO, Policy):
                self.ai = playerO
                load_weights(self.ai)
                self.ai.epsilon = 0 #set eps to 0 so always choose greedy step
                self.isAI = True
                
        elif isinstance(playerO, Humanplayer):
            self.humman, self.computer = False, True
            if isinstance(playerX, Qlearning): #if AI
                self.ai = playerX
                self.ai.loadQtable("player1states") # load saved Q table
                self.ai.epsilon = 0 #set eps to 0 so always choose greedy step
                self.isAI = True
            elif isinstance(playerX, Randomplayer):#if random
                self.ai=playerX
                self.isAI = False
            elif isinstance(playerX, Policy):
                self.ai = playerX
                load_weights(self.ai)
                self.ai.epsilon = 0 #set eps to 0 so always choose greedy step
                self.isAI = True

    def render(self):
        """Interactive game loop: alternate human clicks and computer moves until the window is closed.

        Human turn: wait for a mouse click and play the clicked cell.
        Computer turn: ask ``self.ai`` for a move (sampled from a ``Policy``,
        greedy for a Q-learner, random for a ``Randomplayer``).
        After a finished game the final board stays visible for one second,
        then the board is reset.
        """
        running = True
        done = False
        pygame.event.clear()
        while running:
            if self.humanTurn:  # human clicks
                print("Human player turn")
                # Wait for a click, repainting on every other event. Every
                # event is checked for QUIT, including the first one, which
                # was previously discarded.
                while True:
                    event = pygame.event.wait()
                    if event.type == pygame.QUIT:
                        running = False
                        print("pressed quit")
                        break
                    if event.type == pygame.MOUSEBUTTONDOWN:
                        break
                    self.showboard()
                if not running:
                    # Leave without playing a move at the current mouse position.
                    break

                reward, done = self.updateState(self.humman)
                self.showboard()
            else:  # AI or random turn
                if self.isAI:
                    if isinstance(self.ai, Policy):
                        move, _ = select_action(self.ai, self.board)
                        moves = move+1  # policy actions are 0-based
                    else:
                        moves = self.ai.epslion_greedy(self.board, self.possible_moves())
                    reward, done = self.drawMove(moves, self.computer)
                    print("computer's AI player turn")
                else:  # random player
                    moves = self.ai.move(self.possible_moves())
                    reward, done = self.drawMove(moves, self.computer)
                    print("computer's random player turn")
                self.showboard()

            if done:  # game over: pause on the final board, then start over
                time.sleep(1)
                self.reset()

            self.humanTurn = not self.humanTurn

In [27]:
# Self-play training: a tabular Q-learner (player1, plays X) against a
# policy-network agent (player2, plays '0').
game = TicTacToe(True) # True selects training mode, so no pygame window is opened
player1= Qlearning() # player1: tabular Q-learning agent
player2 = Policy() # player2: policy-network agent
game.startTraining(player1,player2) # register both learners with the game
game.train(500000, log_interval=10000) # request 500,000 training episodes; log every 10,000
game.saveStates()  # pickle the Q-table of each Qlearning player (here only player1)

  if sys.path[0] == '':


trainining 5000
trainining 10000
Episode 10000	Average return player2: -3.91
trainining 15000
trainining 20000
Episode 20000	Average return player2: -3.46
trainining 25000
trainining 30000
Episode 30000	Average return player2: -3.09
trainining 35000
trainining 40000
Episode 40000	Average return player2: -2.83
trainining 45000
trainining 50000
Episode 50000	Average return player2: -2.38
trainining 55000
trainining 60000
Episode 60000	Average return player2: -2.03
trainining 65000
trainining 70000
Episode 70000	Average return player2: -1.86
trainining 75000
trainining 80000
Episode 80000	Average return player2: -1.78
trainining 85000
trainining 90000
Episode 90000	Average return player2: -1.50
trainining 95000
trainining 100000
Episode 100000	Average return player2: -1.48
trainining 105000
trainining 110000
Episode 110000	Average return player2: -1.40
trainining 115000
trainining 120000
Episode 120000	Average return player2: -1.33
trainining 125000
trainining 130000
Episode 130000	Averag

KeyboardInterrupt: 

In [13]:
# Play against the policy network in a pygame window.
game = TicTacToe() # game instance (not training, so the window is created)
player1=Humanplayer() # human player
player2=Policy()  # freshly constructed network; startGame calls load_weights on it
game.startGame(player1,player2)# player1 is X, player2 is 0
game.reset() # clear the board, pick who starts, draw the grid
game.render() # run the interactive loop

Human player turn




computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player turn
Human player turn
computer's AI player tur

KeyboardInterrupt: 

In [0]:
# Play against the tabular Q-learning agent in a pygame window.
game = TicTacToe() # game instance (not training, so the window is created)
player1=Humanplayer() # human player
player2=Qlearning()  # agent; startGame loads the pickled Q-table "player2states" and sets epsilon to 0
game.startGame(player1,player2)# player1 is X, player2 is 0
game.reset() # clear the board, pick who starts, draw the grid
game.render() # run the interactive loop