## GridWorld con MDPs

In [1]:
# Library imports

import random
import sys
import mdp
import environment
import util
import optparse

# Gridworld class, modelled as a Markov decision process.
class Gridworld(mdp.MarkovDecisionProcess):
    """Markov decision process defined over a grid layout.

    Attributes set by the constructor:
        grid: the layout; a list of rows is converted with ``makeBoard``,
            any other object is stored as given.
        livingReward: initialised to 0.0.
        noise: initialised to 0.2.
    """

    def __init__(self, grid):
        """Store the layout and the default living reward and noise.

        Args:
            grid: either a list of rows (converted with ``makeBoard``) or an
                already-built board object.
        """
        # isinstance also accepts list subclasses, unlike an exact type
        # comparison against type([]).
        if isinstance(grid, list):
            grid = makeBoard(grid)
        self.grid = grid
        self.livingReward = 0.0
        self.noise = 0.2

# Environment: tracks the agent's cell on the board and hands out rewards.
class Environment:
    """Grid environment holding the agent position and a per-cell reward table.

    States are ``(row, column)`` tuples; row 0 is the first row of the layout
    and ``(0, 0)`` is the start state.

    Attributes:
        grid: the board object built from (or passed as) the layout.
        rewards: row-major table; numeric layout cells keep their value and
            every other cell (for example ' ', 'S' or '#') counts as 0.0.
        dimensions: ``(number_of_rows, number_of_columns)`` of ``rewards``.
        current_state: the agent's current ``(row, column)``.

    Obstacle cells are not treated specially: nothing here prevents the agent
    from stepping onto a '#' cell.
    """

    # (row offset, column offset) per accepted action name. Both vocabularies
    # are accepted because get_possible_actions reports 'up'/'down' while
    # do_action historically only recognised 'north'/'south'.
    _MOVES = {
        'up': (-1, 0), 'north': (-1, 0),
        'down': (1, 0), 'south': (1, 0),
        'left': (0, -1), 'west': (0, -1),
        'right': (0, 1), 'east': (0, 1),
    }

    def __init__(self, grid):
        """Build the environment from a layout.

        Args:
            grid: a list of rows (converted with ``makeBoard``) or an
                already-built board object, which is stored as given.
        """
        rows = grid if isinstance(grid, list) else None
        if rows is not None:
            grid = makeBoard(rows)
        self.grid = grid
        if rows is None:
            rows = self._rows_from_board(grid)
        self.rewards = [
            [cell if isinstance(cell, (int, float)) else 0.0 for cell in row]
            for row in rows
        ]
        self.dimensions = (len(rows), len(rows[0]) if rows else 0)
        self.current_state = (0, 0)

    @staticmethod
    def _rows_from_board(board):
        """Return the board's cells as a list of rows, top row first.

        Assumes a board exposing ``width``, ``height`` and ``data[x][y]`` with
        y counted from the bottom row (the indexing makeBoard writes with).
        Objects without those attributes yield an empty table, so that
        construction itself does not fail for them.
        """
        data = getattr(board, 'data', None)
        width = getattr(board, 'width', None)
        height = getattr(board, 'height', None)
        if data is None or width is None or height is None:
            return []
        return [[data[x][height - 1 - row] for x in range(width)]
                for row in range(height)]

    # Return the state (cell) the agent currently occupies.
    def get_current_state(self):
        """Return the agent's current ``(row, column)`` state."""
        return self.current_state

    # Return the actions available in a state, judged from its (i, j) tuple.
    def get_possible_actions(self, state):
        """Return the moves that keep ``state`` inside the grid.

        Args:
            state: a ``(row, column)`` pair.

        Returns:
            A tuple containing, in this order, whichever of 'up', 'down',
            'left' and 'right' do not leave the grid.
        """
        n_rows, n_cols = self.dimensions
        actions = ()
        if state[0] > 0:
            actions += ('up', )
        if state[0] < n_rows - 1:
            actions += ('down', )
        if state[1] > 0:
            actions += ('left', )
        if state[1] < n_cols - 1:
            actions += ('right', )
        return actions

    # Execute an action; return the reward obtained and the agent's new state.
    def do_action(self, action):
        """Move the agent and return ``(reward, new_state)``.

        Args:
            action: one of 'up'/'north', 'down'/'south', 'left'/'west',
                'right'/'east'. An unknown name, or a move that would leave
                the grid, keeps the agent where it is.

        Returns:
            ``(reward, state)``: the reward of the cell the agent ends up on,
            followed by that cell's ``(row, column)``.
        """
        d_row, d_col = self._MOVES.get(action, (0, 0))
        n_rows, n_cols = self.dimensions
        # States are immutable tuples, so a new tuple is built rather than
        # modifying the current one in place.
        new_row = self.current_state[0] + d_row
        new_col = self.current_state[1] + d_col
        if 0 <= new_row < n_rows and 0 <= new_col < n_cols:
            self.current_state = (new_row, new_col)
        reward = self.rewards[self.current_state[0]][self.current_state[1]]
        return (reward, self.current_state)

    # Put the agent back on the initial state.
    def reset(self):
        """Move the agent back to the start state ``(0, 0)``."""
        self.current_state = (0, 0)

    # Decide whether the game has ended.
    def is_terminal(self):
        """Return True when the current cell's reward equals 1."""
        state = self.get_current_state()
        reward = self.rewards[state[0]][state[1]]
        return reward == 1

