# Using Parallel Computing for Macroeconomic Forecasting at the Federal Reserve Bank of New York

**Pearl Li** (@pearlzli) <br>
**Federal Reserve Bank of New York** (@FRBNY-DSGE)

June 21, 2017

## Disclaimer

This talk reflects the experience of the author and does not represent an endorsement by the Federal Reserve Bank of New York or the Federal Reserve System of any particular product or service. The views expressed in this talk are those of the authors and do not necessarily reflect the position of the Federal Reserve Bank of New York or the Federal Reserve System. Any errors or omissions are the responsibility of the authors.

## Outline

1. Overview of DSGE modeling
2. "The forecast step": objectives and challenges
3. Parallelizing the forecast code: <br>
   a. DistributedArrays.jl <br>
   b. `pmap` and blocking
4. Conclusion and next steps

## Overview of DSGE modeling

A DSGE (dynamic stochastic general equilibrium) model is a "micro-founded macro-model", used in both policy and academia for

- Forecasting macroeconomic variables (GDP growth, inflation, interest rate, ...)
- Understanding the forces underlying past economic outcomes
- Analyzing the effect of monetary policy
- Investigating counterfactuals or alternative scenarios

Let

- $\theta$ be a vector of parameters (discount rate, capital share, ...)
- $s_t$ be a vector of latent states (output growth, natural rate of interest, ...), including expectations of future states and lags
- $\epsilon_t$ be a vector of exogenous shocks (productivity shock, ...)
- $\eta_t$ be a vector of expectational errors

We express the evolution of the system over time in terms of **equilibrium conditions** (derived from micro theory)

$$\Gamma_0(\theta) s_t = \Gamma_1(\theta) s_{t-1} + \Psi(\theta) \epsilon_t + \Pi(\theta) \eta_t + C(\theta)$$
$$\epsilon_t \sim N(0, Q(\theta))$$

which are solved to give the **transition equation**

$$s_t = T(\theta) s_{t-1} + R(\theta) \epsilon_t + C(\theta)$$

Let $y_t$ be a vector of observed variables (real GDP growth, core PCE inflation, ...). We map latent states $s_t$ to observables $y_t$ using the **measurement equation**

$$y_t = Z(\theta) s_t + D(\theta)$$

DSGE.jl centers around a **model object**

- Each model is a concrete subtype of `AbstractModel`
- Model object stores information about parameters, states, observables, computational settings, and more
- Use **method dispatch** to define model-specific functions:
  + e.g. returning equilibrium condition matrices $\Gamma_0$, $\Gamma_1$, $\Psi$, $\Pi$, and $C$ for a particular model (`eqcond`)
- Model-agnostic methods are defined for `AbstractModels`: e.g. `estimate`, `forecast_one`

We are interested in

- Sampling from the posterior distribution $\mathbb{P}(\theta | y_{1:T})$ of the parameters $\theta$ (**estimation step**)
- Using the estimated parameter draws to forecast, compute impulse responses and shock decompositions, and more (**forecast step**)

## "The forecast step": objectives and challenges

In the estimation step, we generated a large number of parameter draws from their posterior distribution. For each draw $\theta^{(j)}$, we might want to compute the following *products*:

- **Smoothed history:** Estimate smoothed states $s_{t|T}$ (where $T$ is the last data period and $t < T$)
- **Forecast:** Iterate the state space forward to get future states $s_{T+h|T}$
- **Shock decomposition:** Decompose $s_{t|T}$ into a weighted sum of accumulated shocks $\epsilon^{(i)}_{1:t|T}$ (where $i$ indexes the particular shock, e.g. productivity)
- **Impulse response:** Compute $\frac{\partial s_{1:H}}{\partial \epsilon^{(i)}_1}$, the response of states to a shock $\epsilon^{(i)}$ at time 1

Additional objectives

- Repeat for large number of draws (typically 20,000)
- Interested not only in states, but other *classes*:
  + Shocks
  + Observables
  + Other unobserved linear combinations of states ("pseudo-observables")
- With and without conditioning on different nowcasts

Challenges

1. Minimize computational time
   + "Whole shebang" (three conditional types, all products) took ~70 minutes using our MATLAB code
<br>
2. Use reasonable amount of memory
   + (e.g. for computing smoothed historical states) 229 quarters $\times$ 84 states $\times$ 20,000 draws $\times$ 3 conditional types
   + Shock decompositions and impulse responses add another dimension!

In [None]:
for cond_type in [:none, :full, :semi]
    for θ_j in parameter_draws
        # Compute state space matrices under θ_j
        update!(model, θ_j)
        system = compute_system(model)
        
        # Filter and smooth historical states
        kal = filter(model, data, system)
        histstates, histshocks, histpseudo, s_T = 
            smooth(model, data, system, kal)
        
        # Forecast from s_T
        forecaststates, forecastobs, forecastpseudo, forecastshocks = 
            forecast(model, system, s_T)
        
        # Decompose history and forecast into shocks
        shockdecstates, shockdecobs, shockdecpseudo = 
            shock_decompositions(model, system, histshocks)
        
        # Compute impulse response functions
        irfstates, irfobs, irfpseudo = impulse_responses(model, system)
        
        # Write forecast outputs
        write_forecast_outputs(...)
    end
