In [2]:
import itertools
import os
import random
import unittest
from typing import Callable, Iterable, Iterator, TypeVar, Tuple
from typing import cast, Iterable, Iterator, Optional, Tuple

import numpy as np

# Must run before the rl imports below (presumably so that the `rl` package
# in the parent directory is importable -- confirm against the repo layout).
os.chdir("..")

from rl.function_approx import *
from rl.function_approx import FunctionApprox
from rl.function_approx import Tabular
from rl.distribution import Categorical, Choose
import rl.iterate as iterate
from rl.markov_process import FiniteMarkovRewardProcess
import rl.markov_process as mp
from rl.markov_decision_process import FiniteMarkovDecisionProcess
import rl.markov_decision_process as mdp
import rl.td as td

S = TypeVar('S')

A = TypeVar('A')


In [3]:
class FlipFlop(FiniteMarkovRewardProcess[bool]):
    '''Two-state (True / False) reward process built on
    FiniteMarkovRewardProcess.

    From either state the process flips to the opposite state with
    probability p (reward 2.0) and stays where it is with probability
    1 - p (reward 1.0).

    '''

    def __init__(self, p: float):
        def outcomes(state: bool) -> Categorical:
            # Distribution over (next state, reward) pairs when leaving `state`.
            return Categorical({(not state, 2.0): p, (state, 1.0): 1 - p})

        super().__init__({state: outcomes(state) for state in (True, False)})




# Two-state, two-action MDP used as a fixture.  For every state, action True
# stays put with probability 0.7 and action False with probability 0.3;
# staying pays 1.0 and flipping to the other state pays 2.0.
test_mdp = FiniteMarkovDecisionProcess({
    state: {
        action: Categorical({
            (state, 1.0): p_stay,
            (not state, 2.0): p_flip,
        })
        for action, (p_stay, p_flip) in (
            (True, (0.7, 0.3)),
            (False, (0.3, 0.7)),
        )
    }
    for state in (True, False)
})


# LSTD with experience replay

In [None]:
def lstd(
        transitions: 'Iterable[mp.TransitionStep[S]]',
        actions: 'Callable[[S], Iterable[A]]',
        batch_size: int,
        S: np.ndarray,
        gradient: Callable[[np.ndarray], np.ndarray],
        replay_memory: np.ndarray,
        feature_transform: Callable[[np.ndarray], np.ndarray],
        γ: float,
        num_iter: Optional[int] = None
) -> np.ndarray:
    '''Least-squares TD value estimation with experience replay.

    Every incoming transition is stored in a replay memory.  Once the
    memory holds more than batch_size rows, a mini-batch is drawn uniformly
    without replacement (via the global NumPy RNG, so seed np.random for
    reproducible runs) and accumulated into the LSTD statistics

        A += Φ(s)ᵀ (Φ(s) − γ G(s'))
        b += Φ(s)ᵀ r

    where Φ = feature_transform and G = gradient.  The weights solving
    A w = b are returned.

    Arguments:
      transitions -- iterable of steps exposing scalar numeric `state` and
                     `next_state` attributes and a `reward` attribute
      actions -- unused by this prediction algorithm; kept so the
                 signature stays compatible with existing callers
      batch_size -- number of rows sampled per update (must be >= 1)
      S -- array of states; only used to determine the number of features
           as feature_transform(S).shape[1]
      gradient -- maps a 1-D array of n next states to the (n, d) gradient
                  of the value estimate with respect to the weights; for
                  a linear approximation this is the feature matrix of
                  those states
      replay_memory -- initial memory, rows of (state, next_state, reward);
                       may be empty; the caller's array is not modified
      feature_transform -- maps a 1-D array of n states to an (n, d)
                           feature matrix
      γ -- discount rate (0 < γ ≤ 1)
      num_iter -- maximum number of transitions to consume; None (the
                  default) consumes transitions until it is exhausted, so
                  pass a finite iterable or a bound

    Returns:
      Weight vector w of shape (d,).  All zeros if no update was made.

    Raises:
      ValueError -- if batch_size < 1, or if replay_memory cannot be
                    arranged into rows of three values

    '''
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    num_features = np.asarray(feature_transform(np.asarray(S))).shape[1]
    a_matrix = np.zeros((num_features, num_features))
    b_vector = np.zeros(num_features)

    # A Python list gives O(1) appends; np.append would copy the whole
    # memory on every step (and flattens it when no axis is given).
    memory = [
        tuple(row)
        for row in np.asarray(replay_memory, dtype=float).reshape(-1, 3)
    ]

    # islice works on any iterable (not only iterators) and stops cleanly
    # when the transitions run out.
    for step in itertools.islice(transitions, num_iter):
        memory.append((step.state, step.next_state, step.reward))

        if len(memory) > batch_size:
            picked = np.random.choice(
                len(memory),
                size=batch_size,
                replace=False
            )
            batch = np.array([memory[i] for i in picked], dtype=float)
            phi = np.asarray(feature_transform(batch[:, 0]), dtype=float)
            phi_next = np.asarray(gradient(batch[:, 1]), dtype=float)
            a_matrix += phi.T @ (phi - γ * phi_next)
            b_vector += phi.T @ batch[:, 2]

    # The pseudo-inverse equals the inverse when A is invertible and still
    # returns a least-squares answer when some feature was never sampled.
    return np.linalg.pinv(a_matrix) @ b_vector