# Board class: rectangular container for the cells of the grid.
class Board:
    """Two-dimensional board whose cells are addressed as ``data[x][y]``.

    Attributes:
        width: number of columns.
        height: number of rows.
        data: list of ``width`` columns, each a list of ``height`` cells.
        terminalState: the marker string 'TERMINAL_STATE'.
    """

    # Set up the board dimensions and fill every cell with the initial value.
    def __init__(self, width, height, initialValue=' '):
        """Create a ``width`` x ``height`` board filled with ``initialValue``."""
        self.width = width
        self.height = height
        self.data = [[initialValue for y in range(height)] for x in range(width)]
        self.terminalState = 'TERMINAL_STATE'

    def __getitem__(self, x):
        """Return column ``x``, so cells can be read and written as board[x][y]."""
        return self.data[x]


# Build a board from a layout given as a list of rows.
def makeBoard(boardString):
    """Convert a list of rows into a ``Board``.

    The first row of ``boardString`` is stored at ``y = height - 1``, i.e.
    the y axis is flipped relative to the order of the rows.

    Args:
        boardString: list of equally long rows; each row is a sequence of
            cell values. An empty list yields an empty 0 x 0 board.

    Returns:
        A ``Board`` whose ``data[x][y]`` holds the corresponding cell.
    """
    if not boardString:
        return Board(0, 0)
    width, height = len(boardString[0]), len(boardString)
    # A real Board is required here: a plain (width, height) tuple cannot be
    # indexed and assigned cell by cell.
    grid = Board(width, height)
    for ybar, line in enumerate(boardString):
        y = height - ybar - 1
        for x, el in enumerate(line):
            grid.data[x][y] = el
    return grid

# Board layout legend: 'S' = starting point, +1 = positive reward,
# -1 = negative reward, ' ' = plain cell (default 0.0), '#' = obstacle.
def board():
    """Return an Environment built from the fixed 10 x 10 layout below."""
    E, W = ' ', '#'  # shorthands for a plain cell and an obstacle
    layout = [
        ['S', E, E, E, E, E, E, E, E, E],
        [E, E, E, E, E, E, E, E, E, E],
        [E, W, W, W, W, E, W, W, W, E],
        [E, E, E, E, W, E, E, E, E, E],
        [E, E, E, E, W, -1, E, E, E, E],
        [E, E, E, E, W, +1, E, E, E, E],
        [E, E, E, E, W, E, E, E, E, E],
        [E, E, E, E, W, -1, -1, E, E, E],
        [E, E, E, E, E, E, E, E, E, E],
        [E, E, E, E, E, E, E, E, E, E],
    ]
    return Environment(layout)

# Action chosen manually by the user through the keyboard.
def getUserAction(state, actionFunction):
    """Wait for an arrow key and return the matching action.

    Args:
        state: the state for which an action is requested.
        actionFunction: callable returning the actions allowed in ``state``.

    Returns:
        The action bound to the pressed arrow key when ``actionFunction``
        allows it, otherwise the first allowed action. Pressing 'q' exits
        the program.
    """
    import graphicsUtils

    # Checked in this order; when several arrow keys are reported at once
    # the last matching entry wins.
    key_bindings = (
        ('Up', 'north'),
        ('Down', 'south'),
        ('Left', 'west'),
        ('Right', 'east'),
    )

    chosen = None
    while chosen is None:
        pressed = graphicsUtils.wait_for_keys()
        for key_name, direction in key_bindings:
            if key_name in pressed:
                chosen = direction
        if 'q' in pressed:
            sys.exit(0)

    allowed = actionFunction(state)
    return chosen if chosen in allowed else allowed[0]

