
bpsharememory() generic and bpmapply() efficiency: Avoid passing a copy of the full data to workers #228

Merged

Conversation

zeehio
Contributor

@zeehio zeehio commented Oct 20, 2022

bpmapply() was passing another copy of the full data to workers (besides the copy addressed in #227), in this case directly as an explicit extra argument to bplapply().

The arguments to be iterated on are now transposed: instead of ddd[[i]][[j]] holding the value of the i-th argument in the j-th iteration, we build a list of the form ddd[[j]][[i]]. Then bplapply() can iterate directly on ddd, passing one element of ddd to each worker, instead of passing the whole ddd list of lists together with the corresponding index.

This approach has a drawback when the arguments to be iterated on are not lists but vectors. In that case the transposition has some overhead, but I would argue that it would then make more sense to use something like bpvec() (or a bpmvec(), if it existed).

UPDATED: This pull request has been updated so its drawbacks do not exist anymore. #228 (comment)

@zeehio
Contributor Author

zeehio commented Oct 20, 2022

This approach is better when the workers and the caller do not share memory. However, for workers that can share memory (e.g. serial or multicore) we may be paying the overhead of transposing the list of lists without reaping any benefit, since passing the whole dataset to the worker may not create any additional copies.

I have not checked this; maybe someone else already knows the answer.

@zeehio
Contributor Author

zeehio commented Nov 2, 2022

In order to transpose the list only when needed, I would need to know whether the workers of the current backend share memory with the current process. Is there any method named "sharedMemory" or similar that returns TRUE for MulticoreParam and SerialParam but FALSE for SnowParam (and so on for other backends)?

@DarwinAwardWinner

DarwinAwardWinner commented Nov 2, 2022

The only way to share memory would be forking, right? (Well, and single-process)

@zeehio
Contributor Author

zeehio commented Nov 2, 2022

I guess, yes.

I'd like to consider backends such as this one (even if it is experimental):

https://github.com/HenrikBengtsson/BiocParallel.FutureParam

Do you have an idea in mind?

@DarwinAwardWinner

Hmm, so I think your suggestion of a method might make sense. For most params this method would return a constant logical value (TRUE for multicore/serial, FALSE for others), but "meta-backends" like FutureParam and DoparParam would need to examine the registered foreach/future backend and return TRUE or FALSE depending on which one is in use.

@DarwinAwardWinner

For backward compatibility and future-proofing, the abstract base class (BiocParallelParam) should probably provide a default implementation of this method (which always returns FALSE?), which is then overridden by specific backends to "opt in" to the optimization.
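A minimal sketch of that design, using hypothetical stub classes (the real BiocParallelParam hierarchy has more slots and methods than shown here, and the generic's name was still under discussion at this point):

```r
library(methods)

## Stub classes standing in for the real BiocParallel hierarchy
setClass("BiocParallelParam", representation("VIRTUAL"))
setClass("SerialParam", contains = "BiocParallelParam")
setClass("SnowParam", contains = "BiocParallelParam")

## Default on the abstract base class: assume workers do NOT share
## memory, so specific backends must "opt in" to the optimization.
setGeneric("bpsharememory", function(x) standardGeneric("bpsharememory"))
setMethod("bpsharememory", "BiocParallelParam", function(x) FALSE)

## A shared-memory backend overrides the default
setMethod("bpsharememory", "SerialParam", function(x) TRUE)

bpsharememory(new("SerialParam"))  # TRUE
bpsharememory(new("SnowParam"))    # FALSE (inherited default)
```

A meta-backend would instead implement its method to inspect the registered future/foreach backend before answering.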

@zeehio zeehio force-pushed the further-improve-bpmapply-performance branch from 0ba52e7 to 67ea521 Compare November 3, 2022 08:24
@zeehio
Contributor Author

zeehio commented Nov 3, 2022

We now have the best of both worlds:

bpsharememory() generic

This generic tells whether the workers share the same memory space. A serial or multicore-based backend would return TRUE, while a multi-process backend (like Snow) would return FALSE.

This generic is now used by bpmapply() to determine whether it is better to pass the whole data to all workers (faster if workers share memory) or to modify the structure of the data to pass a slice of it to each worker (more efficient if workers do not share memory).

bpmapply: Use bpsharememory() to avoid extra copies of all data

bpmapply() receives several arguments to iterate on. This ends up being something like:

ddd <- list(
  arg1 = list(arg1_iter1, arg1_iter2, arg1_iter3),
  arg2 = list(arg2_iter1, arg2_iter2, arg2_iter3)
)

The implementation before this commit passed ddd to all workers, as well as the iteration index i, and each worker would take the corresponding slice ddd$arg1[[i]] and ddd$arg2[[i]] and pass it to the parallelized function.

For Serial and Multicore backends, where workers share memory, this is very efficient. However, for Snow backends, where workers do not share memory, the whole ddd needs to be serialized, copied to each worker, and deserialized, which is very inefficient.

In the Snow scenario, it is far more efficient to let the main process "transpose" the ddd list so it becomes:

ddd2 <- list(
  iter1 = list(arg1_iter1, arg2_iter1),
  iter2 = list(arg1_iter2, arg2_iter2)
)

Then only pass the corresponding iteration to each worker, reducing the amount of serialization/deserialization needed and the memory footprint significantly.

For this to be efficient in all backends, we need to know if the specific backend has workers that share memory or not.

So we use the newly introduced bpsharememory() generic to determine that and choose the most efficient bpmapply() argument slicing strategy.
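For concreteness, the "transpose" step described above can be sketched in a few lines of base R (transposeArgs() is a hypothetical name for illustration, not the helper actually added in this pull request):

```r
## Turn the per-argument layout ddd into the per-iteration layout ddd2,
## so each worker can receive only its own slice.
transposeArgs <- function(ddd) {
    n_iter <- length(ddd[[1L]])
    lapply(seq_len(n_iter), function(j) lapply(ddd, `[[`, j))
}

ddd <- list(
    arg1 = list("arg1_iter1", "arg1_iter2", "arg1_iter3"),
    arg2 = list("arg2_iter1", "arg2_iter2", "arg2_iter3")
)
ddd2 <- transposeArgs(ddd)
ddd2[[1L]]$arg1  # "arg1_iter1"
ddd2[[2L]]$arg2  # "arg2_iter2"
```

Note that the inner lapply() only rearranges list cells; the element vectors themselves are not copied (see the copy-on-write discussion later in this thread).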

@zeehio zeehio changed the title bpmapply: Avoid passing (another) copy of the full data to workers bpsharememory() generic and bpmapply() efficiency: Avoid passing a copy of the full data to workers Nov 3, 2022
@mtmorgan
Collaborator

mtmorgan commented Nov 3, 2022

@Jiefei-Wang's SharedObject provides object sharing when all processes run on a single machine.

I'm not a fan of bpsharememory(); if there's a need for bpmapply() to behave differently for different back-ends, then dispatch on BPPARAM in

> showMethods(bpmapply)
Function: bpmapply (package BiocParallel)
FUN="ANY", BPPARAM="BiocParallelParam"
FUN="ANY", BPPARAM="list"
FUN="ANY", BPPARAM="missing"

It's actually not correct that MulticoreParam() always shares memory. Consider

x <- 1
param <- bpstart(MulticoreParam())   # forked process, 'x' in shared memory
bplapply(1:4, sqrt, BPPARAM = param) # already forked, so 1:4 must be serialized to worker

Please also follow the BiocParallel coding convention (such as it is!) of 4-space indentation. Also, I have moved away from aligning arguments with the opening parenthesis of the function name; instead, if the arguments don't fit on a single line, I favor starting them on a new line indented 4 spaces:

result <- someFunction(
    foo,
    bar,
    baz
)

with some flexibility favoring a compact representation

result <- someFunction(foo, bar, baz) # fits on one line
result <- someFunction( # args fit on one line, but not function + args
    foo, bar, baz
)
result <- someFunction( # some args fit on one line
    foo, bar,
    baz
)

I think it's also important to ask whether this is enough of a problem to warrant a solution. Can you provide a simple reproducible example that illustrates it?

@DarwinAwardWinner

Interesting points about memory sharing, Martin. How do you provide a reproducible example for memory usage? Is there an easy way to get the peak memory usage of the R process and all its children, summed?

@mtmorgan
Collaborator

mtmorgan commented Nov 3, 2022

Measuring memory is challenging of course, but actually I had been under the impression that the discussion was about speed, so an example would certainly help clarify! Maybe Rcollectl would be helpful...

@zeehio
Contributor Author

zeehio commented Nov 4, 2022

Here I just show the problem, which is mostly RAM usage; I'll discuss the solution afterwards.

The problem

Here is some code to demo the issue, which mostly affects RAM usage (and indirectly CPU, because we have to serialize a lot more data).

To keep things simple I just use a 1 GB list with 1000 elements to iterate on; each element is 1 MB. This was large enough to be noticeable in my system monitor, but small enough not to crash my 16 GB laptop. If you have less RAM available than I do, please make the dataset smaller to avoid RAM issues. I don't measure the RAM in the script, but I attach two screenshots of my system monitor:

# This example shows the RAM issues of bpmapply()

# 1. Create a list of num_samples elements, where each element is a numeric
#    vector of a given size, so that the whole list takes about 1 GB of RAM.
#    I call this list "samples" because it's like a list of samples to process.
#
# On my 16 GB laptop, 1 GB is easily noticeable, and I won't run out of RAM
# if there is an extra copy or two (but I will notice the bump).

# Dataset size:
dataset_size_GB <- 1 # GB

num_samples <- 1000

sample_size_MB <- dataset_size_GB/num_samples * 1024 # MB
sample_length <- round(sample_size_MB*1024^2/8) # one double is 8 bytes

samples <- lapply(
  seq_len(num_samples),
  function(i, sample_length) {
    runif(sample_length)
  },
  sample_length = sample_length
)

# 2. Since bpmapply() is designed to take several arguments, we also create
#    another list of the same length as the dataset, with one random number.
#    I see this as another argument I want to iterate on, but it's not relevant

extra_args <- lapply(
  seq_len(num_samples),
  function(i) {
    runif(1)
  }
)

# 3. To show the problem, we can check installing either the master version or
#    this pull request:
#
#  # Pick one:
#  remotes::install_github("Bioconductor/BiocParallel")
#  remotes::install_github("Bioconductor/BiocParallel#228")
#
#  (The BiocParallel from Bioconductor would work as well)

library(BiocParallel)

# I use three workers
bpparam <- SnowParam(workers = 3, exportglobals = FALSE, exportvariables = FALSE)

process_sample <- local({function(sample, extra_arg) {
  force(sample)
  force(extra_arg)
  # just wait a bit, 1000 samples / 3 workers ~ 333 samples/worker * 0.05 s/sample = 16.6 seconds
  Sys.sleep(0.05)
  NULL
}}, envir = baseenv())

bpmapply(
  process_sample,
  sample = samples,
  extra_arg = extra_args,
  BPPARAM = bpparam,
  SIMPLIFY = FALSE
)

When running this on the BiocParallel master branch, I see my RAM usage goes from 15% to 60%. This is roughly 7GB of RAM.

[Screenshot: ram_usage_biocmaster]

When running this on the branch from this pull request (remotes::install_github("Bioconductor/BiocParallel#228")), RAM usage goes from 15% up to 25%. That's 1.6 GB.

[Screenshot: ram_usage_bioc228]

I may be off in my estimates, since I took them by eye, but it looks pretty clear to me that this is worth it.

@zeehio
Contributor Author

zeehio commented Nov 4, 2022

There are three points open for discussion:

  • bpsharememory(): As you have shown with the MulticoreParam example, bpsharememory() is not the right solution for this problem.
  • Coding conventions: I missed the style conventions. I will restyle the code following your practices.
  • bpmapply() efficient implementation

bpmapply efficient implementation

We have here two implementations of bpmapply(): one that sends a copy of the whole list of arguments to each worker, and another that transforms that list so it can send each worker just what it needs.

  • I believe that in general it makes more sense for bpmapply() to pass each worker only what it needs. I suggest this should be the default bpmapply() implementation.
  • I suggest keeping the older implementation ("send a copy of everything to all workers") only for the bpmapply() method for SerialParam().

Using SharedObject

For backends such as Multicore or maybe even Snow (e.g. when all workers are on localhost) it should be possible to share memory between the workers. This should be by far the most efficient solution!

To use SharedObject we should know:

  • Whether the parallelization backend is able to share memory (compatible with SharedObject)
  • Whether the objects we are going to pass to the backend are compatible with SharedObject.

If everything is compatible with SharedObject then we should use it (adding it as a dependency to BiocParallel...). I believe that using SharedObject will require significant testing on many operating systems and is beyond the scope of this pull request.

Proposal for this pull request

  1. Remove the bpsharememory() generic
  2. Adjust the coding conventions
  3. bpmapply() will use the proposed implementation for all backends except SerialParam, where the new approach will never be as efficient as the older one.

What do you think?

@mtmorgan
Collaborator

mtmorgan commented Nov 4, 2022

The plan sounds good; thanks for your engagement on this.

One thing might be to reconsider the special case for SerialParam -- there's value in having consistency across back ends, especially when one might switch to SerialParam when trying to debug. Also, and perhaps more importantly, the re-organization of data structures might not actually be that expensive: it does not copy the large vector elements, just the S-expressions of the list 'skeleton'. Not sure where you are in your knowledge of R, but

> l = list(list(x=1:10, y = 1:10))
> .Internal(inspect(l))
@13c821060 19 VECSXP g0c1 [REF(2)] (len=1, tl=0)
  @15acd1648 19 VECSXP g0c2 [REF(3),ATT] (len=2, tl=0)
    @13c04a810 13 INTSXP g0c0 [REF(65535)]  1 : 10 (compact)
    @13c04a730 13 INTSXP g0c0 [REF(65535)]  1 : 10 (compact)
  ATTRIB:
    @13c04a6c0 02 LISTSXP g0c0 [REF(1)]
      TAG: @15a80d020 01 SYMSXP g0c0 [MARK,REF(6344),LCK,gp=0x4000] "names" (has value)
      @15acd16c8 16 STRSXP g0c2 [REF(65535)] (len=2, tl=0)
	@15b00d6e0 09 CHARSXP g0c1 [MARK,REF(127),gp=0x61] [ASCII] [cached] "x"
	@15b04a3f0 09 CHARSXP g0c1 [MARK,REF(23),gp=0x61] [ASCII] [cached] "y"

shows that the 'data' x is at the memory pointed to by 13c04a810 and y at 13c04a730.

Re-arranging,

> m = list(x = l[[1]]$x, y = l[[1]]$y)
> .Internal(inspect(m))
@14a836788 19 VECSXP g0c2 [REF(1),ATT] (len=2, tl=0)
  @13c04a810 13 INTSXP g0c0 [REF(65535)]  1 : 10 (compact)
  @13c04a730 13 INTSXP g0c0 [REF(65535)]  1 : 10 (compact)
ATTRIB:
  @14ab4e438 02 LISTSXP g0c0 [REF(1)]
    TAG: @15a80d020 01 SYMSXP g0c0 [MARK,REF(6345),LCK,gp=0x4000] "names" (has value)
    @14a836808 16 STRSXP g0c2 [REF(1)] (len=2, tl=0)
      @15b00d6e0 09 CHARSXP g0c1 [MARK,REF(129),gp=0x61] [ASCII] [cached] "x"
      @15b04a3f0 09 CHARSXP g0c1 [MARK,REF(25),gp=0x61] [ASCII] [cached] "y"

the list S-expressions have been updated, but the data are re-used: x is still at 13c04a810 and y at 13c04a730, so there is no real memory cost (the list S-expressions are a fixed size, maybe just 48 bytes? object.size(list()); I don't think that's exactly right). And the time cost of the re-arrangement is just an iteration that is likely insignificant in the context of the computation being performed.

@zeehio
Contributor Author

zeehio commented Nov 4, 2022

Oh right! I forgot about R copy-on-write magic! I'll keep it simple then!

@Jiefei-Wang
Collaborator

Jiefei-Wang commented Nov 4, 2022

I think there were some reasons why I did not touch bpmapply when I was reconstructing these apply functions, but I'm not exactly sure what they were... Maybe error handling? I need to go over the code again to see if there are any side effects from this change. (Or maybe I simply forgot to change this function?)

@zeehio
Contributor Author

zeehio commented Nov 4, 2022

@Jiefei-Wang Sorry, I don't have context on what you were doing. bpmapply() ends up wrapping bplapply(), so most of the parallelization happens in bplapply(). Maybe you left it out because of that?

@Jiefei-Wang
Collaborator

Your implementation looks good to me; I have only one minor comment. When creating ddd, I think we can use mapply() to build the list instead of reinventing the wheel. For example:

foo <- function(...) {
    ddd <- mapply(
        function(...) list(...),
        ...,
        SIMPLIFY = FALSE
    )
    ddd
}
foo(i = 1:2, j = 4:5)

It will take care of both argument names and arguments of unequal lengths.

@zeehio
Contributor Author

zeehio commented Nov 5, 2022

I didn't do that simplification because when I tried it, some unit tests failed.

We could argue about whether the unit tests should be changed, but I would rather do that in a different issue.

Example of a failing test (from inst/unitTests/test_bpmapply.R):

library(BiocParallel)
library(RUnit)
X <- list(c(a = 1))
checkIdentical(X, bpmapply(identity, X, SIMPLIFY = FALSE))

For your suggestion to work, that test should be changed to:

library(BiocParallel)
library(RUnit)
X <- list(c(a = 1))
checkIdentical(mapply(identity, X, SIMPLIFY = FALSE), bpmapply(identity, X, SIMPLIFY = FALSE))

Which makes sense to me, to be honest. There are several other tests that would need to be updated as well.

Anyway, this is a subtle difference between mapply() and bpmapply() that should probably be fixed in BiocParallel, but I'd rather do it in a different issue / pull request.

In case you want to check that out, here is a branch with the simplification you suggested:

remotes::install_github("zeehio/BiocParallel@fix-simplify-bpmapply-argument-preparation")

Or, if you want the code, you can add my remote and check out the branch (in case you are not familiar with git remotes and branches):

# Assuming you are in this git repository...
git remote add zeehio git@github.com:zeehio/BiocParallel.git
git fetch zeehio
git checkout fix-simplify-bpmapply-argument-preparation

If there are no other issues or suggestions, I suggest merging this pull request and working on that simplification in a new thread. I would rather avoid introducing breaking changes here.

@mtmorgan
Collaborator

mtmorgan commented Nov 5, 2022

Thanks @zeehio this looks great! Can you 'squash' this into a single commit maybe with

git reset --soft ad15c8f
git commit --edit -m"$(git log --format=%B --reverse HEAD..HEAD@{1})"

(from the magic at https://stackoverflow.com/a/5201642/547331)? That way the history shows only the paths taken rather than the paths not taken... Maybe a commit message like:

    improve bpmapply() memory use

    - merges #228
    - bpmapply receives several arguments to iterate on. This ends up being
      something like:

      ddd <- list(
          arg1 = list(arg1_iter1, arg1_iter2, arg1_iter3),
          arg2 = list(arg2_iter1, arg2_iter2, arg2_iter3)
      )

      The implementation before this commit was passing ddd to all
      workers as well as the iteration index, and each worker would
      take the corresponding slice ddd$arg1[[i]] and ddd$arg2[[i]].

      For Serial and (sometimes) Multicore backends, where workers share memory,
      this is very efficient. However for Snow backends, where workers
      do not share memory, the whole ddd needs to be serialized, copied
      to each worker and deserialized, which is very inefficient.

      In the Snow scenario, it is far more efficient to let the main
      process "transpose" the `ddd` list so it becomes:

      ddd2 <- list(
          iter1 = list(arg1_iter1, arg2_iter1),
          iter2 = list(arg1_iter2, arg2_iter2)
      )

      Then only pass the corresponding iteration to each worker,
      reducing the amount of serialization/deserialization needed
      and the memory footprint significantly.

      The re-arrangement is not too expensive, and for consistency is
      applied to all backends.
    - Define helper functions in local() only with the base namespace
    - Simplify bpmapply implementation
    - Remove unused .mrename
    - Update NEWS

I guess there have been changes since this started; I can update before merging, but if you wanted to git rebase master and adjust the version number in the NEWS entry file as appropriate... (or I can do this)

I think the .transposeArgsWithIterations() should be in bpmapply-methods (and corresponding test file).

@zeehio zeehio force-pushed the further-improve-bpmapply-performance branch from 6659767 to cf5dd73 Compare November 5, 2022 16:21
@zeehio
Contributor Author

zeehio commented Nov 5, 2022

I used git rebase -i master and chose to squash all commits. It's all squashed now.

I updated the news entry and moved the transpose function as suggested.

I did this from my phone and I don't have R available here; I can run R CMD check later today to ensure it's all good, or you can beat me to it if you like :)

@zeehio
Contributor Author

zeehio commented Nov 5, 2022

All my checks pass. Feel free to merge.

mtmorgan added a commit that referenced this pull request Nov 6, 2022
@mtmorgan mtmorgan merged commit cf5dd73 into Bioconductor:master Nov 6, 2022
@mtmorgan
Collaborator

mtmorgan commented Nov 6, 2022

Thanks @zeehio, that was really helpful; I added you as 'ctb' to the DESCRIPTION file.

@zeehio
Contributor Author

zeehio commented Nov 6, 2022

Thanks!
