In [5]:
## Preprocess the data

import random
import time

# Wall-clock reference taken when this cell runs; later code reports elapsed
# time relative to it.
start_time = time.time()


def construct_pairs(n, coordinates):
    """Build a set of ``n`` integer coordinate pairs from a tokenised line.

    ``coordinates[0]`` holds the count and is skipped; pair ``k`` is read from
    tokens ``2k+1`` and ``2k+2``. Tokens beyond the first ``n`` pairs are
    ignored.
    """
    return {
        (int(coordinates[first]), int(coordinates[first + 1]))
        for first in range(1, n * 2, 2)
    }


def _read_counted_line(handle):
    """Read one line of the form ``<count> <a1> <b1> <a2> <b2> ...``.

    Returns ``(tokens, count, pairs)`` where ``pairs`` is the coordinate set
    produced by ``construct_pairs``.
    """
    tokens = handle.readline().split()
    count = int(tokens[0])
    return tokens, count, construct_pairs(count, tokens)


with open("./sokoban01.txt", "r") as file:
    # First line: two integers, stored as row_size and col_size in that order.
    dimensions = file.readline()
    dimension_tokens = dimensions.split()
    row_size = int(dimension_tokens[0])
    col_size = int(dimension_tokens[1])

    # Walls
    walls_line, n_walls, wall_coordinates = _read_counted_line(file)
    print("Wall coordinates", wall_coordinates)

    # Boxes
    boxes_line, n_boxes, box_coordinates = _read_counted_line(file)

    # Storage
    storage_line, n_storage, storage_coordinates = _read_counted_line(file)

    # Last line read: the player's starting coordinates (two integers).
    player_location_line = file.readline().split()
    initial_player_location = (
        int(player_location_line[0]),
        int(player_location_line[1]),
    )

# Action alphabet and Q-learning hyperparameters used by the cells below.
directions = ["U", "D", "L", "R"]
discount = 1.0
learningRate = 0.7

Wall coordinates {(6, 18), (6, 15), (4, 3), (4, 9), (9, 2), (9, 5), (5, 10), (9, 8), (9, 14), (11, 5), (1, 6), (9, 11), (2, 5), (10, 12), (1, 9), (10, 18), (11, 8), (10, 15), (11, 11), (6, 2), (7, 1), (7, 7), (6, 5), (7, 10), (6, 8), (6, 14), (7, 13), (7, 19), (6, 17), (4, 5), (3, 9), (5, 3), (9, 1), (9, 7), (9, 4), (11, 7), (10, 5), (10, 11), (9, 13), (9, 19), (11, 10), (1, 5), (10, 14), (6, 1), (1, 8), (10, 17), (6, 7), (7, 12), (6, 10), (6, 16), (6, 19), (3, 5), (4, 4), (4, 10), (9, 3), (8, 1), (9, 9), (11, 9), (10, 13), (7, 11), (11, 6), (2, 9), (1, 7), (8, 19), (10, 16), (10, 19), (7, 5), (6, 3), (7, 8), (7, 14)}


