In [13]:
%load_ext autoreload
%autoreload 2
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import clear_output

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## 1. Q-learning in the wild (3 pts)

Here we use the qlearning agent on taxi env from openai gym.
You will need to insert a few agent functions here.

In [14]:
import random,math
import numpy as np
from collections import defaultdict

class QLearningAgent():
  """
    Q-Learning Agent

    Instance variables you have access to
      - self.epsilon (exploration prob)
      - self.alpha (learning rate)
      - self.discount (discount rate aka gamma)

    Functions you should use
      - self.getLegalActions(state)
        which returns legal actions for a state
      - self.getQValue(state,action)
        which returns Q(state,action)
      - self.setQValue(state,action,value)
        which sets Q(state,action) := value

    !!!Important!!!
    NOTE: please avoid using self._qValues directly to make code cleaner
  """
  def __init__(self,alpha,epsilon,discount,getLegalActions):
    "We initialize agent and Q-values here."
    self.getLegalActions= getLegalActions
    self._qValues = defaultdict(lambda:defaultdict(lambda:0))
    self.alpha = alpha
    self.epsilon = epsilon
    self.discount = discount

  def getQValue(self, state, action):
    #print(state)
    #print(action)
    if not (state in self._qValues) or not (action in self._qValues[state]):
        return 0.0
    return self._qValues[state][action]

  def setQValue(self,state,action,value):
    """
      Sets the Qvalue for [state,action] to the given value
    """
    self._qValues[state][action] = value

#---------------------#start of your code#---------------------#

  def getValue(self, state):
    """
      Returns max_action Q(state,action)
      where the max is over legal actions.
    """

    possibleActions = self.getLegalActions(state)
    #If there are no legal actions, return 0.0
    if len(possibleActions) == 0:
            return 0.0

    max_q_value = max([self.getQValue(state, action) for action in possibleActions])
    return max_q_value

  def getPolicy(self, state):
    """
      Compute the best action to take in a state.

    """
    possibleActions = self.getLegalActions(state)

    #If there are no legal actions, return None
    if len(possibleActions) == 0:
            return None
    best_q_value = self.getValue(state)
    best_actions = [action for action in possibleActions if abs(self.getQValue(state, action) - best_q_value) < 1e-10]

    return random.choice(best_actions)

  def getAction(self, state):
    """
      Compute the action to take in the current state, including exploration.

      With probability self.epsilon, we should take a random action.
      otherwise - the best policy action (self.getPolicy).

      HINT: You might want to use util.flipCoin(prob)
      HINT: To pick randomly from a list, use random.choice(list)

    """

    # Pick Action
    possibleActions = self.getLegalActions(state)
    #If there are no legal actions, return None
    if len(possibleActions) == 0:
            return None
    #choose action with epsilon exploration strategy:
    if random.random() < self.epsilon:
            action = random.choice(possibleActions)
    else:
            action = self.getPolicy(state)

    return action

  def update(self, state, action, nextState, reward):
    """
      You should do your Q-Value update here

      NOTE: You should never call this function,
      it will be called on your behalf


    """
    #agent parameters
    gamma = self.discount
    learning_rate = self.alpha

    current_q = self.getQValue(state, action)
    next_max_q = self.getValue(nextState)

    new_q = current_q + learning_rate * (reward + gamma * next_max_q - current_q)
    "*** YOUR CODE HERE ***"
    self.setQValue(state, action, new_q)

In [15]:
import gym
env = gym.make("Taxi-v3")
n_actions = env.action_space.n

In [16]:
def play_and_train(env,agent,t_max=10**4):
    """This function should
    - run a full game, actions given by agent.getAction(s)
    - train agent using agent.update(...) whenever possible
    - return total reward"""
    total_reward = 0.0
    s = env.reset()

    for t in range(t_max):
        a = agent.getAction(s)

        next_s,r,done,_ = env.step(a)

        #<train(update) agent for state s>
        agent.update(s, a, next_s, r)

        s = next_s
        total_reward +=r
        if done:break

    return total_reward

In [17]:
agent = QLearningAgent(alpha=0.15,epsilon=0.15,discount=0.96,
                       getLegalActions = lambda s: range(n_actions))

Достигните положительной награды, постройте график

In [18]:
from IPython.display import clear_output


In [19]:
from IPython.display import clear_output

initial_epsilon = 0.53
final_epsilon = 0.011
decay_rate = 0.996

rewards = []
epsilon_history = []
for i in range(1000):
    rewards.append(play_and_train(env,agent))
    agent.epsilon = max(final_epsilon, initial_epsilon * (decay_rate ** i))
    epsilon_history.append(agent.epsilon)
    if i % 100 ==99:
        clear_output(True)
        print(agent.epsilon)
        plt.plot(rewards)
        plt.show()
print(rewards[-1])

AttributeError: module 'numpy' has no attribute 'bool8'

## 3. Continuous state space (2 pt)

