Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Differences in results between dqrng::dqrunif() and dqrng::uniform_distribution dist(min, max); #77

Closed
phgrosjean opened this issue Apr 4, 2024 · 23 comments · Fixed by #82

Comments

@phgrosjean
Copy link

I am building a parallel version of dq<dist>() that uses 10-11 streams. I want to get the same results as a non-parallel implementation, of course also using 10 or 11 streams. In the parallel version I use dqrng::uniform_distribution dist(min, max); and in the non parallel one, dqrng::dqrunif() which is faster, but if I am correct, is not compatible with OpenMP. I don't understand why results slightly differ (and largely differ in the case of normal or exponential distributions)? Here is a reproducible example:

library(dqrng)
library(Rcpp)
cppFunction(r"(
std::vector<double> runif_para(int n, double min = 0, double max = 1, int threads = 2) {
  
  const int nstreams = 10;
  int stream_size = n/nstreams;
  std::vector<double> res(n);
  
  // The generator for the distribution
  dqrng::uniform_distribution dist(min, max);
  
  // We properly seed our RNG (by generating two integers from default RNG)
  Rcpp::IntegerVector _seed(2, dqrng::R_random_int);
  uint64_t seed = dqrng::convert_seed<uint64_t>(_seed);
  dqrng::xoshiro256plusplus rng(seed);
  
  #ifdef _OPENMP
  int maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > nstreams)
    threads = nstreams;
  #endif
  
  #pragma omp parallel num_threads(threads)
  {
    // Our private RNG in each thread
    dqrng::xoshiro256plusplus prng(rng);
    
    #pragma omp for schedule(static,1)
    for (int i = 0; i < nstreams; ++i) {
      
      // Reseed the RNG and jump it to the correct stream
      prng.seed(seed); 
      prng.long_jump(i);
      
      for (int j = 0; j < stream_size; ++j) {
        res[i * stream_size + j] = dist(prng);
      }
    }
  }
  // Fill remaining items from a supplementary stream
  int remainder = n % nstreams;
  if (remainder > 0) {
    
    // Reseed the original RNG and jump it to an additional stream
    // Not necessary ? rng.seed(seed); 
    rng.long_jump(nstreams);
    
    for (int i = 0; i < remainder; ++i) {
      res[n - remainder + i] = dist(rng);
    }
  }
  return res;
}
)", , depends = "dqrng", includes =
  "// [[Rcpp::depends(dqrng, BH)]]
#include <boost/random/binomial_distribution.hpp>
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <xoshiro.h>
#include <R_randgen.h>
#include <convert_seed.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>")

cppFunction(r"(
Rcpp::NumericVector runif_cpp(int n, double min = 0, double max = 1) {
  const int nstreams = 10;
  int stream_size = n/nstreams;
  Rcpp::NumericVector res(Rcpp::no_init(n));
  
  // We got a seed for our various streams
  Rcpp::IntegerVector seed(2, dqrng::R_random_int);
  
  // Fill the main streams
  for (int i = 0; i < nstreams; ++i) {
    dqrng::dqset_seed(seed, IntegerVector::create(i));
    res[Rcpp::Range(i * stream_size, (i + 1) * stream_size - 1)] = dqrng::dqrunif(stream_size, min, max);
  }
  
  // Fill remaining items from a supplementary stream
  int remainder = n % nstreams;
  if (remainder > 0) {
    dqrng::dqset_seed(seed, IntegerVector::create(nstreams));
    res[Rcpp::Range(n - remainder, n - 1)] = dqrng::dqrunif(remainder, min, max);
  }
  
  return res;
}
)", , depends = "dqrng", includes =
  "// [[Rcpp::depends(dqrng, BH)]]
#include <boost/random/binomial_distribution.hpp>
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <xoshiro.h>
#include <R_randgen.h>
#include <convert_seed.h>")

