In [1]:
# from google.colab import drive
# drive.mount('/content/drive')
from copy import copy
from six import StringIO
from pprint import pprint

import gym
from gym import spaces, error, utils
from gym.utils import seeding
import numpy as np


resolved = 0
unfinished = 1
error = 2


def checkSolution(grid):
    N = len(grid)
    for i in range(N):
        for j in range(N):
            if grid[i][j] == 0:
                return unfinished
            n = N // 3
            iOffset = i // n * n
            jOffset = j // n * n
            square = grid[iOffset:iOffset + n, jOffset:jOffset + n].flatten()

            uniqueInRow = countItem(grid[i], grid[i, j]) == 1
            uniqueInCol = countItem(grid[:, j:j + 1].flatten(), grid[i, j]) == 1
            uniqueInSquare = countItem(square, grid[i, j]) == 1

            if not (uniqueInRow and uniqueInCol and uniqueInSquare):
                return error
    return resolved


def countItem(vector, item):
    count = 0
    for item2 in vector:
        if item2 == item: count += 1
    return count


def getSolutions(grid, stopAt=1, i=-1, j=-1, omit=-1):
    N = len(grid)
    check = checkSolution(grid)

    if check == resolved:
        return np.array([grid], dtype=int)
    if check == error:
        return np.empty(shape=(0, N, N), dtype=int)

    if i == -1:
        for i in range(N):
            for j in range(N):
                if grid[i, j] == 0: break
            if grid[i, j] == 0: break

    values = np.arange(1, N + 1)
    np.random.shuffle(values)

    solutions = np.empty(shape=(0, N, N), dtype=int)
    for value in values:
        if omit == value: continue
        cGrid = np.copy(grid)
        cGrid[i, j] = value
        subSolutions = getSolutions(cGrid, stopAt=stopAt - len(solutions))
        solutions = np.concatenate((solutions, subSolutions))
        if len(solutions) >= stopAt:
            return solutions
    return solutions


class SudokuEnv(gym.Env):
    last_action = None

    def __init__(self, n):
        self.n = n
        self.val_limit = self.n - 1
        self.nxn = self.n * self.n
        self.observation_space = spaces.Box(1, self.n, shape=(self.n, self.n))
        # Action = nxn*value + (row*n + col), n = 9 here
        self.action_space = spaces.Discrete(self.nxn * self.val_limit + (self.val_limit * self.n + self.val_limit))
        self.grid = []
        self.original_indices_row = []
        self.original_indices_col = []
        self.base = getSolutions(np.zeros(shape=(self.n, self.n)))[0]

        N = len(self.base)
        positions = []
        for i in range(N):
            for j in range(N):
                positions.append((i, j))
        np.random.shuffle(positions)

        count = 0
        for i, j in positions:
            if count > 1:
                break
            oldValue = self.base[i, j]
            self.base[i, j] = 0
            solutions = getSolutions(self.base, stopAt=2, i=i, j=j, omit=oldValue)
            if len(solutions) == 0:
                count += 1
            else:
                self.base[i, j] = oldValue

    def step(self, action):
        """
        """
        if self.last_action != None and self.last_action == action:
            return np.copy(self.grid), -0.5, False, None
        self.last_action = action
        oldGrid = np.copy(self.grid)

        square = action % self.nxn
        col = square % self.n
        row = (square - col) // self.n
        val = action // self.nxn + 1

        for i in range(len(self.original_indices_row)):
            if col == self.original_indices_col[i] and row == self.original_indices_row[i]:
                # print(f"ORIGINAL FILL Row: {row} Column: {col} Value: {val}")
                return np.copy(self.grid), -1, False, None

        if self.grid[row, col] == val:
            # print("Already there")
            return np.copy(self.grid), -1, False, None

        self.grid[row, col] = val

        stats = checkSolution(self.grid)
        if stats == resolved:
            return np.copy(self.grid), 1, True, None
        elif stats == unfinished:
            return np.copy(self.grid), -0.1, False, None
        elif stats == error:
            self.grid = oldGrid
            return np.copy(self.grid), -1, False, None

    def reset(self):
        self.last_action = None
        self.grid = np.copy(self.base)
        self.original_indices_row, self.original_indices_col = np.nonzero(self.grid)
        return np.copy(self.grid)

    def render(self, mode='human', close=False):
        if self.last_action != None:
            square = self.last_action % self.nxn
            col = square % self.n
            row = (square - col) // self.n
            val = self.last_action // self.nxn
        for i in range(len(self.grid)):
            for j in range(len(self.grid)):
                if self.last_action != None and i == row and j == col:
                    if val == self.grid[i, j]:
                        print('')
                    else:
                        print('')
                else:
                    print('')
                if j % 3 == 2 and j != len(self.grid) - 1:
                    print(' | ')
            if i % 3 == 2 and i != len(self.grid) - 1:
                print('\n-------------------------\n')
            else:
                print('\n')
        print('\n\n')

In [8]:
env= SudokuEnv(9)


In [3]:
import math, random

import torch
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd 
import torch.nn.functional as F

import matplotlib.pyplot as plt

import gym
import numpy as np

from collections import deque
from tqdm import trange

# Select GPU or CPU as device

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [16]:
# epsilon_start = 1.0
# epsilon_final = 0.01
# epsilon_decay = 500

# # Define epsilon as a function of time (episode index)

# eps_by_episode = lambda episode: epsilon_final + (epsilon_start - epsilon_final) * math.exp(-1. * episode / epsilon_decay)

# # Note that the above lambda expression is equivalent to explicitly defining a function:
# # def epsilon_episode(episode):
# #     return epsilon_final + (epsilon_start - epsilon_final) * math.exp(-1. * episode / epsilon_decay)

