This note replicates the results in Imrohoroğlu, A. (1989). Cost of business cycles with indivisibilities and liquidity constraints. Journal of Political economy, 97(6), 1364-1383.

In [None]:
from numba import njit, prange, float64, int64
from numba.experimental import jitclass
from interpolation import interp
from quantecon.optimize import brent_max, brentq
from quantecon import MarkovChain

## Calibrate Parameters and Discretize State Variables

In [None]:
class BusinessCycleModel:
    def __init__(self,
                 period = 6, # weeks
                 r_save = .00, # net real return on stored assets
                 r_borrow = .08, # rate on borrowing
                 y = 1, # income if employed
                 theta = .25, # income ratio if unemployed
                 beta = .995, # implies an annual time discount rate of 4%
                 sigma = 6.2, # coefficient of risk aversion, later try also 1.5 in Lucas
                 business_cycle=True, 
                 a_max = 8, 
                 a_min = 0,
                 Na = 301,
                ):
        
        # parameters
        self.period, self.beta = period, beta
        self.periods_in_a_year = 52 / period
        self.r_save, self.r_borrow = r_save, r_borrow
        self.R = 1 + r_save
        self.sigma = sigma
        self.Na = Na
        
        # transition matrices
        if business_cycle:
            self.P = np.array([
                        [0.9141, 0.0234, 0.0587, 0.0038],
                        [0.5625, 0.3750, 0.0269, 0.0356],
                        [0.0608, 0.0016, 0.8813, 0.0563],
                        [0.0375, 0.0250, 0.4031, 0.5344],
                    ])
            self.Ns = 4 
            self.Ns_label = ['ge', 'gu', 'be', 'bu']
            print('Construct model with business cycle.')
        else:
            self.P = np.array([
                        [.9565, .0435],
                        [.5000, .5000]
                    ])
            self.Ns = 2 
            self.Ns_label = ['e', 'u']
            print('Construct model without business cycle.')
        
        # state variables
        self.a_vals = np.linspace(a_min,a_max,self.Na)
        self.y_vals = np.array([y, y*theta]*int(self.Ns/2))
                

## Value Function Iteration

In [None]:
def operator_factory_value_iteration(model, parallel_flag=True):
    """
    A function factory that output utility function and value fuction iterator
    """
    
    beta, sigma, r = model.beta, model.sigma, model.r_save
    Na, Ns = model.Na, model.Ns
    a_vals, y_vals = model.a_vals, model.y_vals
    P = model.P
    _u = np.empty([Na, Ns, Na])

    @njit(parallel=parallel_flag)
    def util():
        """
        Indirect utility function
        Calculate at first for tabulation use later
        """
        c = np.empty_like(_u)
        u = np.empty_like(_u)
        for a0 in prange(Na):
            for s0 in prange(Ns):
                for a1 in prange(Na):
                    c0 = a_vals[a0] + y_vals[s0] - a_vals[a1]/(1+r)
                    c[a0, s0, a1] = c0
                    if c0 <= 0:
                        u[a0, s0, a1] = -1e8
                    else:
                        u[a0, s0, a1] = c0**(1-sigma) / (1-sigma)
        return c, u

    @njit(parallel=parallel_flag)
    def T(v,u):
        """
        The Bellman operator
        Return new value function and the policy function
        """

        v_new = np.empty_like(v)
        policy = np.empty_like(v)
        σ = np.empty_like(v)
        for a0 in prange(Na):
            for s0 in prange(Ns):
                v_vals = np.zeros(Na)
                for a1 in prange(Na):
                    u0 = u[a0,s0,a1]
                    v_vals[a1] = u0 + beta * P[s0] @ v[a1,:]
                v_new[a0,s0] = np.max(v_vals)
                a1 = np.argmax(v_vals)
                policy[a0,s0] = a1
                σ[a0,s0] = a_vals[a0] + y_vals[s0] - a_vals[a1]/(1+r)
                
        return v_new, policy, σ

    return util, T

