In [1]:
import numpy as np

def g(x, gamma):
    return gamma * x

def h(x, tau, epsilon, eta):
    """
    Returns the temporary market impact
    """
    return epsilon * np.sign(x) + eta * x / tau

def expectation_IS(n, X=1000, tau=1, gamma=1, eta=1, eps=1):
    """
    Returns the expected IS
    n, array - Trading trajectory
    X, integer - Number of shares to be liquidated
    """
    res = 0.5*gamma*X**2 + eps*np.sum(np.abs(n)) + (eta - 0.5*gamma*tau)/tau * np.sum(n**2)
    
    return res

def variance_IS(n, X=1000, sigma=0.3, tau=1):
    res = sigma**2 * np.sum((X - n)**2)
    return res

def expectation_TWAP(n,  s0,  X=1000, tau=1, gamma=1, eta=1, eps=1):
    """
    Returns the expected TWAP
    n, array - Trading trajectory
    X, integer - Number of shares to be liquidated
    s0, float - Initial spot price
    gamma, float - Parameter of the permanent market impact
    tau, float - Trading frequency
    eta, float - Parameter of the temporary market impact
    eps, float - Fixed cost of selling in the temporary market impact
    """
    N = len(n)
    T = tau*N
    e = 0
    for k in range(N):
        sumg=0
        for j in range(k):
            sumg+=g(n[j]/tau, gamma)
        e+=(X*tau/T - n[k])*(s0-sumg*tau) - X/T * tau**2*g(n[k]/tau, gamma) + n[k]*h(n[k]/tau, tau, eps, eta)
    return e


def expectation_VWAP(n, s0,v,X=1000, tau=1, gamma=1, eta=1, eps=1):
    """
    Returns the expected VWAP
    n, array - Trading trajectory
    X, integer - Number of shares to be liquidated
    s0, float - Initial spot price
    gamma, float - Parameter of the permanent market impact
    tau, float - Trading frequency
    eta, float - Parameter of the temporary market impact
    eps, float - Fixed cost of selling in the temporary market impact
    v, array - Volume traded during the day
    """
    N = len(n)
    e = 0
    V = v.sum()
    T = tau * N
    for k in range(N):
        gsum = 0
        for j in range(k):
            gsum+=g(n[j]/tau, gamma)
        e+=(X*v[k]/V - n[k])*(s0-gsum*tau) - X/T * tau*v[k]*g(n[k]/tau, gamma) + n[k]*h(n[k]/tau, tau, eps, eta)
    return e

def variance_VWAP(n, s0, v,X=1000, tau=1, gamma=1, eta=1, eps=1):
    """
    Returns the VWAP variance
    n, array - Trading trajectory
    X, integer - Number of shares to be liquidated
    s0, float - Initial spot price
    gamma, float - Parameter of the permanent market impact
    tau, float - Trading frequency
    eta, float - Parameter of the temporary market impact
    eps, float - Fixed cost of selling in the temporary market impact
    v, array - Volume traded during the day
    """
    var = 0
    N = len(n)
    V = v.sum()
    for k in range(N):
        vsum = 0
        for j in range(k,N):
            vsum+=v[j]
        var+=(X/V * vsum-x[k])**2*sigma**2*tau
    return var
        
            
def variance_TWAP(n, s0, X=1000, tau=1, gamma=1, eta=1, eps=1):
    """
    Returns the TWAP variance
    n, array - Trading trajectory
    X, integer - Number of shares to be liquidated
    s0, float - Initial spot price
    gamma, float - Parameter of the permanent market impact
    tau, float - Trading frequency
    eta, float - Parameter of the temporary market impact
    eps, float - Fixed cost of selling in the temporary market impact
    """
    var=0
    N = len(n)
    T = N * tau
    for k in range(N):
        tsum = 0
        for j in range(k,N):
            tsum+=tau
        var+=(X/T * tsum-x[k])**2*sigma**2*tau
    return var

def objective(n,risk_aversion=0.5):
    obj = expectation(n) + risk_aversion * variance(n)
    return obj

def IS(S):
    """
    Returns the Implementation Shortfall benchmark
    """
    return S[0]

def twap(S):
    """
    Returns the TWAP benchmark
    """
    for i in range(len(S)):
        s+=S[i]
    return s/len(S)
    
def vwap(X,S):
    """
    Returns the VWAP benchmark
    """
    s = 0
    v = 0
    for i in range(len(S)):
        v += n[i]
        s+= n[i] * S[i]
    return s/v

def marketimpact(S,X,benchmark):
    """
    Returns the Market Imapct from a benchmark
    """
    s = 0
    for t in range(len(S)):
        s+=S[t]*X[t]
    
    if benchmark.upper() is "IS":
        return X*IS(X,S) - s
    elif benchmark.upper() is "TWAP":
        return X*twap(X,S) - s
    elif benchmark.upper() is "VWAP":
        return X*vwap(X,S) - s
    else:
        raise ValueErrore("Unknown benchmark. Possible benchmarks are: 'IS', 'TWAP', or 'VWAP'.")

In [2]:
def dynamic_programming(nb_T, X_total):
    
    """
    Bellman equation for solving Markov decision processes.
    
    Inputs
    - nb_T, number of time steps
    - X_total, number of shares to be liquidated
    """
    
    # Init
    tau = 1.
    gamma = 2.5 * 10 ** (-5)
    u = np.zeros(shape=(nb_T, X_total+1), dtype="float")  # value function
    b = np.zeros(shape=(nb_T, X_total+1), dtype="int")    # best move
    inventoryforX = np.zeros(shape=(nb_T,1),dtype="int") # evolution of inventory
    inventoryforX[0] = X_total
    
    # Terminal condition
    for x in range(X_total+1):
        u[nb_T - 1, x] = math.exp(gamma*x*h(x/tau))
        b[nb_T - 1, x] = x
    
    # Backwards induction
    for t in range(nb_T-2, -1, -1):
        
        for x in range(X_total+1):
            
            best_value = u[t+1,0] * math.exp(gamma*H(x,x))
            best_n = x
            
            for n in range(x):
                current_value = u[t+1,x-n] * math.exp(gamma*H(x,n)) # we compute the utility function if we sell n shares
                
                if current_value < best_value:
                    best_value = current_value
                    best_n = n #nb of shares to liquidate
               
            u[t,x] = best_value
            b[t,x] = best_n
                    
    for t in range(1,nb_T):
        inventoryforX[t] = inventoryforX[t-1] - b[t,inventoryforX[t-1]]
            
            
    
    return u, b,inventoryforX