# Test LSTD on FlipFlop transitions (Q-table keys and actions come from test_mdp)

In [5]:
# Tabular Q estimate with one zero entry per (state, action) pair of test_mdp.
q_0: Tabular[Tuple[bool, bool]] = Tabular(
    dict.fromkeys(
        ((s, a) for s in test_mdp.states() for a in test_mdp.actions(s)),
        0.0
    ),
    count_to_weight_func=lambda _: 0.1
)

# Reward process that supplies the experience, plus a zero tabular estimate
# keyed by its states.
mrp = FlipFlop(0.7)
start = Tabular(
    dict.fromkeys(mrp.states(), 0.0),
    count_to_weight_func=lambda _: 0.1
)

# Traces start from a state chosen by Choose({True, False}); each trace is
# cut to episode_length steps and the pieces are chained into one stream.
episode_length = 20
episodes: Iterable[Iterable[mp.TransitionStep[bool]]] = mrp.reward_traces(
    Choose({True, False})
)

transitions: Iterable[mp.TransitionStep[bool]] = itertools.chain.from_iterable(
    map(lambda episode: itertools.islice(episode, episode_length), episodes)
)

In [20]:
# lstd's declared parameters are (transitions, actions, batch_size, S,
# gradient, replay_memory, feature_transform, γ) and its docstring says it
# returns the final weights, so every argument is supplied by name and the
# result is displayed directly rather than advanced with next().
NUM_TRANSITIONS = 1000  # `transitions` is an unbounded stream; cap it
BATCH_SIZE = 32


def one_hot_state(states: np.ndarray) -> np.ndarray:
    '''Map an array of n FlipFlop states (False/True stored as 0/1) to an
    (n, 2) one-hot feature matrix.'''
    return np.eye(2)[np.asarray(states, dtype=int)]


qs = lstd(
    itertools.islice(transitions, NUM_TRANSITIONS),
    test_mdp.actions,
    batch_size=BATCH_SIZE,
    S=np.array([0.0, 1.0]),
    # For a linear value estimate the gradient with respect to the weights
    # is the feature matrix itself.
    gradient=one_hot_state,
    replay_memory=np.empty((0, 3)),
    feature_transform=one_hot_state,
    γ=0.99
)
qs

# LSPI

