# Assignment 14 : Batch RL

## 1) LSTD algorithm

In [4]:
from typing import *
from rl.markov_process import TransitionStep

A = TypeVar('A')
S = TypeVar('S')
X = TypeVar('X')

def LSTD(feature_functions : Sequence[Callable[[X], float]],
        trace : Iterable[TransitionStep[S]],
        gamma : float) -> np.ndarray:
    """
            LEAST SQUARES TEMPORAL DIFFERENCE
        params:
            -feature_functions : functions mapping the state to different features
            -trace : simulation trace (sequence of steps)
            -gamma : discount factor
        returns:
            weight vector associated with the linear regression mapping feature functions values to predicted V.
    """
    m = len(feature_functions)
    A = np.zeros((m,m))
    b = np.zeros((m,1))
    for i,step in enumerate(trace):
        phi = np.array([f(step.state) for f in feature_functions ]).reshape(-1,1)
        phi_next = np.array([f(step.next_state) for f in feature_functions ]).reshape(-1,1)
        A += np.outer(phi, phi - gamma * phi_next)
        b += phi * step.reward
    return np.linalg.inv(A).dot(b).flatten()



## 2) LSPI : Least Squares Policy Iteration

In [22]:
from rl.markov_decision_process import TransitionStep
Batch = Iterable[TransitionStep[S,A]] # a changer pour inclure action ?

def LSPI(feature_functions : Sequence[Callable[[X], float]],
        batches : Iterable[Batch],
        gamma : float,
        action_space,
        cold_start : Optional[np.ndarray] = None) -> np.ndarray:
    """
            LEAST SQUARES POLICY ITERATION
        params:
            -feature_functions : functions mapping the state to different features
            -batches : batches of experiences
            -gamma : discount factor
            -cold_start : initial weights
        returns:
            weights for the linear mapping from (s,a) pairs to approximate Q-values using feature_functions.
    """

    def linear_Q_approx(W:np.ndarray) -> Callable[[X], float]:
        return lambda s_a : sum([w*f(s_a) for w,f in zip(W,feature_functions)])

    def pi_D(Q_function : Callable[[X], float]) -> Callable[[S], float]:
        return lambda s : action_space[np.argmax([Q_function(s,a) for a in action_space])]

    m = len(feature_functions)

    W = np.random.randn(m,) if cold_start is None else cold_start #initial weights
    
    for batch in batches:
        A = np.zeros((m,m))
        b = np.zeros((m,1))
        Q_function = linear_Q_approx(W)
        pi = pi_D(Q_function)
        for i,step in enumerate(trace):
            phi = np.array([f((step.state, step.action)) for f in feature_functions ]).reshape(-1,1)
            phi_next = np.array([f( (step.next_state, pi(step.next_state)) ) for f in feature_functions]).reshape(-1,1)

            A += np.outer(phi, phi - gamma * phi_next)
            b += phi * step.reward

        W = np.linalg.inv(A).dot(b).flatten()

    return W

## 3) LSPI for American Options Pricing 

In [113]:
Price = float
Continue = True
Exercise = False


@dataclass(frozen=True)
class AmericanState:
    '''American option state as defined in the course
    '''
    time: int
    price_history : Sequence[Price]

    def current_price(self) -> Price:
        return self.price_history[-1]


def american_LSPI(feature_functions : Sequence[Callable[[AmericanState], float]],
        PriceTrajectories : Iterable[Sequence[Price]],
        gamma : float,
        K : float,
        T :int,
        cold_start : Optional[np.ndarray] = None) -> np.ndarray:
    """
            LEAST SQUARES POLICY ITERATION
        params:
            -feature_functions : functions mapping the state to different features
            -PriceTrajectories : Sequence of Price lists of the security of length T
            -gamma : discount factor
            -K : strike price of the option
            -T : maturity date of ther option
            -cold_start : initial weights
        returns:
            weights for the linear mapping from states_action pairs to approximate Q-values for 'continue' action using feature_functions.
    """

    def linear_Q_approx(W:np.ndarray) -> Callable[[X], float]:
        return lambda s_a : sum([w*f(s_a[0]) for w,f in zip(W,feature_functions)]) if s_a[1] else g(s)

    def g(s):
        return max(s.current_price()-K,0)
        
    def pi_D(Q_function : Callable[[X], float]) -> Callable[[S], float]:
        return lambda s : Continue if Q_function(s) > g(s) and s.time < T else Exercise

    m = len(feature_functions)

    W = np.random.randn(m,) if cold_start is None else cold_start #initial weights
    
    for price_traj in PriceTrajectories:
        if len(price_traj) != T+1:
            continue
        state = AmericanState(0,[])
        next_state = AmericanState(1,[price_traj[0]])

        A = np.zeros((m,m))
        b = np.zeros((m,1))
        Q_function = linear_Q_approx(W)
        pi = pi_D(Q_function)
        for i,price in enumerate(price_traj[:T]):
            state = next_state
            next_state = AmericanState(next_state.time + 1,next_state.price_history + [price])

            phi = np.array([f(state, Continue) for f in feature_functions]).reshape(-1,1)
            phi_next = np.array([f( (next_state, pi(next_state)) ) for f in feature_functions]).reshape(-1,1)

            A += np.outer(phi, phi - (Q_function((next_state,Continue)) >= g(next_state)) *gamma * phi_next)
            b += gamma * (Q_function((next_state,Continue)) < g(next_state))  * g(next_state) * phi

        W = np.linalg.inv(A).dot(b).flatten()

    return W



In [114]:
T = 20
K = 2

W = american_LSPI(feature_functions = [lambda s : s.current_price(), lambda s: 1],
        PriceTrajectories  = np.random.randn(10,T) * 2* np.random.rand(10).reshape(-1,1),
        gamma = 0.95,
        K = K ,
        T  = T,
        cold_start  = None)

In [115]:
T = 100
K = 3
Ws = np.array([american_LSPI(feature_functions = [lambda s : s.current_price()/K, lambda s: 1],
        PriceTrajectories  = np.random.randn(1000,T) * 2* np.random.rand(1000).reshape(-1,1),
        gamma = 0.95,
        K = K ,
        T  = T,
        cold_start  = None) for k in range(100)])

print(f'{Ws.mean(0)} +/-  {Ws.std(0)}')


[-0.14971897  0.01542418] +/-  [0.97354472 1.04338781]
