In [None]:
import torch
import numpy as np
import copy

class N_puzzle():
    def __init__(self,n, difficulty = 5):
        #the n is the dimension of the puzzle
        #the difficulty is the manhattan distance of the starting puzzle
        assert type(n) is int and n > 1
        self.n = n
        self.N = n**2
        #the reason to use torch.tensor here is that it is convenient to input the model
        board = torch.zeros((self.N,self.N - 1)).int()
        for i in range(self.N - 1):
            board[i,i] = 1
        self.board = board.view(n,n,-1)
        #the final is the goal to be achieved
        self.final = copy.deepcopy(self.board)
        #a dict to convert string into direction represented by int
        self.way_dict = {"up":0, "down":1, "left":2, "right":3}
        #convert the direction into vector
        self.direction_list = [(1,0),(-1,0),(0,-1),(0,1)]
        #zero_loc traces the position of the zero or empty
        self.zero_loc = np.array([n-1, n-1])
        #the mark is used to convert the one hot-key puzzle to the visible puzzle
        self.mark = torch.arange(self.N - 1)+1
        self.mark = self.mark.int().view(1,1,-1)
        #initilize the puzzle
        self.random_walk(difficulty)

    # the move will return reward based on the movement:
    # achieve the goal: 10, hit the wall:-1, otherwise: 0
    def move(self, way):
        if isinstance(way, str) and way in self.way_dict:
            way = self.way_dict[way]
        assert isinstance(way, int) and 0 <= way <= 3, "Please input an integer between 0 and 3"
        direction = self.direction_list[way]
        new_loc = self.zero_loc + direction

        if np.any(new_loc >= self.n) or np.any(new_loc < 0):
            return -1  # Hit the wall

        self.swap(self.zero_loc, new_loc)
        self.zero_loc = new_loc

        if self.achieve_final():
            return 10  # Achieve the goal
        else:
            return 0  # Otherwise


    def swap(self, old, new):
        tmp = self.board[old[0],old[1]].clone()
        self.board[old[0],old[1]] = self.board[new[0],new[1]]
        self.board[new[0],new[1]] = tmp

    # This also acts as a reward to guide the puzzle
    def manhattan(self):
        table = self.display()
        row = (table-1) / self.n
        col = (table-1) % self.n
        std = torch.arange(self.n).int()
        d_row = (row - std.view(self.n,1)).abs().sum()
        d_col = (col - std.view(1,self.n)).abs().sum()
        zero_distance = ((table == 0).nonzero().int() - torch.IntTensor([0,self.n - 1])).abs().sum()
        return d_row + d_col - zero_distance

    def achieve_final(self):
        return torch.equal(self.board, self.final)

    def display(self):
        table = self.board.int() * self.mark
        table = table.sum(dim = 2)
        return table

    def random_walk(self, n_step):
        distance = 0
        while distance < n_step:
            step = np.random.randint(4)
            self.move(step)
            distance = self.manhattan()

In [None]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import copy
from collections import namedtuple
import random
# from n_puzzle import N_puzzle

class hidden_unit(nn.Module):
    def __init__(self, in_channels, out_channels, activation):
        super(hidden_unit, self).__init__()
        self.activation = activation
        self.linear = nn.Linear(in_channels, out_channels)

    def forward(self, x):
        out = self.linear(x)
        out = self.activation(out)
        return out

#Here I use the linear model as the network
class Q_learning(nn.Module):
    def __init__(self, in_channels, hidden_layers, out_channels = 4, unit = hidden_unit, activation = F.relu):
        super(Q_learning, self).__init__()
        assert type(hidden_layers) is list
        self.in_channels = in_channels
        self.hidden_units = nn.ModuleList()
        prev_layer = in_channels
        for hidden in hidden_layers:
            self.hidden_units.append(unit(prev_layer, hidden, activation))
            prev_layer = hidden
        self.final_unit = nn.Linear(prev_layer, out_channels)

    def forward(self, x):
        out = x.view(-1, self.in_channels).float()
        for unit in self.hidden_units:
            out = unit(out)
        out = self.final_unit(out)
        return out