In [None]:
def lspi(
        transitions: Iterable[mrp.TransitionStep[S,A,R,S]],
        actions: Callable[[S], Iterable[A]],
        batch_size: int,
        S: np.ndarray,
        gradient: Callable,
        replay_memory:np.ndarray,
        feature_transform: Callable[[np.ndarray],np.ndarray],
        γ: float
) -> Iterator[FunctionApprox[Tuple[S, A]]]:
    '''Draft of a least-squares update over a replay memory, with an extra
    "eligibility" factor applied to the accumulated A and b terms.

    NOTE(review): this draft cannot run as written -- the statements after
    this docstring are indented one level deeper than the docstring, and
    several names the body reads are never defined in it (see the inline
    notes).  The comments below describe what the code literally does; the
    intended algorithm should be confirmed with the author.

    Arguments:
      transitions -- source of samples; the body reads `.state`,
                     `.next_state` and `.reward` from each one (no action
                     attribute is read)
      S -- state space; only used in the `W@S` product of the eligibility
           term
      gradient -- callable applied to `W0` and to each sampled batch
      batch_size -- number of replay rows drawn per update
      actions -- not used anywhere in the body
      replay_memory -- array that new samples are appended to
      feature_transform -- callable applied to `S0` and to each sampled
                           batch
      γ -- discount rate (0 < γ ≤ 1); the body refers to `gamma` instead

    Returns:
      The expression `np.inverse(A)@b`.  NOTE(review): the annotated return
      type is an Iterator of FunctionApprox, which does not match.

    NOTE(review): `R` in the `transitions` annotation is not defined in
    the visible part of the file, and `mrp` is only bound further up the
    notebook text to a FlipFlop instance in a cell with a later position
    than this definition's first use -- confirm the intended module alias.

    '''
        # NOTE(review): `S0` and `gamma` are not parameters or locals of this
        # function (the parameters are `S` and `γ`).
        W0=feature_transform(S0) # features of S0, used as the starting point
        A=W0@(W0-gamma*gradient(W0)) # initial A built from W0
        # NOTE(review): `batch` is first assigned inside the loop below, so it
        # is read here before assignment.
        b=W0+batch[:,-1] # initial b: W0 plus the last column of batch
        # NOTE(review): `W` is first assigned inside the loop, so it is read
        # here before assignment; `eligi` is never read afterwards, while the
        # loop uses an undefined name `E` -- presumably the same quantity.
        eligi=gradient(W0)@(W@S) # labelled by the author as the eligibility trace
        # NOTE(review): `num_iter` is not a parameter or local of this function.
        for _ in range(num_iter):
            # Pull the next sample and append (state, next_state, reward) to
            # the memory.
            # NOTE(review): next() needs an iterator, while the annotation
            # only promises an Iterable; np.append without `axis` returns a
            # flattened 1-D array, which the 2-D indexing below cannot use.
            next_sample=next(transitions)
            replay_memory=np.append(replay_memory,[next_sample.state,next_sample.next_state,next_sample.reward])
            
            number_of_rows = replay_memory.shape[0]
            # NOTE(review): `number_of_rows` is an int taken from `.shape[0]`,
            # so calling len() on it raises TypeError.
            if len(number_of_rows)>batch_size:
                # Draw batch_size distinct row indices and slice them out.
                random_indices = np.random.choice(number_of_rows, size=batch_size, replace=False)
                batch = replay_memory[random_indices, :]
                # NOTE(review): NumPy's matrix inverse is np.linalg.inv;
                # `np.inverse` does not appear to exist.  `W` is recomputed
                # here but not read again inside the loop.
                W=np.inverse(A)@b
                # Accumulate A and b from the sampled batch; both callables
                # receive the whole batch, and the last batch column (the
                # reward) is added to b.
                A=A+E@feature_transform(batch)@(feature_transform(batch)-gamma*gradient(batch))
                b=b+feature_transform(batch)+E@batch[:,-1]
        
    return np.inverse(A)@b