In [6]:
## Sokoban state
class SokobanState:
    """Hashable snapshot of a Sokoban position: box coordinates + player location.

    Coordinates are 2-element tuples. 'U'/'D' decrease/increase the first
    component and 'L'/'R' decrease/increase the second one.

    The level layout is not stored on the state: methods read the module-level
    ``wall_coordinates`` and ``storage_coordinates`` sets, and
    ``getPossibleActions`` reads the module-level ``directions`` list.
    """

    # Offset applied to the player's location for each known action.
    _MOVE_DELTAS = {"U": (-1, 0), "D": (1, 0), "L": (0, -1), "R": (0, 1)}

    def __init__(self, boxCoordinates, location):
        """Store an immutable copy of ``boxCoordinates`` and the player ``location``.

        ``boxCoordinates`` may be any iterable of coordinate tuples;
        ``frozenset`` makes its own copy, so the caller's collection is never
        aliased.
        """
        self.BoxCoordinates = frozenset(boxCoordinates)
        self.Location = location

    def __hash__(self):
        return hash((self.BoxCoordinates, self.Location))

    def __eq__(self, other):
        """Two states are equal when both boxes and player location match.

        Returns ``NotImplemented`` for objects that do not look like a state,
        so ``state == 5`` evaluates to ``False`` instead of raising.
        """
        try:
            otherKey = (other.BoxCoordinates, other.Location)
        except AttributeError:
            return NotImplemented
        return (self.BoxCoordinates, self.Location) == otherKey

    def __repr__(self):
        return str(self.BoxCoordinates) + " " + str(self.Location)

    @staticmethod
    def minStepsBetweenCoordinates(coordinates1, coordinates2):
        """Manhattan distance between two coordinates (ignores walls and boxes)."""
        return abs(coordinates1[0]-coordinates2[0]) + abs(coordinates1[1]-coordinates2[1])

    def totalBoxToClosestStorageSteps(self):
        """Sum, over boxes not on storage, of the distance to the nearest free storage.

        A storage square is "free" when no box stands on it. If unplaced boxes
        exist but no storage is free, the result is ``float("inf")``. Returns 0
        when every box is on storage.
        """
        unplacedBoxes = self.BoxCoordinates.difference(storage_coordinates)
        # Same for every box, so compute it once.
        freeStorages = storage_coordinates.difference(self.BoxCoordinates)
        return sum(
            min(
                (self.minStepsBetweenCoordinates(box, storage) for storage in freeStorages),
                default=float("inf"),
            )
            for box in unplacedBoxes
        )

    def agentToClosestBoxSteps(self):
        """Distance from the player to the nearest box that is not on storage.

        Returns ``float("inf")`` when every box is already on storage.
        """
        return min(
            (
                self.minStepsBetweenCoordinates(self.Location, box)
                for box in self.BoxCoordinates.difference(storage_coordinates)
            ),
            default=float("inf"),
        )

    def remainingBoxes(self):
        """Number of boxes that are not on a storage square."""
        return len(self.BoxCoordinates.difference(storage_coordinates))

    def isTerminal(self):
        """True when every box is on a storage square (vacuously true with no boxes)."""
        return self.BoxCoordinates.issubset(storage_coordinates)

    def boxStuckAtCorner(self, box):
        """True when ``box`` is off storage and wedged into a corner of two walls.

        A corner is a wall directly above or below the box combined with a wall
        directly left or right of it. Only walls are considered, not other
        boxes.
        """
        if box in storage_coordinates:
            return False
        for rowStep, colStep in ((-1, -1), (-1, +1), (+1, -1), (+1, +1)):
            verticalNeighbour = (box[0] + rowStep, box[1])
            horizontalNeighbour = (box[0], box[1] + colStep)
            if verticalNeighbour in wall_coordinates and horizontalNeighbour in wall_coordinates:
                return True
        return False

    def noSolution(self):
        """True when at least one box is stuck in a wall corner (see boxStuckAtCorner)."""
        return any(self.boxStuckAtCorner(box) for box in self.BoxCoordinates)

    def getTargetLocations(self, action):
        """Return ``(cell the player steps into, cell one further in the same direction)``.

        The second cell is where a box standing on the first cell would be
        pushed to.
        """
        row, col = self.Location[0], self.Location[1]
        delta = self._MOVE_DELTAS.get(action)
        if delta is None:
            # Legacy fallback kept for backward compatibility: an unrecognised
            # action leaves the player in place and reports the cell one step
            # in the 'D' direction as the "next" cell.
            return self.Location, (row + 1, col)
        rowStep, colStep = delta
        return (row + rowStep, col + colStep), (row + 2 * rowStep, col + 2 * colStep)

    def isInvalidAction(self, action):
        """True when ``action`` walks into a wall or pushes a box into a wall/box."""
        targetLocation, targetNextLocation = self.getTargetLocations(action)
        if targetLocation in wall_coordinates:
            return True
        if targetLocation in self.BoxCoordinates:
            # Pushing a box: the cell behind it must be free of boxes and walls.
            return targetNextLocation in self.BoxCoordinates or targetNextLocation in wall_coordinates
        return False

    # Avoid agent stuck at local maximum by staying at the same location, only allow movable directions
    def getPossibleActions(self):
        """List the actions from ``directions`` that are valid in this state (may be empty)."""
        return [action for action in directions if not self.isInvalidAction(action)]

