This note replicates the results in Imrohoroğlu, A. (1989). Cost of business cycles with indivisibilities and liquidity constraints. Journal of Political economy, 97(6), 1364-1383.

In [None]:
from numba import njit, prange

## Calibrate Parameters and Discretize State Variables

In [None]:
class BusinessCycleModel:
    def __init__(self,
                 period = 6, # weeks
                 r_save = .00, # net real return on stored assets
                 r_borrow = .08, # rate on borrowing
                 y = 1, # income if employed
                 theta = .25, # income if unemployed
                 beta = .995, # implies an annual time discount rate of 4%
                 sigma = 6.2, # coefficient of risk aversion, later try also 1.5 in Lucas
                 business_cycle=True, 
                 a_max = 8, 
                 a_min = 0,
                 Na = 301,
                ):
        
        # parameters
        self.period, self.beta = period, beta
        self.periods_in_a_year = 52 / period
        self.r_save, self.r_borrow = r_save, r_borrow
        self.y, self.theta = y, theta
        self.sigma = sigma
        self.a_max, self.a_min, self.Na = a_max, a_min, Na
        
        # transition matrices
        if business_cycle:
            self.P = np.array([
                        [0.9141, 0.0234, 0.0587, 0.0038],
                        [0.5625, 0.3750, 0.0269, 0.0356],
                        [0.0608, 0.0016, 0.8813, 0.0563],
                        [0.0375, 0.0250, 0.4031, 0.5344],
                    ])
            self.Ns = 4 # ge, gu, be, bu
            print('Construct model with business cycle.')
        else:
            self.P = np.array([
                        [.9565, .0435],
                        [.5000, .5000]
                    ])
            self.Ns = 2 # e, u
            print('Construct model without business cycle.')
        
        # state variables
        self.s_vals = np.arange(self.Ns)
        self.a_vals = np.linspace(a_min,a_max,self.Na)
        self.y_vals = np.array([y, theta]*int(self.Ns/2))

## Value Function Iteration

In [None]:
def operator_factory(bcm, parallel_flag=True):
    """
    A function factory for building the Bellman operator, as well as
    a function that computes greedy policies.
    """
    
    beta, sigma, r = bcm.beta, bcm.sigma, bcm.r_save
    Na, Ns = bcm.Na, bcm.Ns
    P = bcm.P
    a_vals, s_vals, y_vals = bcm.a_vals, bcm.s_vals, bcm.y_vals
    _u = np.zeros([Na, Ns, Na])
    
    @njit(parallel=parallel_flag)
    def util():
        """
        Indirect utility function
        Calculate at first to use tabulation later
        """
        u = np.empty_like(_u)
        for a0 in prange(Na):
            for s in prange(Ns):
                for a1 in prange(Na):
                    c = a_vals[a0] + y_vals[s] - a_vals[a1]/(1+r)
                    if c < 0:
                        u[a0, s, a1] = -1e6
                    else:
                        u[a0, s, a1] = c**(1-sigma) / (1-sigma)
        return u

    @njit(parallel=parallel_flag)
    def T(v,u):
        """
        The Bellman operator
        Return converged value function and policy function
        """
        v_new = np.zeros_like(v)
        policy = np.zeros_like(v)
        for a0 in prange(Na):
            for s in prange(Ns):
                v_vals = np.zeros(Na)
                for a1 in prange(Na):
                    u0 = u[a0,s,a1]
                    v_vals[a1] = u0 + beta * P[s] @ v[a1,:]
                v_new[a0,s] = np.max(v_vals)
                policy[a0,s] = np.argmax(v_vals)
        return v_new, policy

    return util, T

In [None]:
def solve_model(bcm, 
                use_parallel=True, 
                tol=1e-7, 
                max_iter=5000,
                verbose=True,
                print_skip=50):
    """
    Iterates to convergence on the Bellman equations
    """
    util, T = operator_factory(bcm, use_parallel)
    u = util()
    
    # Set up loop
    i = 0
    error = 1e3
    Na, Ns = bcm.Na, bcm.Ns
    
    # Initialize v
    v = np.zeros([bcm.Na, bcm.Ns])    
        
    while i < max_iter and error > tol:
        v_new, policy = T(v,u)
        error = np.max(np.abs(v - v_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
        v = v_new
        
    if i == max_iter:
        print("Failed to converge!")

    if i < max_iter:
        print(f"Converged in {i} iterations.") 
        policy = policy.astype(int)
    
    return v, policy
    

In [None]:
bc = BusinessCycleModel()

In [None]:
%%time
v, policy = solve_model(bc,)

In [None]:
# plot policy function
plt.plot(bc.a_vals,bc.a_vals[policy[:,0]], color='r')
plt.plot(bc.a_vals,bc.a_vals[policy[:,1]], color='r', linestyle='--')
plt.plot(bc.a_vals,bc.a_vals[policy[:,2]], color='b')
plt.plot(bc.a_vals,bc.a_vals[policy[:,3]], color='b', linestyle='--')
plt.show();

## Invariant Distribution

In [None]:
def solve_invariant_dist(
                bcm, 
                policy,
                parallel_flag=True,
                tol=1e-10, 
                max_iter=2e2,
                verbose=True,
                print_skip=20):
    
    Na, Ns = bcm.Na, bcm.Ns
    P = bcm.P
    
    # Set up loop
    i = 0
    error = 1e3
    
    # Initialize distribution
    pmf = np.ones_like(policy) * (1/(Na*Ns))
    
    # Iteration function
    @njit(parallel=parallel_flag)
    def dist_iter(pmf):
        pmf_new = np.zeros_like(pmf)
        for a0 in prange(Na):
            for s0 in prange(Ns):
                a1 = policy[a0,s0]
                for s1 in prange(Ns):
                    pmf_new[a1,s1] += P[s0,s1] * pmf[a0,s0]
        return pmf_new
        
    while i < max_iter and error > tol:
        pmf_new = dist_iter(pmf)
        error = np.max(np.abs(pmf - pmf_new))
        i += 1
        if verbose and i % print_skip == 0:
            print(f"Error at iteration {i} is {error}.")
            print(f"Pmf sum is {pmf_new.flatten().sum()}.")
        pmf = pmf_new
        
    if i == max_iter:
        print("Failed to converge!")

    if i < max_iter:
        print(f"Converged in {i} iterations.")        
    
    return pmf

In [None]:
pmf = solve_invariant_dist(bc, policy)

In [None]:
# plot distribution
fig, ax = plt.subplots(2, figsize=(6,8))
ax[0].plot(bc.a_vals, pmf[:,0], label='ge')
ax[0].plot(bc.a_vals, pmf[:,1], label='gu')
ax[0].legend()
ax[1].plot(bc.a_vals, pmf[:,2], label='be')
ax[1].plot(bc.a_vals, pmf[:,3], label='bu')
ax[1].legend();