# The Deterministic Cake Eating Problem - Policy Function Iteration Solution
### by [Jason DeBacker](http://jasondebacker.com), June 2018
This Jupyter notebook illustrates how to solve the deterministic cake eating problem using Coleman's policy function iteration (PFI) -- and how to speed up this solution using multiprocessing.

## The Problem:

$$V(w)=\max_{c} u(c)+\beta V(w'), \quad \text{where } w' = w - c, \quad \forall w\in[0,\bar{w}]$$

The necessary condition (after applying the envelope theorem) is given by the Euler equation:

$$ u'(c) = \beta u'(c') $$

The unknowns in this system are functions: the value function, $V(w)$, and the policy function, $c = \phi(w)$ (or, equivalently, $w' = p(w)$).
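As a quick sanity check on the Euler equation, the sketch below plugs a candidate policy into it and evaluates the residual. It assumes CRRA marginal utility $u'(c) = c^{-\sigma}$ and uses the textbook closed-form policy for the infinite-horizon problem without any grid truncation, $\phi(w) = (1 - \beta^{1/\sigma})w$. That closed form and the helper names (`crra_u_prime`, `phi_closed_form`) are assumptions of this illustration and are not part of the notebook's code.

```python
import numpy as np

beta, sigma = 0.95, 1.0


def crra_u_prime(c, sigma):
    """Marginal utility for CRRA preferences: u'(c) = c^(-sigma)."""
    return c ** (-sigma)


def phi_closed_form(w, beta, sigma):
    """Assumed closed-form policy: eat a constant share of the cake."""
    return (1.0 - beta ** (1.0 / sigma)) * w


w = np.linspace(0.4, 2.0, 5)               # a few cake sizes
c = phi_closed_form(w, beta, sigma)        # consumption today
w_next = w - c                             # cake left for tomorrow
c_next = phi_closed_form(w_next, beta, sigma)

# Euler residual: u'(c) - beta * u'(c'); zero up to rounding if the policy is optimal
euler_residual = crra_u_prime(c, sigma) - beta * crra_u_prime(c_next, sigma)
print(euler_residual)
```

A residual that is zero up to floating-point error at every $w$ gives a useful benchmark for the numerical policy function computed later.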

## The Solution:

We'll solve this problem with policy function iteration (PFI).  With this method, we'll use an iterative procedure to converge to a fixed point in function space, the true policy function, $\phi(w)$.  

The computational algorithm for PFI is as follows:

1. Create a discrete grid of the state variable, $w$
2. Make an initial guess at the policy function, $\phi_{0}(w)$
    * This will be a consumption value for each point in the state space
3. Find $K\phi(w)$, where $K\phi(w)$ is the root (zero) of:
$$ u'(K\phi(w)) - \beta u'(\phi_{0}(w - K\phi(w)))$$
4. Update the guess at the policy function: $\phi_{1}(w) = K\phi(w)$
5. Repeat this process:
$$\phi_{i+1}(w) = K\phi(w), \text{ where } K\phi(w) \text{ solves } u'(K\phi(w)) - \beta u'(\phi_{i}(w - K\phi(w))) = 0$$
6. Stop when $\max_{w}|\phi_{i+1}(w)-\phi_{i}(w)|<\varepsilon$, where $\varepsilon$ is a small number.
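The six steps above can be sketched compactly as follows. This is a minimal illustration and not the notebook's implementation: it assumes log utility, uses `scipy.optimize.brentq` on a bracket inside $(0, w)$ in place of a general root finder, and anchors the interpolated policy at $(0, 0)$ so that next-period cake below the grid is handled by interpolation. The names `coleman_step` and `marginal_utility` are hypothetical.

```python
import numpy as np
from scipy.optimize import brentq

beta = 0.95
w_grid = np.linspace(0.4, 2.0, 20)           # step 1: grid for the state


def marginal_utility(c):
    return 1.0 / c                           # log utility (assumption)


def coleman_step(phi, w_grid, beta):
    """Apply the operator once: solve the Euler equation at each grid point."""
    # Assumption of this sketch: no cake means no consumption, phi(0) = 0
    w_nodes = np.concatenate(([0.0], w_grid))
    phi_nodes = np.concatenate(([0.0], phi))
    new_phi = np.empty_like(phi)
    for i, w in enumerate(w_grid):
        def euler_residual(c):
            c_next = np.interp(w - c, w_nodes, phi_nodes)
            return marginal_utility(c) - beta * marginal_utility(c_next)
        # The residual is positive near c = 0 and negative near c = w
        new_phi[i] = brentq(euler_residual, 1e-10, w - 1e-10)
    return new_phi


phi = w_grid.copy()                          # step 2: guess "eat everything"
tol, max_iter = 1e-8, 1000
for iteration in range(1, max_iter + 1):
    new_phi = coleman_step(phi, w_grid, beta)    # steps 3 and 4
    dist = np.max(np.abs(new_phi - phi))         # step 6: sup-norm distance
    phi = new_phi
    if dist < tol:
        break                                    # step 5 otherwise repeats

print("iterations:", iteration, "distance:", dist)
```

Under these assumptions the iterates should approach a policy that consumes a constant share of the cake, which makes the result easy to compare against a closed form.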

## Multiprocessing

Note that in each application of the operator above, a root of the Euler equation is found for each $w$, *and* the root for any $w_{i}$ is independent of the root for any other $w_{j}$, $j \neq i$. So we can solve these root-finding problems for several values of $w$ at once by sending them out to several CPUs at the same time.

Below, we illustrate how to do this in Python using the [Dask](https://dask.pydata.org/en/latest/) package, and measure how much of a speedup it delivers.
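The independence property can be seen in a small sketch. Note the swap: this example uses the standard library's `concurrent.futures.ThreadPoolExecutor` as a lightweight stand-in for Dask. Threads are not expected to speed up this CPU-bound work; the point is only that each grid point's root can be computed without reference to any other. The fixed guess `phi_guess` (eat half the cake) and the function `solve_one` are hypothetical names for this illustration, and log utility is assumed.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from scipy.optimize import brentq

beta = 0.95
w_grid = np.linspace(0.4, 2.0, 8)


def phi_guess(w):
    # Hypothetical current guess at the policy: eat half of the cake
    return 0.5 * w


def solve_one(w):
    """Root of the Euler equation at a single cake size w (log utility)."""
    def h(c):
        return 1.0 / c - beta / phi_guess(w - c)
    return brentq(h, 1e-10, w - 1e-10)


# One grid point at a time, in order
serial_roots = np.array([solve_one(w) for w in w_grid])

# The same tasks handed to a pool of workers
with ThreadPoolExecutor(max_workers=4) as pool:
    pooled_roots = np.array(list(pool.map(solve_one, w_grid)))

print(np.array_equal(serial_roots, pooled_roots))
```

Because no task reads another task's result, the work can be divided among workers in any way without changing the answer.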

In [1]:
# Import packages
import numpy as np
import matplotlib.pyplot as plt
import scipy.optimize as opt
from scipy import interpolate
import dask
from dask.distributed import Client
import dask.multiprocessing
from timeit import default_timer as timer  # to time computations
import os

# to print plots inline
%matplotlib inline

## Stuff for multiprocessing
# Specify client
client = Client()
num_workers = os.cpu_count()

## Set parameters and create grid for state space

Parameters:
* $\beta$ = rate of time preference
* $\sigma$ = coefficient of relative risk aversion, which governs the curvature of the utility function (if $\sigma = 1$, $u(c)=\log(c)$)
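To see how $\sigma$ enters, the sketch below writes out CRRA utility and its marginal utility $u'(c) = c^{-\sigma}$, then checks one against the other with a central finite difference. The function names and the test point `c = 0.7` are hypothetical choices for this illustration. They take consumption directly as an argument and are not the notebook's `utility` and `u_prime` functions.

```python
import numpy as np


def crra_utility(c, sigma):
    """CRRA utility; the log case applies when sigma equals 1."""
    if sigma == 1:
        return np.log(c)
    return c ** (1 - sigma) / (1 - sigma)


def crra_marginal_utility(c, sigma):
    """Derivative of CRRA utility with respect to consumption."""
    return c ** (-sigma)


c = 0.7        # arbitrary consumption level for the check
step = 1e-6    # finite-difference step
gaps = {}
for sigma in (0.5, 1.0, 2.0):
    finite_diff = (crra_utility(c + step, sigma)
                   - crra_utility(c - step, sigma)) / (2 * step)
    gaps[sigma] = abs(finite_diff - crra_marginal_utility(c, sigma))
    print(sigma, finite_diff, crra_marginal_utility(c, sigma))
```

A larger $\sigma$ makes marginal utility fall faster as consumption rises, which is what "more curvature" means here.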

In [2]:
beta = 0.95
sigma = 1.0
R = 1

'''
------------------------------------------------------------------------
Create Grid for State Space    
------------------------------------------------------------------------
lb_w      = scalar, lower bound of cake grid
ub_w      = scalar, upper bound of cake grid 
size_w    = integer, number of grid points in cake state space
w_grid    = vector, size_w x 1 vector of cake grid points 
------------------------------------------------------------------------
'''
lb_w = 0.4 
ub_w = 2.0 
size_w = 100 # Number of grid points
w_grid = np.linspace(lb_w, ub_w, size_w)

## Create functions to use in solution

In [3]:
def utility(w, wprime, sigma, R):
    """
    Per period utility function
    """
    C = w - wprime / R
    try:
        C[C<=0] = 1e-10 # replace 0 and negative consumption with a tiny value - to impose non-negativity on cons
    except TypeError:
        if C <= 0:
            C = 1e-10
    if sigma == 1:
        U = np.log(C)
    else:
        U = (C ** (1 - sigma)) / (1 - sigma)
    
    return U


def u_prime(C, sigma):
    MU = C ** - sigma
    try:
        MU[C<=0] = 999999999 # replace marginal utility at 0 and negative consumption with a very large value - to penalize non-positive cons
    except TypeError:
        if C <= 0:
            MU = 999999999
    
    return MU

## Policy Function Iteration


### Serial solution

First, let's solve the problem in a serial manner as we've done before.  This will give us a baseline compute time to compare against.

In [4]:
def coleman_operator(phi, w_grid, params):
    '''
    The Coleman operator, which takes an existing guess phi of the
    optimal consumption policy and computes and returns the updated function
    Kphi on the grid points.
    '''
    beta, sigma = params
    
    # === Apply linear interpolation to phi === #
    phi_func = interpolate.interp1d(w_grid, phi, fill_value='extrapolate')

    # == Initialize Kphi if necessary == #
    Kphi = np.empty_like(phi)

    # == solve for updated consumption value
    for i, w in enumerate(w_grid):
        def h(c):
            return u_prime(c, sigma) - beta * u_prime(phi_func(R * (w - c)), sigma)
        results = opt.root(h, 1e-10)
        c_star = results.x[0]
        Kphi[i] = c_star
        
    return Kphi

In [5]:
'''
------------------------------------------------------------------------
Policy Function Iteration    
------------------------------------------------------------------------
PFtol     = scalar, tolerance required for policy function to converge
PFdist    = scalar, distance between last two policy functions
PFmaxiter = integer, maximum number of iterations for policy function
phi       = vector, policy function for choice of consumption at each iteration
PFstore   = matrix, stores phi at each iteration 
PFiter    = integer, current iteration number
PF_params = tuple, contains parameters to pass into Coleman operator: beta, sigma
new_phi   = vector, updated policy function after applying Coleman operator 
------------------------------------------------------------------------
'''
PFtol = 1e-8 
PFdist = 7.0 
PFmaxiter = 500 
phi = w_grid # initial guess at policy function is to eat all cake
PFstore = np.zeros((size_w, PFmaxiter)) #initialize PFstore array
PFiter = 1 
PF_params = (beta, sigma)
start = timer()
while PFdist > PFtol and PFiter < PFmaxiter:
    PFstore[:, PFiter] = phi
    new_phi = coleman_operator(phi, w_grid, PF_params)
    PFdist = (np.absolute(phi - new_phi)).max()
    phi = new_phi
#     print('Iteration ', PFiter, ' distance = ', PFdist)
    PFiter += 1
end = timer()
if PFiter < PFmaxiter:
    print('Policy function converged after this many iterations:', PFiter)
else:
    print('Policy function did not converge')   
print('Serial compute time is ', end - start)

Policy function converged after this many iterations: 257
Serial compute time is  84.3456991139974


### Parallel solution

Next, let's parallelize this solution by chunking up our grid and applying the Coleman operator to each of those chunks simultaneously.  This is done in the `coleman_operator_dask()` function.
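The chunking itself is plain index arithmetic. The sketch below reproduces the `split` helper used inside the parallel operator and applies it to a toy case of 10 indices and 4 chunks (both numbers are arbitrary choices for illustration). It also compares the result with `numpy.array_split`, which divides an array into nearly equal parts in the same way.

```python
import numpy as np


def split(a, n):
    """Split sequence a into n contiguous chunks whose sizes differ by at most 1."""
    k, m = divmod(len(a), n)   # k = base chunk size, m = chunks that get one extra
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]


chunk_lists = [list(chunk) for chunk in split(range(10), 4)]
numpy_chunks = [part.tolist() for part in np.array_split(np.arange(10), 4)]

print(chunk_lists)
print(numpy_chunks)
```

The first `m` chunks receive one extra element, so the workload per worker stays as even as the grid size allows.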

In [6]:
def coleman_operator_dask(phi, w_grid, params):
    # == Initialize Kphi if necessary == #
    Kphi = np.empty_like(phi)
    
    # == create function to split up grid == #
    def split(a, n):
        k, m = divmod(len(a), n)
        chunks = list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))
        return chunks
    
    # == solve for updated consumption value == #
    lazy_values = []
    chunks = split(range(size_w), num_workers)
    for chunk in chunks:
        lazy_values.append(dask.delayed(coleman_operator)(phi[list(chunk)],
                                                          w_grid[list(chunk)],
                                                          params))
    results = dask.compute(*lazy_values, scheduler='processes',
                           num_workers=4)
    
    # Unpack results that were computed simultaneously and put in correct places in vector
    for i, result in enumerate(results):
        Kphi[list(chunks[i])] = result

    return Kphi

In [7]:
'''
------------------------------------------------------------------------
Policy Function Iteration  - with multiprocessing 
------------------------------------------------------------------------
PFtol     = scalar, tolerance required for policy function to converge
PFdist    = scalar, distance between last two policy functions
PFmaxiter = integer, maximum number of iterations for policy function
phi       = vector, policy function for choice of consumption at each iteration
PFstore   = matrix, stores phi at each iteration 
PFiter    = integer, current iteration number
PF_params = tuple, contains parameters to pass into Coleman operator: beta, sigma
new_phi   = vector, updated policy function after applying Coleman operator 
------------------------------------------------------------------------
'''
PFtol = 1e-8 
PFdist = 7.0 
PFmaxiter = 500 
phi = w_grid # initial guess at policy function is to eat all cake
PFstore = np.zeros((size_w, PFmaxiter)) #initialize PFstore array
PFiter = 1 
PF_params = (beta, sigma)
start = timer()
while PFdist > PFtol and PFiter < PFmaxiter:
    PFstore[:, PFiter] = phi
    new_phi = coleman_operator_dask(phi, w_grid, PF_params)
    PFdist = (np.absolute(phi - new_phi)).max()
    phi = new_phi
#     print('Iteration ', PFiter, ' distance = ', PFdist)
    PFiter += 1
end = timer()
if PFiter < PFmaxiter:
    print('Policy function converged after this many iterations:', PFiter)
else:
    print('Policy function did not converge')   
print('Parallel compute time is ', end - start)

Policy function converged after this many iterations: 257
Parallel compute time is  51.27704239601735


### Parallel - high overhead example

We could also try to parallelize by sending each grid point (rather than chunks of grid points) out to different workers.  This can work in general, but since the compute time at each grid point is so small, the computational overhead of keeping track of the multiple processes ends up costing more than is gained from applying the Coleman operator simultaneously.  You can see this by executing the following cell.

In [8]:
def coleman_operator_dask_slow(phi, w_grid, params):
    '''
    The Coleman operator, which takes an existing guess phi of the
    optimal consumption policy and computes and returns the updated function
    Kphi on the grid points.
    '''
    beta, sigma = params
    
    # === Apply linear interpolation to phi === #
    phi_func = interpolate.interp1d(w_grid, phi, fill_value='extrapolate')

    # == Initialize Kphi if necessary == #
    Kphi = np.empty_like(phi)

    # == Create function for Euler equation == #
    def euler(c, args):
        (w, phi_func, beta, sigma, R) = args
        return u_prime(c, sigma) - beta * u_prime(phi_func(R * (w - c)), sigma)
        
    # == solve for updated consumption value == #
    lazy_values = []
    for i, w in enumerate(w_grid):
        euler_args = [w, phi_func, beta, sigma, R]
        lazy_values.append(dask.delayed(opt.root)(euler, 1e-10, args=euler_args))
    results = dask.compute(*lazy_values, scheduler='processes',
                           num_workers=4)
 
    # Unpack results that were computed simultaneously and put in correct places in vector
    for i, result in enumerate(results):
        c_star = result.x[0]
        Kphi[i] = c_star
        
    return Kphi

In [9]:
'''
------------------------------------------------------------------------
Policy Function Iteration  - with multiprocessing 
------------------------------------------------------------------------
PFtol     = scalar, tolerance required for policy function to converge
PFdist    = scalar, distance between last two policy functions
PFmaxiter = integer, maximum number of iterations for policy function
phi       = vector, policy function for choice of consumption at each iteration
PFstore   = matrix, stores phi at each iteration 
PFiter    = integer, current iteration number
PF_params = tuple, contains parameters to pass into Coleman operator: beta, sigma
new_phi   = vector, updated policy function after applying Coleman operator 
------------------------------------------------------------------------
'''
PFtol = 1e-8 
PFdist = 7.0 
PFmaxiter = 500 
phi = w_grid # initial guess at policy function is to eat all cake
PFstore = np.zeros((size_w, PFmaxiter)) #initialize PFstore array
PFiter = 1 
PF_params = (beta, sigma)
start = timer()
while PFdist > PFtol and PFiter < PFmaxiter:
    PFstore[:, PFiter] = phi
    new_phi = coleman_operator_dask_slow(phi, w_grid, PF_params)
    PFdist = (np.absolute(phi - new_phi)).max()
    phi = new_phi
#     print('Iteration ', PFiter, ' distance = ', PFdist)
    PFiter += 1
end = timer()
if PFiter < PFmaxiter:
    print('Policy function converged after this many iterations:', PFiter)
else:
    print('Policy function did not converge')  
print('Parallel compute time is ', end - start)

Policy function converged after this many iterations: 257
Parallel compute time is  116.51090636101435
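To put the three timings side by side, the sketch below does the arithmetic using only the numbers printed in the cell outputs above. The variable names are hypothetical, and the timings are specific to the machine that produced them, so the ratios should be read as indicative only. The count of operator applications assumes, as in the loops above, that `PFiter` starts at 1 and is incremented once per application.

```python
# Timings and iteration count copied from the cell outputs reported above
serial_seconds = 84.3456991139974
chunked_seconds = 51.27704239601735
per_point_seconds = 116.51090636101435
reported_iterations = 257
grid_points = 100

# PFiter starts at 1 and grows by one per operator application
operator_calls = reported_iterations - 1
root_solves = operator_calls * grid_points

speedup_chunked = serial_seconds / chunked_seconds
slowdown_per_point = per_point_seconds / serial_seconds
serial_ms_per_solve = 1000 * serial_seconds / root_solves
per_point_ms_per_solve = 1000 * per_point_seconds / root_solves

print("root solves in total:        ", root_solves)
print("chunked speedup over serial: ", round(speedup_chunked, 2))
print("per-point slowdown vs serial:", round(slowdown_per_point, 2))
print("serial ms per root solve:    ", round(serial_ms_per_solve, 2))
print("per-point ms per root solve: ", round(per_point_ms_per_solve, 2))
```

On these reported numbers, chunking is roughly 1.6 times faster than the serial loop, while one task per grid point is roughly 1.4 times slower. Each root solve takes only a few milliseconds, which is why per-task scheduling overhead matters so much.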
