Memory limits + enable fine-tuning for parallel execution of small batches #2

Closed
sheffe opened this issue Jan 22, 2020 · 6 comments
sheffe commented Jan 22, 2020

This is one of the most exciting RF papers I've read in a while. I've homebrewed a lot of conditional bias estimators in the past for industry applications, but nothing with a real theoretical foundation, and in early trials this is radically improving on my prior work. Thank you!

A practical issue I'm running into: this method is (by necessity) quite memory-intensive when run on large X.test samples and/or when the forest has many unique terminal nodes (e.g. from a large number of X.train samples, many trees, or both). Calls to quantForestError that are too large fail with Error: vector memory exhausted (limit reached?).

I'm concluding from your paper and some experiments with the software that quantForestError results are deterministic once the input forest is trained, so it's safe to split large dataframes of X.test into pieces to avoid overloading memory. It also seems to run in linear time in the number of X.test samples, which should allow for convenient parallel execution of smaller calls batched over X.test. However, predict on a ranger object uses all available cores by default (and I believe randomForestSRC does the same), which causes thread contention when quantForestError calls are themselves run in parallel.

A simple solution would be to expose an nCores argument so users can tune this setting manually, defaulting to the underlying model class's behavior. I wanted to raise the issue more generally, though, because the vector memory exhausted errors (and the candidate solution of parceling out the X.test workload) aren't necessarily obvious and might merit some documentation. I'd be happy to work on a vignette or similar if useful.

The actual limits will depend on hardware (I'm running this on an iMac with 64 GB RAM), but below is a code example. On my machine it doesn't fail until somewhere between 1,000 and 50,000 test rows; I didn't pin down the exact breakdown point. Given knowledge of available RAM, it should be possible to find a heuristic for when X.test needs to be broken up, but that's hard to do in the wild.

library(tidyverse)
library(ranger)
library(forestError)
library(tictoc)

train_rows <- 100000
test_rows <- 50000

set.seed(1234)
fake_train <- dplyr::tibble(
  a = rpois(train_rows, 500),
  b = a + rnorm(train_rows, 50, 50),
  c = b + rnorm(train_rows, 50, 50),
  d = c + rnorm(train_rows, 50, 50),
  e = d + rnorm(train_rows, 50, 50),
  f = e + rnorm(train_rows, 50, 50),
  g = f + rnorm(train_rows, 50, 50),
  h = g + rnorm(train_rows, 50, 50),
  i = h + rnorm(train_rows, 50, 50)
)

fake_test <- dplyr::tibble(
  a = rpois(test_rows, 500),
  b = a + rnorm(test_rows, 50, 50),
  c = b + rnorm(test_rows, 50, 50),
  d = c + rnorm(test_rows, 50, 50),
  e = d + rnorm(test_rows, 50, 50),
  f = e + rnorm(test_rows, 50, 50),
  g = f + rnorm(test_rows, 50, 50),
  h = g + rnorm(test_rows, 50, 50),
  i = h + rnorm(test_rows, 50, 50)
)

cor(fake_train)
cor(fake_test)

rf1 <- ranger::ranger(a ~ .,
                      data = fake_train,
                      num.trees = 100,
                      seed = 1234,
                      keep.inbag = TRUE)

tic()
condbias_100rows <- forestError::quantForestError(
  forest = rf1,
  X.train = dplyr::select(fake_train, -a),
  Y.train = fake_train$a,
  X.test = dplyr::select(fake_test, -a)[1:100, ]
)
toc()

tic()
condbias_1000rows <- forestError::quantForestError(
  forest = rf1,
  X.train = dplyr::select(fake_train, -a),
  Y.train = fake_train$a,
  X.test = dplyr::select(fake_test, -a)[1:1000, ]
)
toc()

tic()
condbias_50000rows <- forestError::quantForestError(
  forest = rf1,
  X.train = dplyr::select(fake_train, -a),
  Y.train = fake_train$a,
  X.test = dplyr::select(fake_test, -a)
)
toc()

# Confirming that running chunks of X.test together or in pieces 
# leads to the same result
all.equal(condbias_100rows$estimates,
          condbias_1000rows$estimates[1:100, ])

all.equal(condbias_100rows$qerror,
          condbias_1000rows$qerror)

all.equal(condbias_100rows$perror,
          condbias_1000rows$perror)
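Since the chunks agree with the full run, the batching workaround can be a simple split/apply/rbind over X.test. A minimal sketch (run_batched and batch_indices are hypothetical helper names of mine, not part of forestError):

```r
# Hypothetical helpers, assuming quantForestError is deterministic given
# a trained forest (checked with all.equal above).
batch_indices <- function(n.rows, n.chunks) {
  # contiguous chunks of row indices, preserving original row order
  chunk.size <- ceiling(n.rows / n.chunks)
  split(seq_len(n.rows), ceiling(seq_len(n.rows) / chunk.size))
}

run_batched <- function(forest, X.train, Y.train, X.test, n.chunks = 10) {
  pieces <- lapply(batch_indices(nrow(X.test), n.chunks), function(idx) {
    forestError::quantForestError(
      forest = forest,
      X.train = X.train,
      Y.train = Y.train,
      X.test = X.test[idx, , drop = FALSE]
    )$estimates
  })
  do.call(rbind, pieces)  # stack per-chunk estimates back in order
}
```

Each chunk could equally be dispatched to a worker (e.g. via parallel::mclapply), provided the per-call prediction step is pinned to a single thread.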
@sheffe sheffe changed the title Fine-tuning for parallel execution Memory limits + enable fine-tuning for parallel execution of small batches Jan 22, 2020
benjilu commented Jan 22, 2020

Thanks! I'll look into this and get back to you.

sheffe commented Feb 18, 2020

I think I have some useful interim notes here. What follows isn't a reprex or a clean demo, just some ideas on where further code profiling/iteration in a full reprex might be most helpful. If you think I'm on the right track, I'll dive in further.

I've profiled my original issue on memory failures more carefully using a fork here (https://github.com/sheffe/forestError) that changes very little from current master. It just plugs the nCores from the function interface into ranger's predict to prevent parallel thread collisions when running the method over all cores in batches.

A couple of early conclusions: the results are definitely deterministic, so splitting batches of new predictions across cores to shrink the total size of the matrices works reasonably well at preventing memory failures. However, it's much slower to run, because we make a call to countOOBCohabitantsTrainPar for each test batch, and that matters far more for overall runtime than the decision to single-core everything. And even with careful batchwise iteration, I hit out-of-memory errors on problems I'd consider moderately sized: ~2M observations in a training set, 250 trees, and a fairly aggressively increased min.node.size to shrink the number of unique terminal nodes (min.node.size of 20, 50, and 100 all failed outside of small trial cases).

I'm working on a short-term project where this would be extremely handy, and I'm planning the following experiments. It looks like the calls to countOOBCohabitants/countOOBCohabitantsTrainPar and the resulting bias/MSPE/prediction-interval calculations can be done in a single step after model training, effectively creating a lookup table between a terminal node ID and its corresponding train OOB error statistics. This will still hit memory failures on large forests/large training sets, but I suspect that iterating tree-wise and then combining the aggregated statistics should work. (As a cheap hack to test this theory, I'm planning to train a few hundred ranger forests of 1-5 trees each.) Once a one-time lookup table is generated, future calls with test observations can be treated as a subset/join to the appropriate tree/node combination. This can still get quite large -- it's basically a graph edgelist representation of the entire RF -- but I suspect the memory constraints of that one-time table of training error statistics will be easier to predict and reason about than when the quantForestError wrapper computes them on the fly.
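To make the lookup-table idea concrete, here's a toy sketch in base R (table names and values are mine, purely illustrative): aggregate the OOB errors once per (tree, terminal node), then resolve test observations with a join instead of an adjacency matrix.

```r
# Toy train edgelist: one row per (tree, terminal node, training obs),
# carrying that observation's OOB error. Values are made up.
train_edges <- data.frame(
  tree = c(1, 1, 1, 2, 2),
  node = c(4, 4, 7, 3, 3),
  oob_error = c(0.5, -0.5, 2.0, 1.0, 3.0)
)

# One-time lookup table: per-(tree, node) bias and MSPE.
lookup <- aggregate(oob_error ~ tree + node, data = train_edges,
                    FUN = function(e) c(bias = mean(e), mspe = mean(e^2)))

# Test observations reduce to (tree, node) pairs; a merge against the
# lookup table replaces the on-the-fly cohabitation computation.
test_edges <- data.frame(tree = c(1, 2), node = c(4, 3), rowid_test = c(1, 1))
merged <- merge(test_edges, lookup, by = c("tree", "node"))
```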

@sheffe
Copy link
Contributor Author

sheffe commented Feb 18, 2020

Actually, this turned out to be more straightforward than I thought! The calculation for oob.weights is where the memory failure was occurring: countOOBCohabitants was creating an adjacency matrix (an extremely sparse one) between test node IDs in rows and train node IDs in columns. Allocating that matrix is hugely expensive for larger forests, and what I tried earlier (eg increasing terminal node size) wasn't doing anything because the mapping was test-obs to train-obs.

The solution below gets to the same quantities but eliminates the adjacency matrix (and the C++ that generates it) in favor of an edgelist representation of train/test that we can join (a table of tree/node/error for train, tree/node/rowid for test). It previews a few extra tricks for speed and memory conservation, too:

  • when we convert to the edgelist, we can explicitly drop the tree/node combinations where an observation was inbag and score a major reduction in the number of elements we retain (~63.2%, the expected inbag share, right?); in the original code an inbag node ID was zeroed out, but the value 0 still had to be allocated in the large matrix of cohabitation counts.
  • with an edgelist we don't need to retain full information in the train table at observation level, because we join the train edgelist to test observations only on tree/node, so we can pre-average the bias/MSPE and retain a vector of errors (without IDs) for later use in quantile statistics and the PDF/CDF.
  • we could also use data.table by=.EACHI to roll up calculations inside the join, instead of joining edgelists and later summarizing in a second step, but I've just stubbed that out in favor of making the example more explicit.

I haven't written enough around the function to profile them against each other consistently, but you can try (for instance) rbind-ing a thousand noised-up copies of train and it still runs very quickly.

library(magrittr)
library(data.table)
library(randomForest)

data(airquality)

# remove observations with missing predictor variable values
airquality <- airquality[complete.cases(airquality), ]

# get number of observations and the response column index
n <- nrow(airquality)
response.col <- 1

# split data into training and test sets
train.ind <- sample(1:n, n * 0.9, replace = FALSE)
Xtrain <- airquality[train.ind, -response.col]
Ytrain <- airquality[train.ind, response.col]
Xtest <- airquality[-train.ind, -response.col]
Ytest <- airquality[-train.ind, response.col]

rf <- randomForest::randomForest(Xtrain, Ytrain, nodesize = 5,
                                 ntree = 500, keep.inbag = TRUE)


### renaming vars, as if we're inside the quantForestError function
forest <- rf
X.train <- Xtrain
X.test <- Xtest
Y.train <- Ytrain

test.pred.list <- predict(forest, X.test, nodes = TRUE)

# get terminal nodes of all observations
train.terminal.nodes <- attr(predict(forest, X.train, nodes = TRUE), "nodes")
test.terminal.nodes <- attr(test.pred.list, "nodes")

# get number of times each training observation appears in each tree
bag.count <- forest$inbag

# get the OOB prediction error of each training observation
oob.errors <- forest$y - forest$predicted

# get test observation predictions
attributes(test.pred.list) <- NULL
test.preds <- test.pred.list

train.terminal.nodes[bag.count != 0] <- NA

long_test_nodes <-
  as.data.table(test.terminal.nodes)[, rowid_test := .I] %>%
  melt(id.vars = "rowid_test", measure.vars = patterns("\\d+"),
       variable.name = "tree", value.name = "terminal_node",
       variable.factor = FALSE)  # keep tree as character so join keys match the train edgelist

long_train_nodes <-
  as.data.table(train.terminal.nodes) %>%
  .[, `:=`(rowid_train = .I,
           oob_error = oob.errors)] %>%
  melt(id.vars = c("oob_error", "rowid_train"),
       measure.vars = patterns("\\d+"),
       variable.name = "tree",
       value.name = "terminal_node",
       variable.factor = FALSE,
       na.rm = TRUE) %>%
  .[, .(node_mspe = mean(oob_error^2),
        node_bias = mean(oob_error),
        node_errs = list(oob_error),
        node_obsN = .N),
    by = c("tree", "terminal_node")]

setkey(long_test_nodes, tree, terminal_node)
setkey(long_train_nodes, tree, terminal_node)

oob_error_statistics <- long_train_nodes %>%
  .[long_test_nodes, .(rowid_test, node_mspe, node_bias, node_errs, node_obsN), by = .EACHI] %>%
  .[, .(wtd_mspe = weighted.mean(node_mspe, node_obsN, na.rm = TRUE),
        wtd_bias = weighted.mean(node_bias, node_obsN, na.rm = TRUE),
        all_errs = list(unlist(node_errs))),
    keyby = "rowid_test"]

(^ the list-column all_errs in that last step retains the full information needed for quantile statistics, the PDF/CDF, etc. I don't see a way to avoid retaining it if we want full granularity of the error distribution, but for really large problems I'd lean toward binning observations into ~1,000 bins and subsampling the bins down to something reasonable...)
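For the record, the by=.EACHI roll-up mentioned in the third bullet above would compute the node-level statistics inside the join itself, skipping the separate pre-aggregation of the train edgelist. A toy sketch (tiny made-up tables, untested against the full pipeline):

```r
library(data.table)

# Toy raw train edgelist: one row per (tree, terminal node, training obs).
train_edges <- data.table(
  tree = c("V1", "V1", "V2"),
  terminal_node = c(4L, 4L, 3L),
  oob_error = c(0.5, -0.5, 2.0)
)
test_edges <- data.table(
  tree = c("V1", "V2"),
  terminal_node = c(4L, 3L),
  rowid_test = c(1L, 1L)
)
setkey(train_edges, tree, terminal_node)
setkey(test_edges, tree, terminal_node)

# by = .EACHI aggregates per matched test edge inside the join itself
node_stats <- train_edges[test_edges,
  .(rowid_test,
    node_mspe = mean(oob_error^2),
    node_bias = mean(oob_error),
    node_obsN = .N),
  by = .EACHI]

# then roll up across trees per test observation, as in the second step above
stats <- node_stats[, .(wtd_mspe = weighted.mean(node_mspe, node_obsN),
                        wtd_bias = weighted.mean(node_bias, node_obsN)),
                    keyby = rowid_test]
```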

sheffe commented Feb 19, 2020

(apologies for spamming this issue tonight, coding in bursts around a screaming baby...)

I benchmarked this alternate implementation against two dataset sizes (50 and 1,000 rbind-ed noised-up copies of airquality: 5,500 and 110,000 rows respectively) and two forest sizes (100 trees and 1,000 trees). I'm single-threading everything in the diagnostics below, but (I can file a separate issue if helpful) single-threading quantForestError has been strictly faster than parallelizing in most of the cases I've seen.

The median times in a microbenchmark for the 5,500-row sample:

  • over 100 trees, 521 ms for the current forestError and 503 ms for the data.table example implementation (roughly equal / within noise)
  • over 1,000 trees, ~50 seconds for the current forestError and ~4.9 seconds for the DT example (about a 10x difference)

So, as we'd expect given the large matrix allocation, the original implementation scales at roughly O(n^2), while the edgelist version scales roughly linearly.

The real memory crunch comes with increasing counts of unique test observations and terminal nodes. For the 110,000-row dataset, I was only able to run the 100-tree forest under both approaches: median time for forestError was ~630 seconds, while the DT example ran in ~21 seconds. Over the 1,000-tree forest, the DT example ran in 220 seconds, so the linear scaling trend continued.

At this point I've gone pretty far afield from your original code and don't want to send you a huge PR haring off in a new direction. Tomorrow I'll polish this idea further and push it up to a fork so you can explore at leisure if interested. I think the edgelist route will basically require a new dependency; I used data.table here, but it should also be possible to implement this with a sparse class from the Matrix package. Both are extremely stable upstreams, but I know DT better. One benefit of keeping this in data.table/long-dataframe form is that (I'm almost sure) it would make it trivial to run the forestError approach inside a database for huge problems, along the lines of what tidypredict does.
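For completeness, the Matrix route I mentioned would keep the cohabitation counts but store them sparsely; a rough toy sketch (my own construction, not profiled against the DT version):

```r
library(Matrix)

# Toy example: each (observation, tree) pair is flattened to a single
# integer node code; two observations cohabit when they share a code.
test_codes  <- c(1L, 3L)      # one code per test observation (one tree)
train_codes <- c(1L, 1L, 3L)  # one code per training observation

n.codes <- max(test_codes, train_codes)

# Sparse incidence matrices: rows = observations, cols = node codes.
# Only nonzero cells are allocated, unlike the dense adjacency matrix.
A <- sparseMatrix(i = seq_along(test_codes), j = test_codes,
                  x = 1, dims = c(length(test_codes), n.codes))
B <- sparseMatrix(i = seq_along(train_codes), j = train_codes,
                  x = 1, dims = c(length(train_codes), n.codes))

# cohab[i, j] = number of terminal nodes test obs i shares with train obs j
cohab <- A %*% t(B)
```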

library(magrittr)
library(data.table)
library(randomForest)
library(forestError)

data(airquality)
# remove observations with missing predictor variable values
airquality <- airquality[complete.cases(airquality), ]
# Add some random noise to 1000 copies of the data
set.seed(1234)
airquality_big <- lapply(1:1000, function(x){
  as.data.table(airquality) %>%
    .[, lapply(.SD, function(x) x + rnorm(length(x), 0, 4)),
      .SDcols = names(airquality)]
}) %>%
  data.table::rbindlist() %>%
  as.data.frame()

# get number of observations and the response column index
n <- nrow(airquality_big)
response.col <- 1

# split data into training and test sets
train.ind <- sample(1:n, n * 0.7, replace = FALSE)
Xtrain <- airquality_big[train.ind, -response.col]
Ytrain <- airquality_big[train.ind, response.col]
Xtest <- airquality_big[-train.ind, -response.col]
Ytest <- airquality_big[-train.ind, response.col]

rf1 <- randomForest::randomForest(Xtrain, Ytrain, nodesize = 5,
                                  ntree = 100, keep.inbag = TRUE)
rf2 <- randomForest::randomForest(Xtrain, Ytrain, nodesize = 5,
                                  ntree = 1000, keep.inbag = TRUE)

altimplementation <- function(forest, X.train, X.test, Y.train){

  ### Identical to forestError implementation ###
  test.pred.list <- predict(forest, X.test, nodes = TRUE)
  # get terminal nodes of all observations
  train.terminal.nodes <- attr(predict(forest, X.train, nodes = TRUE), "nodes")
  test.terminal.nodes <- attr(test.pred.list, "nodes")
  # get number of times each training observation appears in each tree
  bag.count <- forest$inbag
  # get the OOB prediction error of each training observation
  oob.errors <- forest$y - forest$predicted
  # get test observation predictions
  attributes(test.pred.list) <- NULL
  test.preds <- test.pred.list
  ### Start new code ###

  # NA-out the inbag set before melting to long
  train.terminal.nodes[bag.count != 0] <- NA

  long_test_nodes <-
    as.data.table(test.terminal.nodes)[, rowid_test := .I] %>%
    melt(id.vars = "rowid_test", measure.vars = patterns("\\d+"),
         variable.name = "tree", value.name = "terminal_node",
         variable.factor = FALSE)  # keep tree as character so join keys match the train edgelist

  long_train_nodes <-
    as.data.table(train.terminal.nodes) %>%
    .[, `:=`(oob_error = oob.errors)] %>%
    melt(id.vars = c("oob_error"),
         measure.vars = patterns("\\d+"),
         variable.name = "tree",
         value.name = "terminal_node",
         variable.factor = FALSE,
         na.rm = TRUE) %>%
    .[, .(node_mspe = mean(oob_error^2),
          node_bias = mean(oob_error),
          node_errs = list(oob_error),
          node_obsN = .N),
      by = c("tree", "terminal_node")]

  setkey(long_test_nodes, tree, terminal_node)
  setkey(long_train_nodes, tree, terminal_node)

  oob_error_statistics <- long_train_nodes %>%
    .[long_test_nodes, .(rowid_test, node_mspe, node_bias, node_errs, node_obsN), by = .EACHI] %>%
    .[, .(wtd_mspe = weighted.mean(node_mspe, node_obsN, na.rm = TRUE),
          wtd_bias = weighted.mean(node_bias, node_obsN, na.rm = TRUE),
          all_errs = list(unlist(node_errs))
    ),
    keyby = "rowid_test"]

  oob_error_statistics
}

data.table::setDTthreads(threads = 1)
microbenchmark::microbenchmark(
  altimplementation(forest = rf1, X.train = Xtrain, X.test = Xtest, Y.train = Ytrain),
  quantForestError(forest = rf1, X.train = Xtrain, X.test = Xtest, Y.train = Ytrain, what = c("bias", "mspe"), n.cores = 1),
  times = 5
)

# quantForestError breaks here, so commented out
microbenchmark::microbenchmark(
  altimplementation(forest = rf2, X.train = Xtrain, X.test = Xtest, Y.train = Ytrain),
  #quantForestError(forest = rf2, X.train = Xtrain, X.test = Xtest, Y.train = Ytrain, what = c("bias", "mspe"), n.cores = 1),
  times = 5
)

sheffe commented Feb 19, 2020

Here's a working implementation of everything except the PDF/CDF function returns on my fork. I included some tweaks (multiple prediction interval alphas) specific to my own use-cases. As I noted, not planning to send you a huge PR 'til I know what's helpful.

benjilu commented Feb 21, 2020

This looks great! I've reviewed your fork and would be happy to work on merging this with the main branch if you open a pull request and allow me to make commits to it. Of course, you would probably also want to keep a separate copy of your version since it's specifically tailored to your needs.

At a high level, do you see any obvious opportunities for parallelization of the computations in your version? Allowing for parallelization, as well as returning the PDF and CDF functions, are two features I plan to add if possible before merging this version with the main branch.

As for the original quantForestError function, I've also noticed that parallelization doesn't yield substantial improvements in computation time in some cases. This seems to be a fairly nuanced issue: I haven't been able to fully characterize the situations in which parallelization helps and those in which it doesn't, but a main determinant appears to be the number of trees in the forest. I'm looking into it, though the issue may naturally resolve itself or otherwise become moot under your edgelist approach. I can do some testing on it.

Thanks a bunch for these improvements!