# plt.plot([eps_by_episode(i) for i in range(10000)])
# plt.title('Epsilon as function of time')
# plt.xlabel('Time (episode index)')
# plt.ylabel('Epsilon')
# plt.show()

In [5]:
# Epsilon annealing schedule generator

def gen_eps_by_episode(epsilon_start, epsilon_final, epsilon_decay):
    eps_by_episode = lambda episode: epsilon_final + (epsilon_start - epsilon_final) * math.exp(-1. * episode / epsilon_decay)
    return eps_by_episode

epsilon_start = 1.0
epsilon_final = 0.01
epsilon_decay = 500
eps_by_episode = gen_eps_by_episode(epsilon_start, epsilon_final, epsilon_decay)

In [6]:
class ReplayBuffer(object):
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        # Add batch index dimension to state representations
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)            
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return np.concatenate(state), action, reward, np.concatenate(next_state), done
    
    def __len__(self):
        return len(self.buffer)

In [14]:
class DQN(nn.Module):
    
    def __init__(self, n_state, n_action):
        super(DQN, self).__init__()        
        self.layers = nn.Sequential(
            nn.Linear(n_state, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_action)
        )
        
    def forward(self, x):
        return self.layers(x)
    
    def act(self, state, epsilon):
        # Get an epsilon greedy action for given state
        if random.random() > epsilon: # Use argmax_a Q(s,a)
            state = autograd.Variable(torch.FloatTensor(state).unsqueeze(0), volatile=True).to(device)
            q_value = self.forward(state)
            q_value = q_value.cpu().numpy()
            action = q_value.max(1)[1].item()         
        else: # get random action
            action = random.randrange(env.action_space.n)
        return action

In [27]:
env.observation_space.shape

(9, 9)

In [28]:
env.action_space.n

728

In [9]:
model = DQN(env.observation_space.shape[0], env.action_space.n).to(device)
    
optimizer = optim.Adam(model.parameters())

replay_buffer = ReplayBuffer(1000)

In [31]:
def compute_td_loss(model, batch_size, gamma=0.99):

    # Get batch from replay buffer
    state, action, reward, next_state, done = replay_buffer.sample(batch_size)

    # Convert to tensors. Creating Variables is not necessary with more recent PyTorch versions.
    state      = autograd.Variable(torch.FloatTensor(np.float32(state))).to(device)
    next_state = autograd.Variable(torch.FloatTensor(np.float32(next_state)), volatile=True).to(device)
    action     = autograd.Variable(torch.LongTensor(action)).to(device)
    reward     = autograd.Variable(torch.FloatTensor(reward)).to(device)
    done       = autograd.Variable(torch.FloatTensor(done)).to(device)

    # Calculate Q(s) and Q(s')
    q_values      = model(state)
    next_q_values = model(next_state)

    print("q_values.shape", q_values.shape)
    print("action.shape",action.shape)

    # Get Q(s,a) and max_a' Q(s',a')
    action_ = action.unsqueeze(1)
    print("action_.shape", action_.shape)
    q_value          = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
    #q_value          = q_values.gather(1, action_.unsqueeze(1))
    next_q_value     = next_q_values.max(1)[0]
    # Calculate target for Q(s,a): r + gamma max_a' Q(s',a')
    # Note that the done signal is used to terminate recursion at end of episode.
    expected_q_value = reward + gamma * next_q_value * (1 - done)
    
    # Calculate MSE loss. Variables are not needed in recent PyTorch versions.
    loss = (q_value - autograd.Variable(expected_q_value.data)).pow(2).mean()
        
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss

In [11]:
def plot(episode, rewards, losses):
    # clear_output(True)
    plt.figure(figsize=(20,5))
    plt.subplot(131)
    plt.title('episode %s. reward: %s' % (episode, np.mean(rewards[-10:])))
    plt.plot(rewards)
    plt.subplot(132)
    plt.title('loss')
    plt.plot(losses)   
    plt.show() 

In [12]:
def train(env, model, eps_by_episode, optimizer, replay_buffer, episodes = 10000, batch_size=32, gamma = 0.99):
    losses = []
    all_rewards = []
    episode_reward = 0
    tot_reward = 0
    tr = trange(episodes+1, desc='Agent training', leave=True)

    # Get initial state input
    state = env.reset()

    # Execute episodes iterations
    for episode in tr:
        tr.set_description("Agent training (episode{}) Avg Reward {}".format(episode+1,tot_reward/(episode+1)))
        tr.refresh() 

        # Get initial epsilon greedy action
        epsilon = eps_by_episode(episode)
        action = model.act(state, epsilon)
        
        # Take a step
        next_state, reward, done, _ = env.step(action)

        # Append experience to replay buffer
        replay_buffer.push(state, action, reward, next_state, done)

        tot_reward += reward
        episode_reward += reward
        
        state = next_state

        # Start a new episode if done signal is received
        if done:
            state = env.reset()
            all_rewards.append(episode_reward)
            episode_reward = 0

        # Train on a batch if we've got enough experience
        if len(replay_buffer) > batch_size:
            loss = compute_td_loss(model, batch_size, gamma)
            losses.append(loss.item())
            
    plot(episode, all_rewards, losses)  
    return model,all_rewards, losses

In [32]:
model,all_rewards, losses = train(env, model, eps_by_episode, optimizer, replay_buffer, episodes = 10000, batch_size=32, gamma = 0.99)

  
Agent training (episode1) Avg Reward 0.0:   0%|          | 0/10001 [00:00<?, ?it/s]

q_values.shape torch.Size([32, 9, 728])
action.shape torch.Size([32])
action_.shape torch.Size([32, 1])





RuntimeError: ignored