In [7]:
class Sokoban:
    """Sokoban environment paired with a tabular Q-learning value store.

    ``qFunction`` is a class attribute, so every ``Sokoban`` instance reads and
    writes the same Q-table; a fresh instance per episode therefore keeps
    learning from earlier episodes. Relies on the module-level
    ``SokobanState`` class and the ``learningRate`` / ``discount`` settings.
    """

    # Shared Q-table: maps (state, action) -> estimated value.
    qFunction = {}

    # Fixed values reported for solved and deadlocked states (see getQValue).
    _SOLVED_VALUE = 10000000000
    _DEADLOCK_VALUE = -10000000000

    def __init__(self, row_size, col_size, boxCoordinates, location):
        """Start a game with the given board size, boxes and player location.

        ``boxCoordinates`` is copied into a new set, so the caller's collection
        is not shared with the game state.
        """
        self.row_size = row_size
        self.col_size = col_size
        self.currentState = SokobanState(set(boxCoordinates), location)

    def resetQFunction(self):
        """Empty the Q-table shared by all instances."""
        self.__class__.qFunction.clear()

    def getQValue(self, state, action):
        """Return Q(state, action).

        Solved states and states reported as unsolvable by
        ``state.noSolution()`` have fixed values regardless of ``action`` and
        of anything stored in the table; unseen pairs default to 0.
        """
        if state.isTerminal():
            return self._SOLVED_VALUE
        if state.noSolution():
            return self._DEADLOCK_VALUE
        return self.__class__.qFunction.get((state, action), 0)

    def setQValue(self, state, action, newValue):
        """Store ``newValue`` as Q(state, action) in the shared table."""
        self.__class__.qFunction[(state, action)] = newValue

    def getCurrentStateActions(self):
        """Valid actions in the current state."""
        return self.currentState.getPossibleActions()

    def getMaxQValue(self, state, possibleActions):
        """Largest Q(state, a) over ``possibleActions``.

        When ``possibleActions`` is empty (the player cannot move at all) the
        state's own fixed value is returned: solved/deadlock value if
        applicable, otherwise 0. Previously this case raised ``ValueError``
        from ``max()`` on an empty sequence.
        """
        if not possibleActions:
            return self.getQValue(state, None)
        return max(self.getQValue(state, action) for action in possibleActions)

    def getBestAction(self, state):
        """Greedy action for ``state``; ties go to the earliest action listed.

        Returns ``None`` when the state has no valid action.
        """
        possibleActions = state.getPossibleActions()
        if not possibleActions:
            return None
        # max() keeps the first maximal element, matching the tie-break of a
        # left-to-right scan for the maximum value.
        return max(possibleActions, key=lambda action: self.getQValue(state, action))

    # create S'
    @staticmethod
    def createStateForAction(state, action):
        """Return the successor state S' of ``state`` under ``action``.

        The player moves into the target cell; a box standing there is pushed
        one cell further. Validity of the action is not checked here.
        """
        targetLocation, targetNextLocation = state.getTargetLocations(action)
        # map frozenset back to a mutable set
        newBoxCoordinates = set(state.BoxCoordinates)
        if targetLocation in newBoxCoordinates:
            newBoxCoordinates.remove(targetLocation)
            newBoxCoordinates.add(targetNextLocation)
        return SokobanState(newBoxCoordinates, targetLocation)

    @staticmethod
    def _computeReward(oldState, newState):
        """Shaped reward R for the transition ``oldState`` -> ``newState``."""
        # Every step costs 1.
        reward = -1

        # Reward for moving box closer
        # NOTE(review): an unchanged total distance is penalised (-3) while an
        # increased one is not; this looks intentional ("no progress" penalty)
        # but is worth confirming.
        stepDifference = newState.totalBoxToClosestStorageSteps() - oldState.totalBoxToClosestStorageSteps()
        if stepDifference < 0:
            reward += 3
        elif stepDifference == 0:
            reward += -3

        # Reward for agent getting closer to a box
        # When no unplaced box exists in either state both distances are inf,
        # the difference is nan, and neither branch fires.
        distanceToClosestBoxDifference = newState.agentToClosestBoxSteps() - oldState.agentToClosestBoxSteps()
        if distanceToClosestBoxDifference < 0:
            reward += 1
        elif distanceToClosestBoxDifference > 0:
            reward += -1

        # Reward for moving box to Storage
        remainingBoxesDifference = newState.remainingBoxes() - oldState.remainingBoxes()
        if remainingBoxesDifference < 0:
            reward += 15
        elif remainingBoxesDifference > 0:
            reward += -10

        return reward

    # Take action A, observe R and S'
    # Update Q value
    # Update S to S'
    def takeAction(self, action):
        """Apply ``action``, perform one Q-learning update, and advance the state.

        Update rule: Q(S,A) += learningRate * (R + discount * max_a Q(S',a) - Q(S,A)).
        """
        newState = self.createStateForAction(self.currentState, action)
        R = self._computeReward(self.currentState, newState)

        currentQValue = self.getQValue(self.currentState, action)
        bestNextValue = self.getMaxQValue(newState, newState.getPossibleActions())
        newQValue = currentQValue + learningRate*(R+discount*bestNextValue-currentQValue)
        self.setQValue(self.currentState, action, newQValue)
        self.currentState = newState

In [8]:
# Build a game instance and use it to clear the Q-table, which is stored on
# the Sokoban class and therefore shared by every instance.
sokoban = Sokoban(row_size, col_size, box_coordinates, initial_player_location)
sokoban.resetQFunction()

# Fixed seed for the `random` module so exploration choices repeat across runs.
random.seed(0)
def createRandomStartLocation():
    """Return a uniformly random cell that holds no wall, box or storage square.

    Cells are drawn from rows ``1..row_size`` and columns ``1..col_size``
    (inclusive), using the module-level level description.

    The previous implementation started from the fixed cell ``(1, 1)`` and only
    re-drew when that cell was occupied, so it returned ``(1, 1)`` every time
    that cell was free; it also looped forever when no cell was free.

    NOTE(review): a cell that lies outside the enclosing walls still counts as
    free here; reachability is not checked.

    Raises:
        ValueError: if every cell is occupied.
    """
    freeCells = [
        (row, col)
        for row in range(1, row_size + 1)
        for col in range(1, col_size + 1)
        if (row, col) not in wall_coordinates
        and (row, col) not in box_coordinates
        and (row, col) not in storage_coordinates
    ]
    if not freeCells:
        raise ValueError("no free cell available for a random start location")
    return random.choice(freeCells)

def createAndRunEpisode(startLocation, maxSteps, epsilon, stopOnDeadlock=False):
    """Run one epsilon-greedy Q-learning episode from ``startLocation``.

    A new ``Sokoban`` game is created with the module-level board size and
    initial ``box_coordinates``; Q-values live on the ``Sokoban`` class, so
    learning carries over between episodes.

    Args:
        startLocation: player coordinates at the start of the episode.
        maxSteps: maximum number of actions taken before giving up.
        epsilon: probability of taking a random valid action instead of the
            greedy one.
        stopOnDeadlock: when True, end the episode as soon as the state reports
            ``noSolution()``. Defaults to False, which keeps stepping until
            ``maxSteps`` as before.

    Returns:
        True if this episode reached terminal state, otherwise False
        (step budget exhausted, deadlock with ``stopOnDeadlock``, or the
        player has no valid move left).
    """
    # Local name deliberately differs from the module-level `sokoban`.
    game = Sokoban(row_size, col_size, box_coordinates, startLocation)
    path = ""
    for step in range(maxSteps):
        state = game.currentState
        possibleActions = state.getPossibleActions()
        if not possibleActions:
            # Player is boxed in: nothing to choose from, so the episode
            # cannot continue (random.choice / max would raise on empty input).
            return False
        if random.random() < epsilon:
            action = random.choice(possibleActions)
        else:
            action = game.getBestAction(state)
        path += action
        game.takeAction(action)
        if game.currentState.isTerminal():
            print("Stpes used to reach terminal: ", step+1)
            print("Path is %%%%%%% ", path)
            return True
        if stopOnDeadlock and game.currentState.noSolution():
            return False
    return False

# Training budget, scaled with board area and number of boxes.
episodes = row_size*col_size*n_boxes*4*100
maxSteps = row_size*col_size*n_boxes
terminalCount = 0
epsilon = 0.3

# Progress is printed every LOG_INTERVAL episodes; training is abandoned once
# TIMEOUT_SECONDS have elapsed (checked only at those progress points).
LOG_INTERVAL = 1000
TIMEOUT_SECONDS = 3600

# Measure from the start of training in this cell rather than from a timestamp
# set in an earlier cell, so re-running only this cell reports correct times
# and does not time out immediately.
training_start = time.time()

print('Total episodes: ', episodes)
# Q-learning
for i in range(episodes):
    episodeReachedTerminal = createAndRunEpisode(initial_player_location, maxSteps, epsilon)
    if episodeReachedTerminal:
        # Stop at the first episode that solves the puzzle.
        terminalCount += 1
        break
    if i % LOG_INTERVAL == 0:
        print("Total Completed Episodes: ", i)
        print("Qtable size: ", len(Sokoban.qFunction))
        timeElapsed = time.time() - training_start
        print('Time elapsed: ', timeElapsed)
        if timeElapsed > TIMEOUT_SECONDS:
            print('Timeout')
            break

end = time.time()
print('Execution time: ', end - training_start)



Total episodes:  501600
Total Completed Episodes:  0
Qtable size:  69
Time elapsed:  0.08396100997924805
Total Completed Episodes:  1000
Qtable size:  44058
Time elapsed:  56.99879503250122
Total Completed Episodes:  2000
Qtable size:  86576
Time elapsed:  111.3319091796875
Total Completed Episodes:  3000
Qtable size:  132130
Time elapsed:  165.57527422904968
Total Completed Episodes:  4000
Qtable size:  173044
Time elapsed:  218.2517020702362
Total Completed Episodes:  5000
Qtable size:  209196
Time elapsed:  273.4903242588043


KeyboardInterrupt: 