dqRNGkind("Xoshiro256++")
dqset.seed(42); (res1 <- runif_cpp(22, 0, 1))
dqset.seed(42); (res2 <- runif_para(22, 0, 1, 6))
identical(res1, res2) # FALSE
all.equal(res1, res2) # TRUE (FALSE for normal or exponential distri)
@rstub
Copy link
Member

rstub commented Apr 4, 2024

You are seeing only a small difference? For me the difference is large and differs between runs. A typical example is

> all.equal(res1, res2)
[1] "Mean relative difference: 0.6615271"

Reason for that is that you are fixing the seed with dqset.seed but then use dqrng::R_random_int to seed your actual RNG. This uses R's RNG and is not affected by dqset.seed. So a minimal fix would be to use set.seed instead. For me this gives equal (but not identical) results.

I have to think a bit more if there is a better pattern for this. Please let me know where the documentation could be improved.

@phgrosjean
Copy link
Author

Oh yes, but indeed, I also used dqrng::register_methods(). I tried to simplify the example, but in my case, set.seed() and dqset.seed() target the same RNG. You are right that I should not mix base and {dqrng} RNGs. Yet, I do not understand the results because, even when I use set.seed() instead of dqset.seed(), there are still not the same but very close.

@rstub
Copy link
Member

rstub commented Apr 4, 2024

OK, so these are the differences you are talking about:

> res1-res2
 [1] -1.110223e-16 -1.110223e-16  0.000000e+00 -1.110223e-16  0.000000e+00  0.000000e+00 -1.110223e-16 -1.110223e-16  0.000000e+00  0.000000e+00
[11] -1.110223e-16 -5.551115e-17  0.000000e+00 -2.775558e-17  0.000000e+00 -5.551115e-17 -1.110223e-16 -2.775558e-17 -5.551115e-17  0.000000e+00
[21]  0.000000e+00  0.000000e+00

I am not sure what to make of that. Have you tried using dqrng::uniform_distribution also in the non-parallel case?

@phgrosjean
Copy link
Author

Well, yes, this is the solution, indeed. Yet I don't understand the difference because in both cases the code in {dqrng} use runif_impl internally. Here is the results using the code above plus this one:

cppFunction(r"(
std::vector<double> runif_cpp2(int n, double min = 0, double max = 1) {

  const int nstreams = 10;
  int stream_size = n/nstreams;
  std::vector<double> res(n);
  
  // The generator for the distribution
  dqrng::uniform_distribution dist(min, max);
  
  // We properly seed our RNG (by generating two integers from default RNG)
  Rcpp::IntegerVector _seed(2, dqrng::R_random_int);
  uint64_t seed = dqrng::convert_seed<uint64_t>(_seed);
  dqrng::xoshiro256plusplus rng(seed);
  
  for (int i = 0; i < nstreams; ++i) {
    
    // Reseed the RNG and jump it to the correct stream
    rng.seed(seed); 
    rng.long_jump(i);
    
    for (int j = 0; j < stream_size; ++j) {
      res[i * stream_size + j] = dist(rng);
    }
  }
  // Fill remaining items from a supplementary stream
  int remainder = n % nstreams;
  if (remainder > 0) {
    
    // Reseed the original RNG and jump it to an additional stream
    rng.seed(seed); 
    rng.long_jump(nstreams);
    
    for (int i = 0; i < remainder; ++i) {
      res[n - remainder + i] = dist(rng);
    }
  }
  return res;
}
)", , depends = "dqrng", includes =
  "// [[Rcpp::depends(dqrng, BH)]]
#include <boost/random/binomial_distribution.hpp>
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <xoshiro.h>
#include <R_randgen.h>
#include <convert_seed.h>")

set_seed_(42); (res1 <- runif_cpp(22, 0, 1))
set_seed_(42); (res2 <- runif_para(22, 0, 1, 6))
set_seed_(42); (res3 <- runif_cpp2(22, 0, 1))
identical(res1, res2) # FALSE... OK, but why?
all.equal(res1, res2) # TRUE
identical(res2, res3) # TRUE, YES!
n <- 1e6
bench::mark(runif(n), dqrunif(n), runif_para(n, threads = 6L),
   runif_para(n, threads = 1L), runif_cpp2(n), runif_cpp(n), check = FALSE)
