parallelise CsubsetDT #1660

Closed
arunsrinivasan opened this Issue Apr 19, 2016 · 14 comments

arunsrinivasan commented Apr 19, 2016

  • parallelise CsubsetDT
  • resolve thread-safe issues (on SET_STRING_ELT / SET_VECTOR_ELT)
  • add provision for nthreads
  • cleaner implementation (to maintain easily)

arunsrinivasan self-assigned this Apr 19, 2016

arunsrinivasan added this to the v1.9.8 milestone Apr 19, 2016

arunsrinivasan (Member) commented Apr 19, 2016

A 4.5 GB benchmark comparison, including edits from Matt.

data

require(data.table)
set.seed(1L)
cols = 4L
rows = 2e8L
x = list(a = sample(letters, rows, TRUE), 
         b = sample(1e5, rows, TRUE),
         c = round(runif(rows),6),
         d = sample(c(TRUE,FALSE), rows, TRUE))
setDT(x)
gc()
print(object.size(x), units="GB")   # 4.5 GB   i.e. small
# generate half of nrow(x) indices
res = rows/2
ix = sample(rows, res, FALSE)
# randomly replace 1000 indices with 0, so that they won't be in the result
bla = sample(length(ix), 1000L)
ix[bla] = 0L
# result should be res-1000 rows

v1.9.6 single threaded

# this is the code that's run when one does x[ix] or x[ix, <cols>].
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#   user  system elapsed
#  5.828   0.128   5.956
#   user  system elapsed   # Previous result from Arun likely included swapping
# 18.990   0.957  20.007   # Neither of us could reproduce this 20s
nrow(ans) == res-1000L     # [1] TRUE
write.table(ans, "~/tmp1.tsv", row.names=FALSE, quote=FALSE, sep="\t")
#   user  system elapsed 
# 201.156   2.136 203.490

v1.9.7 within-column parallel subsetting (8 threads, 4 CPUs)

# fresh R session
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#   user  system elapsed 
# 11.960   1.092   3.970    # 4s here vs 6s above 
nrow(ans) == res-1000L   # [1] TRUE
system.time(fwrite(ans, "~/tmp2.tsv", quote=FALSE, sep="\t"))
#   user  system elapsed
# 22.112   2.348   4.930

double check the results

system("diff ~/tmp1.tsv ~/tmp2.tsv")  # two 2.1GB files identical

v1.9.7 across-column parallel subsetting (8 threads, 4 CPUs)

Reimplemented by Matt, hopefully solving #1883.

# fresh R session
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#    user  system elapsed
#   9.232   0.216   3.167
nrow(ans) == res-1000L   # [1] TRUE
fwrite(ans, "~/tmp2.tsv", quote=FALSE, sep="\t")
system("diff ~/tmp1.tsv ~/tmp2.tsv")   # same
dim(ans)
# [1] 99999000        4


# without the ix[bla] = 0L step, the no-0/no-NA case is now faster
res = rows/2
ix = sample(rows, res, FALSE)
system.time(ans <- .Call("CsubsetDT", x, ix, 1:cols))
#    user  system elapsed
#   8.288   0.180   2.896
dim(ans)
# [1] 100000000         4
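For intuition, here is a conceptual R-level analogue of the across-column (one-thread-per-column) approach. This is not data.table's actual implementation (which is C with OpenMP); mclapply forks one process per column and needs a Unix-alike, with 4 workers here to mirror the 4-CPU box above:

# conceptual sketch only: subset each column on its own worker, then reassemble;
# R's `[` drops any 0 indices, so the row counts match the results above
require(parallel)
ans_list = mclapply(x, function(col) col[ix], mc.cores = 4L)
setDT(ans_list)
dim(ans_list)   # same dimensions as ans above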
ChristK commented Apr 20, 2016

@arunsrinivasan This is absolutely great! However, maybe we also need an option to force the use of only one core (or, better, a specific number of cores). This could be helpful when users have existing code using, e.g., doSNOW or doParallel, where a parallelised data.table operation would 'fight' with them for the same processing resources.
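For context, one way to get this behaviour today is the standard OpenMP environment variable, set before data.table is loaded. This is a general OpenMP mechanism, not a data.table API; a sketch:

# OMP_NUM_THREADS is honoured by OpenMP runtimes; it must be set before the
# OpenMP runtime initialises, i.e. before the package is loaded
Sys.setenv(OMP_NUM_THREADS = "1")
library(data.table)
# ... doSNOW/doParallel workers are now free to use the remaining cores ...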

arunsrinivasan (Member) commented Apr 20, 2016

@ChristK yes, that'll be done eventually (before release). Right now I'm looking at issues related to thread-safety; I'll come back to this point. Thanks for pointing it out.

arunsrinivasan (Member) commented Apr 24, 2016

setthreads() and getthreads() implemented in 16a22ed
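A minimal usage sketch, assuming these behave like their later renaming setDTthreads()/getDTthreads(), where 0 means "use all available threads" as in the benchmark further below:

require(data.table)
getthreads()     # query how many threads data.table will use
setthreads(1L)   # restrict data.table to a single thread
setthreads(0L)   # assumption: 0 restores use of all available threads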

HenrikBengtsson commented Apr 25, 2016

Just happened to "drive by": sorry if I'm missing something already discussed, but don't you think the default should be to use a single thread/core unless the user explicitly specifies otherwise? Maybe you could set the default to getOption("mc.cores", 1L).

I'm worried about cases where a user, but also a package developer, is not aware that your package is being used. If such a person in turn runs things in parallel and your package by default uses more than one thread, then it can quickly blow up. Then imagine another layer of parallelism, and so on.

arunsrinivasan (Member) commented Apr 25, 2016

I see what you mean. Will note it down for internal discussion. Thanks @HenrikBengtsson.

HenrikBengtsson commented Apr 26, 2016

Good to hear. Here's a comment by Roger Koenker where implicit and explicit parallelism competing for the cores caused a major slowdown: https://stat.ethz.ch/pipermail/r-sig-hpc/2014-August/001914.html.

arunsrinivasan (Member) commented Apr 26, 2016

Definitely useful. Thanks again.

The parts that are currently OpenMP-ised are subsetting a data.table and reordering a data.table by reference. These operations usually wouldn't be coupled with other embarrassingly parallel workloads, I think. However, I'm leaning towards setting the default to 1 (at least for now), as that would have no consequences for users who already run data.table in parallel explicitly.

mattdowle (Member) commented Apr 28, 2016

Most users of data.table will just want to use it, liking that they don't need to write any explicit parallelism and that it will just use their cores for them. They shouldn't need to learn or remember to use options or call functions before they benefit from parallelism. What should these users set the number of threads to anyway: 4 or 8 on a 4-core box? It raises more questions for them. Better to let OpenMP do its thing and let the OS handle scheduling.

@HenrikBengtsson Advanced users who already use explicit parallelism need a way to limit data.table to one CPU: yes, absolutely. But I don't see why the default should be 1.

HenrikBengtsson commented Apr 29, 2016

Agree; what the default should or could be is not obvious. The R world as we know it is currently mostly single threaded. This is clearly changing, and more and more packages will start supporting multi-process/core/thread processing. Scientific software like R is multi-purpose and differs from other types of "regular" software that may use multi-process processing by default. For the latter it's easier to control the parallelism, whereas in R it will be a mix of packages that is impossible to predict.

My interest in this is mostly through my work on the future package, where I most recently implemented support for nested futures (i.e. nested parallelism), which the user can control, e.g. plan(list(batchjobs, multicore)) or plan(list(multicore, multicore)). I think we'll see more need for controlling nestedness and the amount of parallelism, and I believe we would all gain if there were some core R options/functionality for controlling this in a unified fashion. Maybe R could treat threads/cores as a limited resource that packages request and release, helping R not to overload the machine. On the other hand, that might end up reinventing the OS kernel. I really don't know, and my OS kernel knowledge is rather old school.

Having said all that, when I suggested getOption("mc.cores", 1L), my point was less about 1L being the default and more about leveraging the already existing mc.cores option (of the parallel package). Apart from the Ncpus option, which is used by install.packages(), mc.cores is the only option I'm aware of in R that the user can use to control the amount of parallelism. You could also default to getOption("mc.cores", detectCores()) for maximum CPU utilization, or possibly getOption("mc.cores", detectCores()-1L) due to the definition of mc.cores, cf. HenrikBengtsson/Wishlist-for-R#7.

BTW, and slightly related to this: the CRAN Repository Policy requires that "Checking the package [...] If running a package uses multiple threads/cores it must never use more than two simultaneously". In other words, you need an option to control this for CRAN's checks as well.

Great work!
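The candidate defaults mentioned above, side by side. getOption() and parallel::detectCores() are existing base/parallel functions; whether data.table should consult mc.cores is the suggestion here, not current behaviour:

require(parallel)
getOption("mc.cores", 1L)                  # conservative default: one thread
getOption("mc.cores", detectCores())       # maximum CPU utilization
getOption("mc.cores", detectCores() - 1L)  # leave one core for the master process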

arunsrinivasan added a commit that referenced this issue Jul 24, 2016

arunsrinivasan modified the milestones: v2.0.0, v1.9.8 Jul 25, 2016

renkun-ken (Contributor) commented Aug 10, 2016

Great discussion in this issue! Another small point: when I use OpenMP on small data, the overhead of multi-threading can make it slower than not using it (if I remember correctly). Maybe parallelism should only be enabled for sufficiently large data?

mattdowle modified the milestones: v1.9.8, v2.0.0 Sep 14, 2016

mattdowle added a commit that referenced this issue Sep 15, 2016

Subsetting small data no longer goes parallel (test.data.table 2mins down to 30secs). Changed setthreads to setDTthreads for independent control of data.table. #1660

arunsrinivasan added a commit that referenced this issue Sep 15, 2016

arunsrinivasan added a commit that referenced this issue Sep 15, 2016

arunsrinivasan (Member) commented Sep 15, 2016

@renkun-ken with Matt's commit, rows <= 1000 won't be parallelised. This should be updated based on more experiments/tests in the future.

@mattdowle I've now simplified the logic (we discussed sometime around useR'16 that it might be hard to maintain). The logic is heavily commented for now.
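The guard Matt's commit adds, sketched at the R level with a hypothetical helper (the real check lives in the C code; 1000 is the threshold mentioned above):

# illustrative only, not the actual C implementation
use_parallel_subset = function(nrows, threshold = 1000L) nrows > threshold
use_parallel_subset(500L)    # FALSE: small data takes the single-threaded path
use_parallel_subset(5e6L)    # TRUE: large data goes parallel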

arunsrinivasan (Member) commented Sep 15, 2016

There are acceptable speedups even on smaller data:

set.seed(1L)
require(data.table)
dt = setDT(lapply(1:10, function(x) sample(5e6)))
print(object.size(dt), units="Mb") # 190.7 Mb

setDTthreads(0L) # uses all threads
system.time(ans1 <- .Call("CsubsetDT", dt, 1:3e6, 1:10))
#    user  system elapsed 
#   0.149   0.071   0.064 
setDTthreads(1L)
system.time(ans2 <- .Call("CsubsetDT", dt, 1:3e6, 1:10))
#    user  system elapsed 
#   0.092   0.027   0.120 
identical(ans1, ans2) # [1] TRUE
setDTthreads(0L) # set it back to use all threads

edit by jangorecki: when calling the unexported CsubsetDT function, users need to be aware of #1762.

mattdowle reopened this Oct 19, 2016

mattdowle added a commit that referenced this issue Nov 19, 2016

Reverted within-column parallel subsetting, #1883. Cache and thread synchronization bites within column. Code complexity (150 lines removed) a secondary issue. One-thread-per-column should work better (one or two column subsets are fast anyway). #1660

mattdowle added a commit that referenced this issue Nov 21, 2016

mattdowle closed this in df4bf33 Nov 22, 2016

vikram-rawat commented Jun 23, 2017

People like me who do not understand programming come to R. Please don't implement single core as the default setting. Let it run at its fastest possible speed without requiring any further arguments... please.