def printString(x):
    """Print ``x`` to standard output."""
    print(x)

    #####################################################################################
    ##                                                                                 ## 
    ## Licensing Information:  You are free to use or extend these projects for        ##
    ## educational purposes provided that (1) you do not distribute or publish         ##
    ## solutions, (2) you retain this notice, and (3) you provide clear                ##
    ## attribution to UC Berkeley, including a link to http://ai.berkeley.edu.         ##
    ##                                                                                 ##
    ## Attribution Information: The Pacman AI projects were developed at UC Berkeley.  ##
    ## The core projects and autograders were primarily created by John DeNero         ##
    ## (denero@cs.berkeley.edu) and Dan Klein (klein@cs.berkeley.edu).                 ##
    ## Student side autograding was added by Brad Miller, Nick Hay, and                ##
    ## Pieter Abbeel (pabbeel@cs.berkeley.edu).                                        ##
    ##                                                                                 ##
    #####################################################################################

# Run a single episode of interaction between the agent and the environment.
def runEpisode(agent, environment, discount, decision, display, message, pause, episode):
    """Run one episode and return its discounted return.

    Args:
        agent: object whose optional ``startEpisode`` and
            ``observeTransition`` hooks are called when present.
        environment: object providing ``reset``, ``getCurrentState``,
            ``getPossibleActions(state)`` and ``doAction(action)``; the
            latter must return ``(nextState, reward)``.
        discount: factor applied to the reward weight after every step.
        decision: callable mapping a state to the action to take.
        display: callable invoked with the current state on every step.
        message: callable receiving the progress text.
        pause: zero-argument callable invoked after each display.
        episode: episode number, used only in the messages.

    Returns:
        The sum of rewards, each weighted by ``discount`` raised to the
        number of steps taken before it.

    Raises:
        Exception: if ``decision`` returns None.
    """
    returns = 0
    totalDiscount = 1.0
    environment.reset()
    if hasattr(agent, 'startEpisode'):
        agent.startEpisode()
    message("BEGINNING EPISODE: "+str(episode)+"\n")
    while True:

        # DISPLAY CURRENT STATE
        state = environment.getCurrentState()
        display(state)
        pause()

        # END IF IN A TERMINAL STATE (a state offering no actions)
        actions = environment.getPossibleActions(state)
        if len(actions) == 0:
            message("EPISODE "+str(episode)+" COMPLETE: RETURN WAS "+str(returns)+"\n")
            return returns

        # GET ACTION (USUALLY FROM AGENT)
        action = decision(state)
        if action is None:
            # Raising a bare string is itself a TypeError in Python 3, which
            # would hide this message; raise a real exception instead.
            raise Exception('Error: Agent returned None action')

        # EXECUTE ACTION
        nextState, reward = environment.doAction(action)
        message("Started in state: "+str(state)+
                "\nTook action: "+str(action)+
                "\nEnded in state: "+str(nextState)+
                "\nGot reward: "+str(reward)+"\n")
        # UPDATE LEARNER
        if hasattr(agent, 'observeTransition'):
            agent.observeTransition(state, action, nextState, reward)

        returns += reward * totalDiscount
        totalDiscount *= discount

    # NOTE(review): unreachable - the loop above only exits through `return`
    # or an exception, so stopEpisode is never invoked. Kept as written
    # because enabling it could change how agents behave across episodes.
    if hasattr(agent, 'stopEpisode'):
        agent.stopEpisode()

def parseOptions():
    """Parse the command line and return the resulting options object.

    After parsing, three adjustments are applied in this order:
    manual mode with any agent other than 'q' clears ``agent`` to None;
    text or quiet mode switches ``pause`` off; manual mode switches
    ``pause`` on.
    """
    # (flags, keyword arguments) for every option, in registration order.
    option_table = (
        (('-d', '--discount'),
         dict(action='store', type='float', dest='discount', default=0.9,
              help='Discount on future (default %default)')),
        (('-r', '--livingReward'),
         dict(action='store', type='float', dest='livingReward', default=0.0,
              metavar="R",
              help='Reward for living for a time step (default %default)')),
        (('-n', '--noise'),
         dict(action='store', type='float', dest='noise', default=0.2,
              metavar="P",
              help='How often action results in unintended direction (default %default)')),
        (('-e', '--epsilon'),
         dict(action='store', type='float', dest='epsilon', default=0.3,
              metavar="E",
              help='Chance of taking a random action in q-learning (default %default)')),
        (('-l', '--learningRate'),
         dict(action='store', type='float', dest='learningRate', default=0.5,
              metavar="P",
              help='TD learning rate (default %default)')),
        (('-i', '--iterations'),
         dict(action='store', type='int', dest='iters', default=10,
              metavar="K",
              help='Number of rounds of value iteration (default %default)')),
        (('-k', '--episodes'),
         dict(action='store', type='int', dest='episodes', default=1,
              metavar="K",
              help='Number of epsiodes of the MDP to run (default %default)')),
        (('-g', '--grid'),
         dict(action='store', metavar="G", type='string', dest='grid',
              default="AssignmentGrid",
              help='Grid to use (case sensitive; options are AssignmentGrid BookGrid, BridgeGrid, CliffGrid, MazeGrid, default %default)')),
        (('-w', '--windowSize'),
         dict(metavar="X", type='int', dest='gridSize', default=70,
              help='Request a window width of X pixels *per grid cell* (default %default)')),
        (('-a', '--agent'),
         dict(action='store', metavar="A", type='string', dest='agent',
              default="random",
              help='Agent type (options are \'random\', \'value\' and \'q\', default %default)')),
        (('-t', '--text'),
         dict(action='store_true', dest='textDisplay', default=False,
              help='Use text-only ASCII display')),
        (('-p', '--pause'),
         dict(action='store_true', dest='pause', default=False,
              help='Pause GUI after each time step when running the MDP')),
        (('-q', '--quiet'),
         dict(action='store_true', dest='quiet', default=False,
              help='Skip display of any learning episodes')),
        (('-s', '--speed'),
         dict(action='store', metavar="S", type=float, dest='speed',
              default=1.0,
              help='Speed of animation, S > 1.0 is faster, 0.0 < S < 1.0 is slower (default %default)')),
        (('-m', '--manual'),
         dict(action='store_true', dest='manual', default=False,
              help='Manually control agent')),
        (('-v', '--valueSteps'),
         dict(action='store_true', default=False,
              help='Display each step of value iteration')),
    )

    parser = optparse.OptionParser()
    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    options, _positional = parser.parse_args()

    # Manual control replaces every agent except the q-learner.
    if options.manual and options.agent != 'q':
        print('## Disabling Agents in Manual Mode (-m) ##')
        options.agent = None

    # Resolve conflicting flags: text/quiet runs never pause ...
    if options.quiet or options.textDisplay:
        options.pause = False

    # ... unless the user is driving manually, which always pauses.
    if options.manual:
        options.pause = True

    return options