end

## Parallelizing the forecast code

Preview of results: benchmark times against MATLAB (smaller is better)

| Test                                     | MATLAB (2014a) | Julia (0.4.5) |
| ---------------------------------------- | -------------- | ------------- |
| Smoothing                                | 1.00           | 0.38          |
| Forecasting                              | 1.00           | 0.24          |
| Computing shock decompositions           | 1.00           | 0.12          |
| All forecast outputs (modal parameters)  | 1.00           | 0.10          |
| All forecast outputs (full distribution) | 1.00*          | 0.22          |

*Run in MATLAB 2009a

Two approaches considered

1. Distributed storage, i.e. using [DistributedArrays.jl](https://github.com/JuliaParallel/DistributedArrays.jl)
2. `pmap` and "blocking"

DistributedArrays.jl

- Solution for storing arrays too large for one machine
- `DArray` storage distributed across multiple processes
- Each process operates on the part of the array it owns $\implies$ natural parallelization

In [2]:
# Add processes and load package on all processes
worker_procs = addprocs(5)
@everywhere using DistributedArrays

# Initialize DArray, distributing along the first dimension across all 
# 5 processes
arr_size = (25, 2, 2)
arr_div  = [5, 1, 1]
arr = drand(arr_size, worker_procs, arr_div)
fieldnames(arr)



6-element Array{Symbol,1}:
 :identity
 :dims    
 :pids    
 :indexes 
 :cuts    
 :release 

In [3]:
# Query a worker process for its local indices into arr
worker_id = worker_procs[1]
remotecall_fetch(localindexes, worker_id, arr)

(1:5,1:2,1:2)

In [4]:
# Return worker's local array
remotecall_fetch(localpart, worker_id, arr)

5×2×2 Array{Float64,3}:
[:, :, 1] =
 0.481367   0.592673 
 0.532853   0.240059 
 0.0213235  0.0850404
 0.568293   0.015669 
 0.850455   0.0651117

[:, :, 2] =
 0.71181   0.512633 
 0.390442  0.809272 
 0.140688  0.953632 
 0.34935   0.65508  
 0.420469  0.0855251

In [5]:
# Remove worker processes
rmprocs(worker_procs)

:ok

Using `DArrays` in the forecast step

- Distribute parameter draws among worker processes
- Each process will compute all outputs for the draws it owns
- Use both:
  + Lower-level functions (e.g. `smooth`) which operate on one draw
  + Higher-level functions (`smooth_all`) which, given many draws, call lower-level function on each

In [None]:
worker_procs = addprocs(50)

# Load draws and compute systems for each draw
parameter_draws = load_draws(model, worker_procs) # returns a DArray{Float64, 3}
systems = prepare_systems(model, parameter_draws) # returns a DArray{System, 1}

# Filter and smooth historical states
# Both filter_all and smooth_all return DArrays
kals = filter_all(model, data, systems)
histstates, histshocks, histpseudo, s_Ts =
    smooth_all(model, data, systems, kals; procs = worker_procs)

# Write forecast outputs
write_forecast_outputs(...)

...

rmprocs(worker_procs)

Disadvantage #1: draw assignment 

- Must explicitly assign draws to processes
- `DArray`s must be divided equally among processes
- What if number of draws isn't divisible by number of processes? Have to throw out remainder

Disadvantage #2: unwieldy `DArray` construction

- Specify `init` function mapping a tuple of local indices to the local part of the array
- Can only initialize one `DArray` for each call to the `init` function
- But what we want for `smooth_all` is to return four `DArray`s: `histstates`, `histshocks`, `histpseudo`, and `s_Ts`
- Result: ugly code...

In [None]:
# Initialize one big DArray with all outputs
out = DArray((ndraws, nstates + nshocks + npseudo + 1, nperiods), 
             procs, [nprocs, 1, 1]) do I
    
    # Initialize local part of array
    localpart = zeros(map(length, I)...)
    
    # Determine which draws i belong to this process
    draw_inds = first(I)
    ndraws_local = length(draw_inds)

    for i in draw_inds
        # Call smooth on draw i 
        states, shocks, pseudo, s_T = smooth(model, data, systems[i], kals[i])

        # Compute index of draw i into local array
        i_local = mod(i-1, ndraws_local) + 1

        # Assign smooth outputs to local array
        localpart[i_local, states_range,  :] = states
        localpart[i_local, shocks_range,  :] = shocks
        localpart[i_local, pseudo_range,  :] = pseudo
        localpart[i_local, statesT_range, states_range] = s_T
    end
        
    return localpart    
end

In [None]:
# Convert SubArrays to DArrays
states = convert(DArray, out[1:ndraws, states_range, 1:nperiods])
shocks = convert(DArray, out[1:ndraws, shocks_range, 1:nperiods])
pseudo = convert(DArray, out[1:ndraws, pseudo_range, 1:nperiods])
s_Ts = DArray((ndraws,), procs, [nprocs]) do I
    Vector{S}[convert(Array, slice(out, i, statesT_range, states_range)) for i in first(I)]
end

Figure 1: `smooth_all` result before indexing out `SubArray`s

![DArray result](figure1.gif)

- Could instead construct `out` as a `DArray` of `Tuple{Matrix{Float64}, Matrix{Float64}, Matrix{Float64}, Vector{Float64}}` types $\implies$ streamline `init` function
- However, this just postpones unpacking `out`
- Eventually want `histstates` as a 3-dimensional array (draws $\times$ states $\times$ time)

Disadvantage #3: computational time

- Parameter draws live on the processes they've been assigned to; difficult to reallocate
- Sometimes some compute nodes are busier than others
- Bottleneck effect: since `filter_all` must return before `smooth_all` can begin, proceeding is limited by compute time of **slowest process**

`pmap` + blocking

- Divide parameter draws into "blocks" (typically 20,000 draws into 20 blocks)
- Read in one block at a time
- For each block, parallel map `forecast_one_draw` (computes all forecast outputs for a single draw) over that draw's parameters
- When `pmap` returns, write current block's results to disk

In [None]:
# Get indices of draws corresponding to each block
block_indices = forecast_block_inds(model)
nblocks = length(block_indices)

for block = 1:nblocks
    # Load draws for this block
    parameter_draws = load_draws(model, block)

    # Compute forecast outputs for each draw in block
    forecast_outputs = pmap(θ_j -> forecast_one_draw(m, θ_j, data),
                            parameter_draws)

    # Write results for this block
    write_forecast_outputs(model, forecast_outputs, block_number = block)
end

In [None]:
function forecast_one_draw(model::AbstractModel, θ_j::Vector{Float64},
                           data::Matrix{Float64})
    # Compute state space matrices under θ_j
    update!(model, θ_j)
    system = compute_system(model)
    
    # Filter and smooth historical states
    kal = filter(model, data, system)
    histstates, histshocks, histpseudo, s_T = 
        smooth(model, data, system, kal)
        
    # Forecast from s_T
    forecaststates, forecastobs, forecastpseudo, forecastshocks = 
        forecast(model, system, s_T)
        
    # Decompose history and forecast into shocks
    shockdecstates, shockdecobs, shockdecpseudo = 
        shock_decompositions(model, system, histshocks)
      
    # Compute impulse response functions
    irfstates, irfobs, irfpseudo = impulse_responses(model, system)
    
    # Assign results to dictionary to be returned
    forecast_outputs = Dict{Symbol, Array{Float64}}()
    ...
    return forecast_outputs
end

Advantages

- `pmap` handles assigning draws to worker processes automatically (`DArray`s disadvantage #1)
- Don't need to implement functions like `smooth_all` which handle multiple draws (disadvantage #2)
- Takes advantage of independence of draws (disadvantage #3)
- Computing in blocks reduces memory usage when `pmap` returns results to originator process

Why not `@parallel for`? Per [Julia documentation](https://docs.julialang.org/en/stable/manual/parallel-computing/#parallel-map-and-loops):

> Julia’s `pmap()` is designed for the case where each function call does a large amount of work. In contrast, `@parallel for` can handle situations where each iteration is tiny, perhaps merely summing two numbers.

Also, can't assign to a dictionary using `@parallel for` because each process will have a separate copy of it. (Docs recommend using `SharedArray`s to get around this.)

Take advantage of [writing to a subset](https://github.com/JuliaIO/HDF5.jl/blob/master/doc/hdf5.md#reading-and-writing-data) of an HDF5 dataset:

In [None]:
function write_forecast_block(file::JldFile, arr::Array{Float64}, 
                              block_indices::Range{Int})

    # Access JLD file's underlying HDF5 file
    pfile = file.plain
    
    # Get reference to existing HDF5 dataset named "arr"
    dataset = HDF5.d_open(pfile, "arr")
    
    # Write to subset of dataset corresponding to given block indices
    ndims = length(size(dataset))
    dataset[block_indices, fill(Colon(), ndims-1)...] = arr
end

Result: more natural, readable, efficient, and **beautiful** code

## Conclusion and next steps

Challenges for us moving forward

- Make DSGE.jl less us-specific
- Move to Julia 0.6 and 1.0
- Be responsible contributors to the Julia package ecosystem

Ongoing work

- Forecasting under alternative monetary policy rules
- Forecast evaluation and decomposing changes in forecasts
- Estimating nonlinear models using the [tempered particle filter](https://web.sas.upenn.edu/schorf/files/2016/10/HS-TemperedParticleFilter-PaperAppendix-1mlvlvu.pdf) (Herbst & Schorfheide 2017)

Future work

- Forecasting conditional on alternative scenarios
- Plotting

Acknowledgments

- New York Fed DSGE team:
  + Marco Del Negro, Marc Giannoni, Abhi Gupta, Erica Moszkowski, Sara Shahanaghi, Micah Smith

- QuantEcon collaborators:
  + Zac Cranko, Spencer Lyon, John Stachurski, Pablo Winant

## Thank you! Questions?