Чтобы использовать табличный q-learning на continuous состояниях, надо как-то их обрабатывать и бинаризовать. Придумайте способ разбивки на дискретные состояния.

In [None]:
env = gym.make("CartPole-v0")
n_actions = env.action_space.n
print("first state:%s"%(env.reset()))


### Play a few games

Постройте распределения различных частей состояния игры. Сыграйте несколько игр и запишите все состояния.

## Binarize environment

In [None]:
from gym.core import ObservationWrapper
class Binarizer(ObservationWrapper):

    def to_bin(self, value, bins):

        return

    def _observation(self,state):

        state = (self.to_bin(state[0], ), self.to_bin(state[1], ), self.to_bin(state[2], ), self.to_bin(state[3], ))

        return state

In [None]:
env = Binarizer(gym.make("CartPole-v0"))

## Learn

In [None]:
agent = QLearningAgent(alpha=,epsilon=,discount=,
                       getLegalActions = lambda s: range(n_actions))

In [None]:
rewards = []
rewBuf = []
ma = -1000000000000
for i in range(10000):
    for i in range(100):
        rewards.append(play_and_train(env,agent))
    agent.epsilon *= #
    rewBuf.append(np.mean(rewards[-100:]))
    clear_output(True)
    print(agent.epsilon)
    print(rewBuf[-1])
    plt.plot(rewBuf)
    if(rewBuf[-1] > 195):
        print("Win!")
        break
    plt.show()


## 4. Experience replay (5 pts)

In [None]:
import random
class ReplayBuffer(object):
    def __init__(self, size):
        """Create Replay buffer.
        Parameters
        ----------
        size: int
            Max number of transitions to store in the buffer. When the buffer
            overflows the old memories are dropped.
        """
        self._storage = []
        self._maxsize = size
        self._replaceId = 0


    def __len__(self):
        return len(self._storage)

    def add(self, obs_t, action, reward, obs_tp1, done):
        '''
        Make sure, _storage will not exceed _maxsize.
        '''
        data = (obs_t, action, reward, obs_tp1, done)
        if len(self._storage) == self._maxsize:
            #
        else:
            #

    def sample(self, batch_size):
        """Sample a batch of experiences.
        Parameters
        ----------
        batch_size: int
            How many transitions to sample.
        Returns
        -------
        obs_batch: np.array
            batch of observations
        act_batch: np.array
            batch of actions executed given obs_batch
        rew_batch: np.array
            rewards received as results of executing act_batch
        next_obs_batch: np.array
            next set of observations seen after executing act_batch
        done_mask: np.array
            done_mask[i] = 1 if executing act_batch[i] resulted in
            the end of an episode and 0 otherwise.
        """

        #

        return states, actions, rewards, next_states, is_done


Some tests to make sure your buffer works right

In [None]:
import numpy as np
replay = ReplayBuffer(2)
obj1 = tuple(range(5))
obj2 = tuple(range(5, 10))
replay.add(*obj1)
assert replay.sample(1)==obj1, "If there's just one object in buffer, it must be retrieved by buf.sample(1)"
replay.add(*obj2)
assert len(replay._storage)==2, "Please make sure __len__ methods works as intended."
replay.add(*obj2)
assert len(replay._storage)==2, "When buffer is at max capacity, replace objects instead of adding new ones."
assert tuple(np.unique(a) for a in replay.sample(100))==obj2
replay.add(*obj1)
assert max(len(np.unique(a)) for a in replay.sample(100))==2
replay.add(*obj1)
assert tuple(np.unique(a) for a in replay.sample(100))==obj1
print ("Success!")

Now let's use this buffer to improve training:

In [None]:
import gym
env = Binarizer(gym.make('CartPole-v0'))
n_actions = env.action_space.n

In [None]:
agent = QLearningAgent(alpha=,epsilon=,discount=,
                       getLegalActions = lambda s: range(n_actions))
replay = ReplayBuffer(10000)

In [None]:
def play_and_train(env, agent, t_max=10**4, batch_size=10):
    """This function should
    - run a full game, actions given by agent.getAction(s)
    - train agent using agent.update(...) whenever possible
    - return total reward"""
    total_reward = 0.0
    s = env.reset()

    for t in range(t_max):
        aсtion = agent.getAction(s)
        next_s, r, done,_ = env.step(aсtion)

        #заполните реплей
        #опционально - моежте также как в варианте без реплея обучаться по состояниям которые

        s = next_s
        total_reward +=r
        if done:break

    #learn from replay

    return total_reward




Train with experience replay

In [None]:
rewards = []
rewBuf = []
ma = -1000000000000
for i in range(10000):
    for i in range(100):
        rewards.append(play_and_train(env,agent, batch_size=1000))
    agent.epsilon *= #
    rewBuf.append(np.mean(rewards[-100:]))
    clear_output(True)
    print(agent.epsilon)
    print(rewBuf[-1])
    plt.plot(rewBuf)
    if(rewBuf[-1] > 195):
        print("Win!")
        break
    plt.show()