if __name__ == '__main__':
    # Script entry point: build the MDP, the display and the agent selected
    # on the command line, then run the requested number of episodes.

    opts = parseOptions()

    ###########################
    # GET THE GRIDWORLD
    ###########################

    import gridworld
    # The grid factory is looked up by name: "get" + the value of -g.
    mdpFunction = getattr(gridworld, "get"+opts.grid)
    # NOTE: from here on `mdp` names the MDP instance and shadows the `mdp`
    # module imported at the top of the file.
    mdp = mdpFunction()
    mdp.setLivingReward(opts.livingReward)
    mdp.setNoise(opts.noise)
    env = gridworld.GridworldEnvironment(mdp)


    ###########################
    # GET THE DISPLAY ADAPTER
    ###########################

    import textGridworldDisplay
    display = textGridworldDisplay.TextGridworldDisplay(mdp)
    if not opts.textDisplay:
        import graphicsGridworldDisplay
        display = graphicsGridworldDisplay.GraphicsGridworldDisplay(mdp, opts.gridSize, opts.speed)
    try:
        display.start()
    except KeyboardInterrupt:
        sys.exit(0)

    ###########################
    # GET THE AGENT
    ###########################

    import valueIterationAgents, qlearningAgents
    a = None
    if opts.agent == 'value':
        a = valueIterationAgents.ValueIterationAgent(mdp, opts.discount, opts.iters)
    elif opts.agent == 'q':
        # The q-learner only needs the legal-action function plus its
        # hyper-parameters.
        actionFn = lambda state: mdp.getPossibleActions(state)
        qLearnOpts = {'gamma': opts.discount,
                      'alpha': opts.learningRate,
                      'epsilon': opts.epsilon,
                      'actionFn': actionFn}
        a = qlearningAgents.QLearningAgent(**qLearnOpts)
    elif opts.agent == 'random':
        # # No reason to use the random agent without episodes
        if opts.episodes == 0:
            opts.episodes = 10
        class RandomAgent:
            def getAction(self, state):
                return random.choice(mdp.getPossibleActions(state))
            def getValue(self, state):
                return 0.0
            def getQValue(self, state, action):
                return 0.0
            def getPolicy(self, state):
                "NOTE: 'random' is a special policy value; don't use it in your code."
                return 'random'
            def update(self, state, action, nextState, reward):
                pass
        a = RandomAgent()
    elif opts.agent == 'asynchvalue':
        a = valueIterationAgents.AsynchronousValueIterationAgent(mdp, opts.discount, opts.iters)
    elif opts.agent == 'priosweepvalue':
        a = valueIterationAgents.PrioritizedSweepingValueIterationAgent(mdp, opts.discount, opts.iters)
    else:
        # In manual mode parseOptions may have cleared the agent on purpose.
        if not opts.manual: raise Exception('Unknown agent type: '+opts.agent)


    ###########################
    # RUN EPISODES
    ###########################
    # DISPLAY Q/V VALUES BEFORE SIMULATION OF EPISODES
    try:
        if not opts.manual and opts.agent in ('value', 'asynchvalue', 'priosweepvalue'):
            if opts.valueSteps:
                for i in range(opts.iters):
                    tempAgent = valueIterationAgents.ValueIterationAgent(mdp, opts.discount, i)
                    display.displayValues(tempAgent, message = "VALUES AFTER "+str(i)+" ITERATIONS")
                    display.pause()

            display.displayValues(a, message = "VALUES AFTER "+str(opts.iters)+" ITERATIONS")
            display.pause()
            display.displayQValues(a, message = "Q-VALUES AFTER "+str(opts.iters)+" ITERATIONS")
            display.pause()
    except KeyboardInterrupt:
        sys.exit(0)



    # FIGURE OUT WHAT TO DISPLAY EACH TIME STEP (IF ANYTHING)
    displayCallback = lambda x: None
    if not opts.quiet:
        if opts.manual and opts.agent is None:
            displayCallback = lambda state: display.displayNullValues(state)
        else:
            if opts.agent in ('random', 'value', 'asynchvalue', 'priosweepvalue'):
                displayCallback = lambda state: display.displayValues(a, state, "CURRENT VALUES")
            if opts.agent == 'q': displayCallback = lambda state: display.displayQValues(a, state, "CURRENT Q-VALUES")

    messageCallback = lambda x: printString(x)
    if opts.quiet:
        messageCallback = lambda x: None

    # FIGURE OUT WHETHER TO WAIT FOR A KEY PRESS AFTER EACH TIME STEP
    pauseCallback = lambda : None
    if opts.pause:
        pauseCallback = lambda : display.pause()

    # FIGURE OUT WHETHER THE USER WANTS MANUAL CONTROL (FOR DEBUGGING AND DEMOS)
    if opts.manual:
        decisionCallback = lambda state : getUserAction(state, mdp.getPossibleActions)
    else:
        decisionCallback = a.getAction

    # RUN EPISODES
    if opts.episodes > 0:
        print()
        print("RUNNING", opts.episodes, "EPISODES")
        print()
    returns = 0
    for episode in range(1, opts.episodes+1):
        returns += runEpisode(a, env, opts.discount, decisionCallback, displayCallback, messageCallback, pauseCallback, episode)
    if opts.episodes > 0:
        print()
        print("AVERAGE RETURNS FROM START STATE: "+str((returns+0.0) / opts.episodes))
        print()
        print()

    # DISPLAY POST-LEARNING VALUES / Q-VALUES
    if opts.agent == 'q' and not opts.manual:
        try:
            display.displayQValues(a, message = "Q-VALUES AFTER "+str(opts.episodes)+" EPISODES")
            display.pause()
            display.displayValues(a, message = "VALUES AFTER "+str(opts.episodes)+" EPISODES")
            display.pause()
        except KeyboardInterrupt:
            sys.exit(0)

ModuleNotFoundError: No module named 'mdp'