In [None]:
def solve_model_value_iteration(
                model, 
                use_parallel = True,
                tol=1e-4, 
                max_iter=5000,
                verbose=True,
                print_skip=50):
    """
    Iterates to convergence on the Bellman equations
    """
    
    print('Solve model with value function iteration.')
    
    util, T = operator_factory_value_iteration(model, use_parallel)
    _, u = util() 
    
    # Set up loop
    i = 0
    error = tol + 1
    
    # Initialize v
    v = np.zeros([model.Na, model.Ns])    
        
    while i < max_iter and error > tol:
        v_new, policy, σ = T(v,u)
        error = np.max(np.abs(v - v_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
        v = v_new
        
    if i == max_iter:
        print("Failed to converge!")

    if i < max_iter:
        print(f"Converged in {i} iterations.") 
        policy = policy.astype(int)
    
    return v, policy, σ
    

In [None]:
bc = BusinessCycleModel()

In [None]:
%%time
v, policy, σ_star = solve_model_value_iteration(bc,)

In [None]:
# plot policy function
fig, ax = plt.subplots()
for s in range(bc.Ns):
    ax.plot(bc.a_vals,bc.a_vals[policy[:,s]],  label=bc.Ns_label[s])
ax.plot(bc.a_vals,bc.a_vals, color='black', linestyle='--')
ax.set(xlabel='$a_t$', ylabel='$a_{t+1}$')
ax.legend()
plt.show();

In [None]:
# plot consuming policy function
fig, ax = plt.subplots()
for s in range(bc.Ns):
    label = rf'$\sigma^*(\cdot, {s}) - {bc.Ns_label[s]}$'
    ax.plot(bc.a_vals, σ_star[:, s], label=label)
ax.set(xlabel='$a_t$', ylabel='$c_t$')
ax.legend()
plt.show();

## Policy Function Iteration

- $V(a, s)=\max \left\{U(c)+\beta \sum_{s^{\prime}} \mathbf{\Pi}\left(s, s^{\prime}\right) V\left(a^{\prime}, s^{\prime}\right)\right\}$ 

In [None]:
def operator_factory_time_iteration(model):
    """
    A function factory that output utility function and value fuction iterator
    """
    
    beta, sigma, R = model.beta, model.sigma, model.R
    Na, Ns = model.Na, model.Ns
    a_vals, y_vals = model.a_vals, model.y_vals
    P = model.P    
    
    @njit
    def u_prime(c):
        return c**(-sigma)

    @njit
    def euler_diff(c, a, s, σ_vals):
        """
        The difference between the left- and right-hand side
        of the Euler Equation, given current policy σ.

            * c is the consumption choice
            * (a, s) is the state, with s in {0,1,...,Ns}
            * σ_vals is a policy represented as a matrix.
        """

        # Convert policy into a function by linear interpolation
        def σ(a, s):
            return interp(a_vals, σ_vals[:, s], a)

        # Calculate the expectation conditional on current z
        expect = 0.0
        for s1 in range(Ns):
            expect += u_prime(σ(R * (a - c + y_vals[s]), s1)) * P[s, s1]

        return u_prime(c) - max(beta * R * expect, u_prime(a+y_vals[s]))
    
    @njit
    def K(σ):
        """
        The operator K.
        """
        σ_new = np.empty_like(σ)
        for i, a in enumerate(a_vals):
            for s in range(Ns):
                result = brentq(euler_diff, 1e-8, a+y_vals[s], args=(a, s, σ))
                σ_new[i, s] = result.root

        return σ_new
    
    return euler_diff, K

In [None]:
def solve_model_time_iter(model,    # Class with model information
                          tol=1e-4,
                          max_iter=1000,
                          verbose=True,
                          print_skip=25):
    
    print('Solve model with policy function iteration.')
    
    euler_diff, K = operator_factory_time_iteration(model)
    
    # Set up initial consumption policy
    σ = np.repeat(model.a_vals.reshape(model.Na, 1), model.Ns, axis=1)

    # Set up loop
    i = 0
    error = tol + 1

    while i < max_iter and error > tol:
        σ_new = K(σ)
        error = np.max(np.abs(σ - σ_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
        σ = σ_new

    if i == max_iter:
        print("Failed to converge!")

    if verbose:
        print(f"\nConverged in {i} iterations.")

    return σ_new

In [None]:
%%time
bc = BusinessCycleModel()
σ_star = solve_model_time_iter(bc)

In [None]:
fig, ax = plt.subplots()
for s in range(bc.Ns):
    label = rf'$\sigma^*(\cdot, {s}) - {bc.Ns_label[s]}$'
    ax.plot(bc.a_vals, σ_star[:, s], label=label)
ax.set(xlabel='$a_t$', ylabel='$c_t$')
ax.legend()
plt.show();

In [None]:
# plot asset policy function
fig, ax = plt.subplots()
for s in range(bc.Ns):
    a1 = bc.R * (bc.a_vals - σ_star[:, s] + bc.y_vals[s]) 
    ax.plot(bc.a_vals, a1, label=bc.Ns_label[s])
ax.plot(bc.a_vals,bc.a_vals, color='black', linestyle='--')
ax.set(xlabel='$a_t$', ylabel='$a_{t+1}$')
ax.legend()
plt.show();

## Stationary Distribution - Montel Carlo Simulation

In [None]:
def compute_asset_series(model, σ_star, T=500_000, seed=1234):
    """
    Simulates a time series of length T for assets, given optimal
    savings behavior.
    """
    P, a_vals, y_vals, R  = model.P, model.a_vals, model.y_vals, model.R
     
    σ = lambda a, s: interp(a_vals, σ_star[:, s], a)

    # Simulate the exogeneous state process
    mc = MarkovChain(P)
    s_seq = mc.simulate(T, random_state=seed)

    # Simulate the asset path
    a_path = np.zeros(T+1)
    for t in range(T):
        s = s_seq[t]
        a_path[t+1] = R * (a_path[t] - σ(a_path[t], s) + y_vals[s])
    return a_path

In [None]:
bc = BusinessCycleModel()
σ_star = solve_model_time_iter(bc, verbose=False)
a_path = compute_asset_series(bc, σ_star)

fig, ax = plt.subplots()
ax.hist(a_path, bins=20, alpha=0.5, density=True)
ax.set(xlabel='assets')
plt.show();

## Stationary Distribution - 

In [None]:
def solve_invariant_dist(
                bcm, 
                policy,
                parallel_flag=True,
                tol=1e-5, 
                max_iter=1000,
                verbose=True,
                print_skip=50):
    
    Na, Ns = bcm.Na, bcm.Ns
    P = bcm.P
    
    # Set up loop
    i = 0
    error = 1e3
    
    # Initialize distribution
    pmf = np.ones_like(policy) * (1/(Na*Ns))
    
    # Iteration function
    @njit(parallel=parallel_flag)
    def dist_iter(pmf):
        pmf_new = np.zeros_like(pmf)
        for a0 in prange(Na):
            for s0 in prange(Ns):
                a1 = policy[a0,s0]
                for s1 in prange(Ns):
                    pmf_new[a1,s1] += P[s0,s1] * pmf[a0,s0]
        return pmf_new
        
    while i < max_iter and error > tol:
        pmf_new = dist_iter(pmf)
        error = np.max(np.abs(pmf - pmf_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
            print(f"Pmf sum is {pmf_new.flatten().sum()}.")
        pmf = pmf_new
        
    if i == max_iter:
        print("Failed to converge!")

    if i < max_iter:
        print(f"Converged in {i} iterations.")        
    
    return pmf

In [None]:
bc = BusinessCycleModel()
_, policy, _ = solve_model_value_iteration(bc,)
pmf = solve_invariant_dist(bc, policy)

In [None]:
# plot distribution
fig, ax = plt.subplots(2, figsize=(6,8))
ax[0].plot(bc.a_vals, pmf[:,0], label='ge')
ax[0].plot(bc.a_vals, pmf[:,1], label='gu')
ax[0].legend()
ax[1].plot(bc.a_vals, pmf[:,2], label='be')
ax[1].plot(bc.a_vals, pmf[:,3], label='bu')
ax[1].legend();