# The transition and ReplayMemory are copied from the website:
# http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
Transition = namedtuple('Transition',
                        ('state', 'action', 'new_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        #"""Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

#Just follow the object-oriented coding style...use a class to train and test
class RL_training():
    def __init__(self, N, difficulty, Q_learning, epsilon = 1, gamma = 0.5, lr = 0.1, buffer = 100, batch_size = 40):
        self.N = N
        self.difficulty = difficulty
        self.N_puzzle = N_puzzle(self.N, difficulty)
        self.model = Q_learning
        self.criterion = torch.nn.MSELoss()
        self.optimizer = optim.RMSprop(self.model.parameters(), lr = lr)
        self.epsilon = epsilon
        self.gamma = gamma
        self.buffer = buffer
        self.batch_size = batch_size
        self.memory = ReplayMemory(buffer)
    def train(self, epochs = 5001, record_sections = 1000):
        #to count the success rate in one record_section
        sucess_count = 0
        for i in range(epochs):
            if i%record_sections ==0:
                print('\n')
                print("epoch is %d" %(i))
                print("epsilon is %.2f" % self.epsilon)
                print("success rate: %.2f" % (sucess_count/record_sections*100))
                sucess_count = 0
            self.N_puzzle = N_puzzle(self.N, self.difficulty)
            step = 0
            while True:
                #get the current state
                state = Variable(self.N_puzzle.board.clone().view(1,-1))
                #get the current Q values
                qval = self.model.forward(state)
                if (np.random.random() < self.epsilon):
                    #choose random action
                    action = np.random.randint(0,4)
                else:
                    #choose best action from Q(s,a) values
                    action = np.argmax(qval.data)
                #Take action, get the reward
                # print(state)
                # print(action)
                # data = input("mark.")
                # print(data)
                # reward = self.N_puzzle.move(action) - self.N_puzzle.manhattan()
                reward = self.N_puzzle.move(int(action)) - self.N_puzzle.manhattan()
                #Acuqire new state
                new_state = Variable(self.N_puzzle.board.clone().view(1,-1))
                #push the state-pair into memory
                self.memory.push(state.data, action, new_state.data, reward)
                step += 1
                # If the memory is not full, skip the update part
                if (len(self.memory) < self.buffer): #if buffer not filled, add to it
                    state = new_state
                    if reward == 10: #if reached terminal state, update game status
                        break
                    elif step >= 20:
                        break
                    else:
                        continue
                #sample a batch of state-pairs
                transitions = self.memory.sample(self.batch_size)
                batch = Transition(*zip(*transitions))
                state_batch = Variable(torch.cat(batch.state))
                action_batch = Variable(torch.LongTensor(batch.action)).view(-1,1)
                # new_state_batch = Variable(torch.cat(batch.new_state), volatile = True)
                with torch.no_grad():
                    new_state_batch = Variable(torch.cat(batch.new_state))
                reward_batch = Variable(torch.FloatTensor(batch.reward))
                # the non_final_mask is to determine the reward
                # If a state achieves the goal, it should only get the reward, not reward + QMax
                # Because there is no future state for it.
                non_final_mask = (reward_batch != 10)
                #Let's run our Q function on S to get Q values for all possible actions
                # we only update the qval[action], leaving qval[not action] unchanged
                state_action_values = self.model(state_batch).gather(1, action_batch)
                #new Q
                newQ = self.model(new_state_batch)
                maxQ = newQ.max(1)[0]
                #update the y
#                 if reward == 0: #non-terminal state
#                     update = (-1.0*distance + (self.gamma * maxQ.data))
#                 else: #terminal state
#                     update = reward
                y = reward_batch
                y[non_final_mask] += self.gamma * maxQ[non_final_mask]
                #the y does not need to be back-propogated, so set to be not volatile.
                # y.volatile = False
                # With this line
                with torch.no_grad():
                    y_detached = y.detach().clone().requires_grad_(True)
                # loss = self.criterion(state_action_values, y)
                target = y.view(-1, 1)  # or target = y.unsqueeze(1)
                loss = self.criterion(state_action_values, target)
                # Optimize the model
                self.optimizer.zero_grad()
                loss.backward()
                for param in self.model.parameters():
                    param.grad.data.clamp_(-1, 1)
                self.optimizer.step()
                # if an agent walks so many times and does not achieve the goal, it is likely that it has lost
                # Therefore, let the agent re-try the puzzle
                if reward == 10:
                    if i %record_sections == 0:
                        print("%d steps are trained and the reward is %d" %(step, reward))
                    sucess_count += 1
                    break
                if step >= 20:
                    if i %record_sections == 0:
                        print("%d steps are trained and stop" % (step))
                    break
            if self.epsilon > 0.1:
                self.epsilon -= (1/epochs)


    def test(self, difficulty = None):
            if type(difficulty) == int:
                self.difficulty = difficulty
            i = 0
            print("Initial State:")
            self.N_puzzle = N_puzzle(self.N, self.difficulty)
            print(self.N_puzzle.display())
            status = 1
            #while game still in progress
            while(status == 1):
                state = Variable(self.N_puzzle.board.clone(), requires_grad = False)
                qval = self.model(state)
                print(qval.data)
                print("distance: ",self.N_puzzle.manhattan())
                # action = np.argmax(qval.data) #take action with highest Q-value
                action = int(torch.argmax(qval.data))  # Convert the tensor result to an integer
                print('Move #: %s; Taking action: %s' % (i, action))
                reward = self.N_puzzle.move(action)
                print(self.N_puzzle.display())
                if reward == 10:
                    status = 0
                    print("Reward: %s" % (reward,))
                i += 1 #If we're taking more than 10 actions, just stop, we probably can't win this game
                if (i > 15):
                    print("Game lost; too many moves.")
                    break


In [None]:
network = Q_learning(240, [500,500], 4, hidden_unit)
train = RL_training(4, 5, network, epsilon = 1, gamma = 0.5, lr = 1e-4)
train.train(10001,1000)



epoch is 0
epsilon is 1.00
success rate: 0.00


epoch is 1000
epsilon is 0.90
success rate: 0.00
20 steps are trained and stop


epoch is 2000
epsilon is 0.80
success rate: 0.00
20 steps are trained and stop


epoch is 3000
epsilon is 0.70
success rate: 0.00
20 steps are trained and stop


epoch is 4000
epsilon is 0.60
success rate: 0.00
20 steps are trained and stop


epoch is 5000
epsilon is 0.50
success rate: 0.00
20 steps are trained and stop


epoch is 6000
epsilon is 0.40
success rate: 0.00
20 steps are trained and stop


epoch is 7000
epsilon is 0.30
success rate: 0.00
20 steps are trained and stop


epoch is 8000
epsilon is 0.20
success rate: 0.00
20 steps are trained and stop


epoch is 9000
epsilon is 0.10
success rate: 0.00
20 steps are trained and stop


epoch is 10000
epsilon is 0.10
success rate: 0.00
20 steps are trained and stop


In [None]:
train.test()

Initial State:
tensor([[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11,  0],
        [13, 14, 15, 12]])
tensor([[ 2.6599, -6.3048, -7.7943, -4.6726]])
distance:  tensor(5.)
Move #: 0; Taking action: 0
tensor([[ 1,  2,  3,  4],
        [ 5,  6,  7,  8],
        [ 9, 10, 11, 12],
        [13, 14, 15,  0]])
Reward: 10