# A tibble: 6 × 13
#    expression                       min   median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc 
#    <bch:expr>                  <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl> <int> <dbl>
#  1 runif(n)                      4.73ms   4.98ms      199.    7.63MB     4.36    91     2
#  2 dqrunif(n)                    3.18ms   3.37ms      297.    7.63MB     6.41   139     3
#  3 runif_para(n, threads = 6L) 901.34µs   1.53ms      672.    7.63MB    12.2    276     5
#  4 runif_para(n, threads = 1L)   3.46ms   3.76ms      264.    7.63MB     4.30   123     2
#  5 runif_cpp2(n)                 3.72ms   3.82ms      259.    7.63MB     6.65   117     3
#  6 runif_cpp(n)                  3.61ms   3.89ms      257.   15.26MB     9.18   112     4

Note: I am looking for the fastest as possible pseudo-random generator for large vectors of 1e6-1e8 items. {dqrng} is fantastic,... I try to push it a little further with parallelization, but still with a non parallel equivalent version.

@rstub
Copy link
Member

rstub commented Apr 4, 2024

Thanks. I have approached it from the other direction, removing as much as possible:

// [[Rcpp::depends(dqrng, BH)]]
#include <Rcpp.h>
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <xoshiro.h>

// [[Rcpp::export]]
Rcpp::NumericVector runif_cpp1(int n, double min = 0.0, double max = 1.0) {
  Rcpp::NumericVector res(Rcpp::no_init(n));

  dqrng::dqset_seed(Rcpp::IntegerVector{42});
  res = dqrng::dqrunif(n, min, max);
  return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector runif_cpp2(int n, double min = 0.0, double max = 1.0) {
  Rcpp::NumericVector res(Rcpp::no_init(n));

  // The generator for the distribution
  dqrng::uniform_distribution dist(min, max);

  dqrng::xoshiro256plusplus rng(42);

  for (int i = 0; i < n; ++i) {
    res[i] = dist(rng);
  }

  return res;
}

/***R
dqrng::dqRNGkind("Xoshiro256++")
res1 <- runif_cpp1(22, 0, 1)
res2 <- runif_cpp2(22, 0, 1)

identical(res1, res2)
all.equal(res1, res2)
res1 - res2
*/

Using Rcpp::sourceCpp on this file gives the same kind of deviations you are seeing but w/o employing multiple threads or seeding from R:

> identical(res1, res2)
[1] FALSE

> all.equal(res1, res2)
[1] TRUE

> res1 - res2
 [1]  0.000000e+00 -1.110223e-16 -1.110223e-16 -1.110223e-16  0.000000e+00 -1.110223e-16 -8.326673e-17
 [8]  0.000000e+00 -5.551115e-17  0.000000e+00  0.000000e+00 -1.110223e-16  0.000000e+00 -8.326673e-17
[15]  0.000000e+00  0.000000e+00 -5.551115e-17 -1.040834e-16 -1.110223e-16 -5.551115e-17 -2.775558e-17
[22] -1.110223e-16

So it is indeed only a difference between dqrng::uniform_distribution dist(min, max) and dqrng::dqrunif(). I will have a closer look at this.

@phgrosjean
Copy link
Author

Right, we get a little bit closer... Note also that differences are larger for, say, dqrng::normal_distribution dist(min, max) and dqrng::dqrnorm().

@rstub
Copy link
Member

rstub commented Apr 4, 2024

I am speculating that changing the internals from boost.random in https://github.com/daqana/dqrng/blob/master/inst/include/dqrng_distribution.h#L41-L75 works only in one of the two cases. Unfortunately I have to focus on some other things right now.

@phgrosjean
Copy link
Author

OK, thanks. If I find something I'll let you know.

@rstub
Copy link
Member

rstub commented Apr 5, 2024

No idea why I missed that yesterday, but sometimes it helps to let things rest a bit. Anyway, the explanation is quite straight forward. The changed internals of boost.random are not global but apply only in certain cases. In particular to RNGs that derive from of type dqrng::random_64bit_generator. In order to get such an RNG, you can use auto rng = dqrng::generator<dqrng::::xoshiro256plusplus>(seed, stream) together with dist(*rng) in your code. Here a worked example w/o parallel code:

// [[Rcpp::depends(dqrng, BH)]]
#include <Rcpp.h>
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <xoshiro.h>

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_cpp1(int n) {
  Rcpp::NumericVector res(Rcpp::no_init(n));
  dqrng::dqset_seed(Rcpp::IntegerVector{42});
  res = dqrng::dqrnorm(n);
  return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_cpp2(int n) {
  Rcpp::NumericVector res(Rcpp::no_init(n));
  dqrng::normal_distribution dist;
  dqrng::xoshiro256plusplus rng(42);

  for (int i = 0; i < n; ++i) {
    res[i] = dist(rng);
  }
  return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_cpp3(int n) {
  Rcpp::NumericVector res(Rcpp::no_init(n));
  dqrng::normal_distribution dist;
  auto rng = dqrng::generator<dqrng::xoshiro256plusplus>(42);

  for (int i = 0; i < n; ++i) {
    res[i] = dist(*rng);
  }
  return res;
}

/***R
dqrng::dqRNGkind("Xoshiro256++")
res1 <- rnorm_cpp1(22)
res2 <- rnorm_cpp2(22)
res3 <- rnorm_cpp3(22)

identical(res1, res2)
all.equal(res1, res2)
identical(res1, res3)
all.equal(res1, res3)
*/

Result:

[1] FALSE

> all.equal(res1, res2)
[1] "Mean relative difference: 1.140797"

> identical(res1, res3)
[1] TRUE

> all.equal(res1, res3)
[1] TRUE

This works well for you since you have an explicit seed within the function available. In principle I would like to have it such that one can make use of the global RNG even for parallel processing, see the last bullet point in #64.

Please let me know where you would have expected documentation for this behavior.

@rstub
Copy link
Member

rstub commented Apr 15, 2024

After merging #79 we now have proper tools to write such code w/o the need to explicitly seed an RNG. Instead, the global RNG is used as basis for stream-specific cloned RNGs as well as for filling in the remaining data. It is also possible to make this a templated function, making it easy to derive many different explicit implementations:

// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <RcppParallel.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>

template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(std::size_t n, int threads, int streams, Params&&... params) {
  int stream_size = n/streams;
  Rcpp::NumericVector res(Rcpp::no_init(n));
  RcppParallel::RVector<double> work(res);

  // use global RNG from dqrng
  dqrng::random_64bit_accessor rng{};

#ifdef _OPENMP
  int maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > streams)
    threads = streams;
#endif

#pragma omp parallel num_threads(threads)
{

#pragma omp for schedule(static,1)
  for (int i = 0; i < streams; ++i) {
    // Our private RNG in each stream
    auto prng = rng.clone(i+1);
    prng->generate<Dist>(std::begin(work) + i * stream_size,
                         std::begin(work) + (i + 1) * stream_size,
                         std::forward<Params>(params)...);
  }
}
// Fill remaining items from the main stream
  int remainder = n % streams;
  if (remainder > 0) {
    rng.generate<Dist>(std::begin(work) + streams * stream_size,
                       std::end(work),
                       std::forward<Params>(params)...);
  }
  return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector runif_para(int n, double min = 0, double max = 1,
                                int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(int n, double mean = 0, double sd = 1,
                               int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}

// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(int n, double rate = 1,
                               int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}


/***R
n <- 1e6
bench::mark(runif(n),
            dqrng::dqrunif(n),
            runif_para(n, threads = 8L),
            runif_para(n, threads = 4L),
            runif_para(n, threads = 2L),
            runif_para(n, threads = 1L),
            check = FALSE)[, 1:6]
bench::mark(rnorm(n),
            dqrng::dqrnorm(n),
            rnorm_para(n, threads = 8L),
            rnorm_para(n, threads = 4L),
            rnorm_para(n, threads = 2L),
            rnorm_para(n, threads = 1L),
            check = FALSE)[, 1:6]
bench::mark(rexp(n),
            dqrng::dqrexp(n),
            rexp_para(n, threads = 8L),
            rexp_para(n, threads = 4L),
            rexp_para(n, threads = 2L),
            rexp_para(n, threads = 1L),
            check = FALSE)[, 1:6]

dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)
*/

Here RcppParallel is only used for the thread safe accessor templates. I don't see how one could do the fine-grained threads/streams control needed here together with RcppParallel::parallelFor.

I am contemplating to add this as an example to the Parallel RNG usage vignette or as an explicit header that can easily be included to generate functions like this. What do you think? In what form would you liked to be attributed?

@phgrosjean
Copy link
Author

phgrosjean commented Apr 17, 2024

This is very nice! Just it would be better not to depend on both RcppParallel and OpenMP simultaneously. It is possible to get rid of RcppParallel by using std::vector<double> instead of Rcpp::NumericVector, thus without conversion to RcppParallel::Vector<double>. Unfortunately, it is much slower than your solution.

I don't see either how to easily replace the omp for schedule pragma by parallelFor to simplify in the other direction and get rid of OpenMP. So, for now, let's keep it like this.

Anyway, the remainder stuff in my initial code is not very nice. Here is a better solution that redistributes the remainder into the first few iterations so that we always use the same number of streams. Hence, generating n = 7 with streams = 3 would generate 7 values using streams 1,1,1,2,2,3,3 while the initial solution generated values using streams 1,1,2,2,3,3,4.

// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <RcppParallel.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>

template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(std::size_t n, int threads, int streams, Params&&... params) {
  int stream_size = n/streams;
  int remainder = n % streams;
  Rcpp::NumericVector res(Rcpp::no_init(n));
  RcppParallel::RVector<double> work(res);
  
  // use global RNG from dqrng
  dqrng::random_64bit_accessor rng{};
  
#ifdef _OPENMP
  int maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > streams)
    threads = streams;
#endif
  
#pragma omp parallel num_threads(threads)
{
  int start,end;

#pragma omp for schedule(static,1)
  for (int i = 0; i < streams; ++i) {
    if (i <= remainder) {
      start = i * stream_size + i;
      end = start + stream_size + 1;
    } else {
      start = i * stream_size + remainder;
      end = start + stream_size;
    }
    // Our private RNG in each stream
    auto prng = rng.clone(i+1);
    prng->generate<Dist>(std::begin(work) + start, std::begin(work) + end,
      std::forward<Params>(params)...);
  }
}
return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector runif_para(int n, double min = 0, double max = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(int n, double mean = 0, double sd = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}

// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(int n, double rate = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}

/***R
n <- 1e6
bench::mark(runif(n),
  dqrng::dqrunif(n),
  runif_para(n, threads = 8L),
  runif_para(n, threads = 4L),
  runif_para(n, threads = 2L),
  runif_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(rnorm(n),
  dqrng::dqrnorm(n),
  rnorm_para(n, threads = 8L),
  rnorm_para(n, threads = 4L),
  rnorm_para(n, threads = 2L),
  rnorm_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(rexp(n),
  dqrng::dqrexp(n),
  rexp_para(n, threads = 8L),
  rexp_para(n, threads = 4L),
  rexp_para(n, threads = 2L),
  rexp_para(n, threads = 1L),
  check = FALSE)[, 1:6]

dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)
*/

Please, add it to the vignette or as a header. I am fine with it. Just cite me as a contributor. Do you mind if I implement something on this line in the {svFast} package here: https://github.com/SciViews/svFast ? I'll wait for your changes before doing anything on my side.

@phgrosjean
Copy link
Author

phgrosjean commented Apr 18, 2024

There is still a problem: since we do not consume the default rng, two successive calls to, say norm_para() return the same values. This is not acceptable. Here is a solution:

// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <RcppParallel.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>

template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(int n, int threads, int streams, Params&&... params) {
  if (n < streams)
    streams = n;
  int stream_size = n/streams;
  int remainder = n % streams;
  Rcpp::NumericVector res(Rcpp::no_init(n));
  RcppParallel::RVector<double> work(res);
  
  // use global RNG from dqrng
  dqrng::random_64bit_accessor rng{};
  uint64_t newseed;
  
#ifdef _OPENMP
  int maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > streams)
    threads = streams;
#endif
  
#pragma omp parallel num_threads(threads)
{
  int start,end;

#pragma omp for schedule(static,1)
  for (int i = 0; i < streams; ++i) {
    if (i <= remainder) {
      start = i * stream_size + i;
      end = start + stream_size + 1;
    } else {
      start = i * stream_size + remainder;
      end = start + stream_size;
    }
    // Our private RNG in each stream
    auto prng = rng.clone(i+1);
    prng->generate<Dist>(std::begin(work) + start, std::begin(work) + end,
      std::forward<Params>(params)...);
    if (i == 0) {// Need to advance the default RNG too
      newseed = prng->operator()();
    }
  }
}
// Change the default RNG to avoid generating same sequence next time
// Would be nice to have: rng.seed(newseed);
dqrng::get_rng()->seed(newseed);
return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector runif_para(int n, double min = 0, double max = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(int n, double mean = 0, double sd = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}

// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(int n, double rate = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}

/***R
n <- 1e6
bench::mark(runif(n),
  dqrng::dqrunif(n),
  runif_para(n, threads = 8L),
  runif_para(n, threads = 4L),
  runif_para(n, threads = 2L),
  runif_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(rnorm(n),
  dqrng::dqrnorm(n),
  rnorm_para(n, threads = 8L),
  rnorm_para(n, threads = 4L),
  rnorm_para(n, threads = 2L),
  rnorm_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(rexp(n),
  dqrng::dqrexp(n),
  rexp_para(n, threads = 8L),
  rexp_para(n, threads = 4L),
  rexp_para(n, threads = 2L),
  rexp_para(n, threads = 1L),
  check = FALSE)[, 1:6]

dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)

dqset.seed(42); norm1 <- rnorm_para(22); norm2 <- rnorm_para(22)
!identical(norm1, norm2)
*/

Not a perfect solution and it supposes that the default RNG is seeded on stream 0, but still better than previous one. Is it another way than calling get_rng() that is discouraged in the C api vignette ? I cannot use dqset_seed() since I have a uint64_t, not a Rcpp::IntegerVector in newseed.

@rstub
Copy link
Member

rstub commented Apr 18, 2024

This is an interesting problem. Thanks for bringing that up! I am not a fan of re-seeding the RNG regularly. So I would prefer, if one of the "local RNGs" would actually be the global one. Maybe I should special case clone(0) to return the global RNG?

@phgrosjean
Copy link
Author

Yes, I was thinking in that direction, but I failed to find a way to do so properly.

@rstub
Copy link
Member

rstub commented Apr 19, 2024

Handing out a unique pointer to the global RNG won't work, since this might get the global RNG deleted. Instead, I have come up with the following:

#include <sstream>
// [[Rcpp::depends(dqrng, BH, RcppParallel)]]
#include <dqrng.h>
#include <dqrng_distribution.h>
#include <RcppParallel/RVector.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>

template<typename Dist, typename... Params>
Rcpp::NumericVector parallel_generate(int n, int threads, int streams, Params&&... params) {
  if (n < streams)
    streams = n;
  int stream_size = n/streams;
  int remainder = n % streams;
  Rcpp::NumericVector res(Rcpp::no_init(n));
  RcppParallel::RVector<double> work(res);
  
  // use global RNG from dqrng
  dqrng::random_64bit_accessor rng{};
  std::stringstream buffer;
  
#ifdef _OPENMP
  int maxthreads = omp_get_num_procs();
  if (threads > maxthreads)
    threads = maxthreads;
  // No need for more threads than there are streams
  if (threads > streams)
    threads = streams;
#endif
  
#pragma omp parallel num_threads(threads)
{
  int start,end;

#pragma omp for schedule(static,1)
  for (int i = 0; i < streams; ++i) {
    if (i < remainder) {
      start = i * stream_size + i;
      end = start + stream_size + 1;
    } else {
      start = i * stream_size + remainder;
      end = start + stream_size;
    }
    // Our private RNG in each stream; RNG with i == 0 is identical to global RNG
    auto prng = rng.clone(i);
    prng->generate<Dist>(std::begin(work) + start, std::begin(work) + end,
      std::forward<Params>(params)...);
    if (i == 0) {// Save the state of the global RNG's clone
      buffer << *prng;
    }
  }
}
  // Make sure that the global RNG advances aas well by applying the state of the global RNG's clone to the global RNG
  buffer >> rng;
  return res;
}

// [[Rcpp::export]]
Rcpp::NumericVector runif_para(int n, double min = 0, double max = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::uniform_distribution>(n, threads, streams, min, max);
}

// [[Rcpp::export]]
Rcpp::NumericVector rnorm_para(int n, double mean = 0, double sd = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::normal_distribution>(n, threads, streams, mean, sd);
}

// [[Rcpp::export]]
Rcpp::NumericVector rexp_para(int n, double rate = 1,
  int threads = 2, int streams = 8) {
  return parallel_generate<dqrng::exponential_distribution>(n, threads, streams, rate);
}

/***R
n <- 1e6
bench::mark(stats::runif(n),
  dqrng::dqrunif(n),
  runif_para(n, threads = 8L),
  runif_para(n, threads = 4L),
  runif_para(n, threads = 2L),
  runif_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(stats::rnorm(n),
  dqrng::dqrnorm(n),
  rnorm_para(n, threads = 8L),
  rnorm_para(n, threads = 4L),
  rnorm_para(n, threads = 2L),
  rnorm_para(n, threads = 1L),
  check = FALSE)[, 1:6]
bench::mark(stats::rexp(n),
  dqrng::dqrexp(n),
  rexp_para(n, threads = 8L),
  rexp_para(n, threads = 4L),
  rexp_para(n, threads = 2L),
  rexp_para(n, threads = 1L),
  check = FALSE)[, 1:6]

dqset.seed(42); norm1 <- rnorm_para(22, threads = 1)
dqset.seed(42); norm2 <- rnorm_para(22, threads = 4)
identical(norm1, norm2)

dqset.seed(42); norm1 <- rnorm_para(22); norm2 <- rnorm_para(22)
!identical(norm1, norm2)
*/

Here I use a buffer to save the state of the clone(0) local RNG and apply that to the global RNG. At the moment that won't work for PCG64, since there clone(0) does not produce a clone of the global RNG, but that's something I can change.

BTW, I have also change if (i <= remainder) to if (i < remainder). Otherwise there was an overlapp in the separate ranges. I have also made an explicit include for RVector.h to make it clear, that the rest of RcppParallel is not needed. It would be nice to have RVector.h independent of RcppParallel.

What do you think?

@phgrosjean
Copy link
Author

phgrosjean commented Apr 19, 2024

Yess! This is exactly what I wanted to do. Many, many thanks! I would very much appreciate if PCG64 would also work here because reading the literature, it is an excellent RNG on par or better than Xoshiro256++, even if a little bit slower. This code is already excellent, certainly for a vignette, but I see room for improvement in the following directions for a final implementation:

  1. <dist>_para() could run in sequential mode below a certain size n and activate the parallel mode only for larger vectors with a threshold = argument.

  2. The <dist>_para() functions could use better defaults for streams = and threads =. Increasing streams penalizes a little bit the performances. With streams = 24, performances are still OK and it is fine with 2, 3, 4, 5, 6, 8, 12 and 24 threads that cover a large range of desktop and laptop CPUs. For instance, I have a M3 Max CPU with 12 performance cores, and a Ryzen 9 7950x (16 cores, 32 threads) which produces optimal results using 24 streams / 24 threads. Also for threads, we should calculate a reasonable default. This could be omp_get_num_procs()/2 (same default as {data.table} uses).

@rstub rstub mentioned this issue Apr 19, 2024
1 task
@rstub
Copy link
Member

rstub commented Apr 19, 2024

Please have a look at #82. Does the attribution fit your expectations? BTW, feel free to use this in svFast.

Concerning the additions to <dist>_para: My plan is to have those in the vignette only, since one cannot define functions that ought to be exported by Rcpp in a header file. At least that is my experience so far. And for a vignette I would like to keep these functions simple.

@phgrosjean
Copy link
Author

I understand. Let's keep it simple. We both certainly want to finish and close this issue and the result is very nice, I think. One last point, though (really sorry). Should we keep int n or should we switch to recommended size_t n ? You wrote the later but I changed it to int because there were nasty conversions and overflow checks required between size_t and int while computing streams and threads and in other places (and of course, start and end ought to be size_t too). If you are OK with int for now, we could close this issue and pull request #82.

@rstub
Copy link
Member

rstub commented Apr 20, 2024

Thanks for mentioning the int vs. size_t issue. One reason for doing this in parallel is of course generating lots of random numbers, so long vector support might be useful at some time. I will simply replace all int with size_t to get rid of conversion checks etc., even though it is overkill for threads or streams. The main thing that is missing now is an update to the documentation in parallel.Rmd.

@rstub
Copy link
Member

rstub commented Apr 20, 2024

Any final comments before I merge this?

@phgrosjean
Copy link
Author

This is perfect! Just in the parallel.Rmd vignette, I would add after the last sentence "This way, the global RNG's state advances as if it had processed one of the streams."

This way, the global RNG's state advances as if it had processed one of the streams and successive calls to parallel_generate() produce different random numbers as expected.

dqset.seed(153)
runif_para(30)
runif_para(30) # Different values, as expected

Pitfall: should you use parallel_generate() in concurrent threads or streams, make sure to seed each of them with enough separate streams to avoid overlap. For instance, with parallel_generate(..., stream = 4), you could seed first thread with something like dqset.seed(546, 1), but you must seed second thread at least on stream 5 (previous stream plus the number of streams you reserve for parallel_generate()). If you use dqset.seed(546, 2) on the second thread, there will be an overlap like this:

# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
# Seed used in the second thread
dqset.seed(546, 2); (v2 <- rnorm_para(8, streams = 4))

Note how values 1-6 of v2 are identical to values 3-8 of v1 because of an overlap of the streams consumed in each call to parallel_generate(). You will get the same result if the two lines of code were run in separate threads or even, separate R processes. Here, if second dqset.seed() is shifted by 4 streams or more from first one, then there is no overlap any more:

# Seed used in the first thread
dqset.seed(546, 1); (v1 <- rnorm_para(8, streams = 4))
# Seed used in the second thread
dqset.seed(546, 5); (v2 <- rnorm_para(8, streams = 4))

Of course, it is also advised to use a different seed (first argument to dqset.seed()), or better, to use generateSeedVectors() in this case. The same seed was used here to demonstrate the pitfall.

@rstub
Copy link
Member

rstub commented Apr 21, 2024

Thanks! I have added it to the vignette with the exception of the last sentence. I am not a fan of seeding different processes with different seeds and hoping for the best that there is no overlap when starting from the resulting positions in the RNG's state space. As least as long as we do have control of it with appropriate stream switches.

@phgrosjean
Copy link
Author

Right!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants