Skip to content

Commit

Permalink
version 1.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo authored and cran-robot committed Jul 2, 2024
1 parent 25305d2 commit 23dcbff
Show file tree
Hide file tree
Showing 40 changed files with 719 additions and 807 deletions.
12 changes: 6 additions & 6 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.1.0
Description: High performance parallel code execution and distributed computing.
Version: 1.1.1
Description: High-performance parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Modern networking and concurrency built on 'nanonext' and 'NNG' (Nanomsg
Expand All @@ -22,15 +22,15 @@ URL: https://shikokuchuo.net/mirai/,
https://github.com/shikokuchuo/mirai/
Encoding: UTF-8
Depends: R (>= 3.6)
Imports: nanonext (>= 1.1.0)
Imports: nanonext (>= 1.1.1)
Enhances: parallel, promises
Suggests: knitr, markdown
VignetteBuilder: knitr
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
NeedsCompilation: no
Packaged: 2024-06-06 06:10:47 UTC; cg334
Packaged: 2024-07-01 09:39:28 UTC; cg334
Author: Charlie Gao [aut, cre] (<https://orcid.org/0000-0002-0750-061X>),
Hibiki AI Limited [cph]
Maintainer: Charlie Gao <charlie.gao@shikokuchuo.net>
Repository: CRAN
Date/Publication: 2024-06-06 06:30:02 UTC
Date/Publication: 2024-07-01 10:30:07 UTC
78 changes: 39 additions & 39 deletions MD5
Original file line number Diff line number Diff line change
@@ -1,73 +1,73 @@
a98c75230cdb9eeb7098a30e4c1b820e *DESCRIPTION
7ee1fde5b28ccbd9a8995fe33463b917 *DESCRIPTION
44f551d453e1e042d845d491dacace3e *NAMESPACE
77cd69324d2bc3961f0a8f3d24a3d5be *NEWS.md
51d927e2136327537b0ccbabb7fbd5fd *R/daemon.R
b5c97a12bdca10da0e331d2027ed373a *R/daemons.R
de07a189bec33765b7c0a0923c27db71 *R/dispatcher.R
3465450aa738b9cbb90d7cf4daec1bc9 *R/launchers.R
60808023e102a39ce5bc2feec1816ffd *R/map.R
42b3f045d2e5855569d2338a6b879f91 *R/mirai-package.R
7205ffd56558dfa398e3a206040ead46 *R/mirai.R
dea69f6878b67c4e08e384dec7f7a503 *NEWS.md
664f7658ec03b124fb05a90320c08f34 *R/daemon.R
2d03bb055135e5b591fb806401824f7a *R/daemons.R
83f30edfbc71ca132e52d7ed442209fb *R/dispatcher.R
50ae4ec757f1c8457738d1560a65ceab *R/launchers.R
027880e46bf9be2aa489f945717c83bd *R/map.R
862958775d279c95de16992d614cff00 *R/mirai-package.R
29bdc57afce28502baf807f309300292 *R/mirai.R
93a66c82e3d671b541f31595add477bb *R/next.R
645cfe1bea719a45c7a5b6690f8f9d05 *R/parallel.R
521107a2e8487b96321d615f3ca4b4c4 *R/promises.R
9b34906b3afcf079da3e20d95bb130ef *README.md
23dd8dd2dee9ac6dfd661a2b8a6d4e1b *build/vignette.rds
5c7c320608f08cbb32e4f54e2c381806 *README.md
bc27738e2ebe41e69af4bd49013ef764 *build/vignette.rds
bee7aba0a9ea0a5969a285c22538e91a *inst/CITATION
3d8d9a2ea70aa02af21f7a1feaeeb379 *inst/doc/databases.Rmd
4320afa0b969625617c6f408bffd3612 *inst/doc/databases.html
d0734ac41448cf213c0e5cbd85fc0fd7 *inst/doc/mirai.Rmd
2ee493603c9113c9581e0e174e250ab8 *inst/doc/mirai.html
591713b4d7fdcc8b2c3252be48ffc9eb *inst/doc/parallel.Rmd
b54631016316e547e36ac4cfbf64c4ca *inst/doc/parallel.html
0ecb9be0221623b23a2a2a568027e2d8 *inst/doc/plumber.Rmd
bdcac60267f5879077861062d4f8dcd0 *inst/doc/plumber.html
5d0e32478230c812017544284feb1154 *inst/doc/promises.Rmd
dbc9e0a2e7ed87bf352b9d0f3759dc63 *inst/doc/promises.html
81fe1700fad3c7277c269ae63beb4e19 *inst/doc/databases.Rmd
2cd87074ec4ac66abfedfb83ee5f4188 *inst/doc/databases.html
85064a1c08c47b234fc62dcf2f8c3652 *inst/doc/mirai.Rmd
f74b7f6777bf8bcd8e3311b9984a60b0 *inst/doc/mirai.html
a4160672ef860d46ae6bbef54a66428b *inst/doc/parallel.Rmd
fdc9c9cdb226317b93002ff5689f5e64 *inst/doc/parallel.html
54d28c829cb5a1d7cfc256befded2448 *inst/doc/plumber.Rmd
a87f74c174eabbea4588ee7980370657 *inst/doc/plumber.html
54586d30d86b827cf0a09470749c3bc9 *inst/doc/promises.Rmd
b5616ba4ed0b6a2eff2cf7f582cb13c7 *inst/doc/promises.html
0369a9eaddfaa2c1146f43ff7e736f73 *inst/doc/shiny.R
942f92c74a4bae48cb7a5896e64296d6 *inst/doc/shiny.Rmd
6d813595458beb5307bff0c98f5e2e92 *inst/doc/shiny.html
2898fc115e80e45b2bc158b2b2347de4 *inst/doc/torch.Rmd
34f46bb57b5b3f0e773e1db2bd305eb0 *inst/doc/torch.html
88b136e81767e3498810a9d407fa5331 *inst/doc/torch.Rmd
4cba032a8218a18cabdf4c2ee714ce09 *inst/doc/torch.html
eca9335e0142ec80dc3867c06b220824 *man/as.promise.mirai.Rd
576cc8d54371adb5acaecadb49a55dd4 *man/call_mirai.Rd
7cf90b2fe1a4b041c819daf4ea7562be *man/collect_mirai.Rd
a05e4309bf619767f4d4cec834aba456 *man/daemon.Rd
9b0db8e385a20a45eebb09be236ba328 *man/daemons.Rd
b55aeb4961e6ca29ea31b0b4e923b5de *man/dispatcher.Rd
ba270952f4320098b219ff3a8a257401 *man/dot-daemon.Rd
80e82214369270f1111aac5c2843b5e1 *man/dispatcher.Rd
9f6cdf7ac8ea4fbc31ce2b3cb856c3d6 *man/dot-daemon.Rd
ce53036a281abecb3b22bb309b01d594 *man/dot-progress.Rd
c90fb525e7d4aa4e70f2087ab32555b6 *man/everywhere.Rd
adffca0464e589415b086ce837d0c7d1 *man/figures/logo.png
ca23fe2a3eb7be5ad04edcbbaffd2a06 *man/figures/logo.png
6e51bca57cea30196f08d67b342badd2 *man/host_url.Rd
423c5032b4fbe75ec2fdf028b4039676 *man/is_mirai.Rd
3515ab0d8c6379631ce969a3cc534def *man/is_mirai_error.Rd
089c315647df84dc3edd34cdc45cbd76 *man/launch_local.Rd
71aca9b6d5ba9830fbc51520b2d09bf7 *man/make_cluster.Rd
d036d777ba7730a55d2a2cd9b081eb9b *man/mirai-package.Rd
47f20f16b68ec3f24924ceb91102cc3e *man/mirai-package.Rd
7df29d4cc21c8e2916b20842d09c64df *man/mirai.Rd
acb59f0894fa6038d281b9932281456c *man/mirai_map.Rd
56fdfaa31fbb5c32e92af12489be6e69 *man/nextstream.Rd
d401c1035a7bdc6fc30033cd63bbe3ce *man/register_cluster.Rd
59ecbd4e89ebe68327f9e1cc6ef5d445 *man/remote_config.Rd
2b243db56fc7ae28303bd1dd91d12db2 *man/remote_config.Rd
f5df16d4ac81a5fc86e78f8748b6ba7e *man/saisei.Rd
c22fa4838d134df2f73959efa006a596 *man/serialization.Rd
000848186b429f16606ee15db170bba2 *man/serialization.Rd
b182b53096655500985316062dc81d0e *man/status.Rd
7642f1c6eb70f32bf51da3cfb07544b8 *man/stop_mirai.Rd
87568bb24bd39122b3d73eed742760e9 *man/unresolved.Rd
c55178d4197279bb1cde7e325d2a82de *man/with.miraiDaemons.Rd
eafe0d93bbb7deae71ba3effd654802b *tests/tests.R
3d8d9a2ea70aa02af21f7a1feaeeb379 *vignettes/databases.Rmd
c5f548d0d4df85e3f8cc0d8f2625c25d *vignettes/databases.Rmd.orig
d0734ac41448cf213c0e5cbd85fc0fd7 *vignettes/mirai.Rmd
d3dfbbcf5487e7babf2b0611b86c0b76 *vignettes/mirai.Rmd.orig
591713b4d7fdcc8b2c3252be48ffc9eb *vignettes/parallel.Rmd
53d7ce2b32aaf368467d43dd8aee6562 *tests/tests.R
81fe1700fad3c7277c269ae63beb4e19 *vignettes/databases.Rmd
3572b8726d778c57ab63d35af89698f1 *vignettes/databases.Rmd.orig
85064a1c08c47b234fc62dcf2f8c3652 *vignettes/mirai.Rmd
d4fdcccfa3056cdf5ea52dcf3fdc3744 *vignettes/mirai.Rmd.orig
a4160672ef860d46ae6bbef54a66428b *vignettes/parallel.Rmd
42d3fd2c3c4a0b71ddf10beb6fd824db *vignettes/parallel.Rmd.orig
0ecb9be0221623b23a2a2a568027e2d8 *vignettes/plumber.Rmd
54d28c829cb5a1d7cfc256befded2448 *vignettes/plumber.Rmd
e7a2fbd9be6a8677290d11c43f982444 *vignettes/plumber.Rmd.orig
dbd53989f1ab6bdb1195a3b914f5e775 *vignettes/precompile.R
5d0e32478230c812017544284feb1154 *vignettes/promises.Rmd
54586d30d86b827cf0a09470749c3bc9 *vignettes/promises.Rmd
493977a406b833f59a354999023bf99f *vignettes/promises.Rmd.orig
942f92c74a4bae48cb7a5896e64296d6 *vignettes/shiny.Rmd
2898fc115e80e45b2bc158b2b2347de4 *vignettes/torch.Rmd
ab51ceeabd501cc247ac39516d39d5bc *vignettes/torch.Rmd.orig
88b136e81767e3498810a9d407fa5331 *vignettes/torch.Rmd
62eebe604c324bd3f20144275c29c56b *vignettes/torch.Rmd.orig
8 changes: 8 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# mirai 1.1.1

* `serialization()` function signature and return value slightly modified for clarity. Successful registration / cancellation mesasges are no longer printed to the console.
* `dispatcher()` argument 'retry' now defaults to FALSE for consistency with non-dispatcher behaviour.
* `remote_config()` gains argument 'quote' to control whether or not to quote the daemon launch commmand, and now works with Slurm (thanks @michaelmayer2 #119).
* Ephemeral daemons now exit as soon as permissible, eiliminating the 2s linger period.
* Requires `nanonext` >= 1.1.1.

# mirai 1.1.0

* Adds `mirai_map()` for asynchronous parallel/distributed map using `mirai`, with `promises` integration. Allows recovery from partial failure or else early stopping, together with optional progress reporting.
Expand Down
12 changes: 7 additions & 5 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv)
wait(cv) || break
m <- .subset2(aio, "data")
m <- collect_aio(aio)
is.environment(m) || {
count < timerstart && {
start <- mclock()
Expand Down Expand Up @@ -175,18 +175,20 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
#'
#' @inheritParams daemon
#'
#' @return Invisible NULL.
#' @return Logical TRUE or FALSE.
#'
#' @keywords internal
#' @export
#'
.daemon <- function(url) {

sock <- socket(protocol = "rep", dial = url, autostart = NA)
cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
pipe_notify(sock, cv = cv, remove = TRUE)
dial(sock, url = url, autostart = NA, error = TRUE)
data <- eval_mirai(recv(sock, mode = 1L, block = TRUE))
send(sock, data = data, mode = 1L, block = TRUE)
msleep(2000L)
send(sock, data = data, mode = 1L) || until(cv, .limit_short)

}

Expand Down
104 changes: 54 additions & 50 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,9 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
n <- 0L
}
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
remotes <- substitute(remote)
if (!is.symbol(remotes)) remote <- remotes
if (length(remote))
launch_remote(url = envir[["urls"]], remote = remote, tls = envir[["tls"]], ..., .compute = .compute)
serialization_refhook()
check_register_everywhere()
} else {
daemons(n = 0L, .compute = .compute)
return(daemons(n = n, url = url, remote = remote, dispatcher = dispatcher, ..., seed = seed, tls = tls, pass = pass, .compute = .compute))
Expand Down Expand Up @@ -357,11 +355,11 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
} else {
sock <- req_socket(urld)
for (i in seq_len(n))
launch_and_sync_daemon(sock, wa3(urld, dots, next_stream(envir)), output)
launch_and_sync_daemon(sock, wa3(urld, dots, next_stream(envir)), output) || stop(._[["sync_timeout"]])
`[[<-`(envir, "urls", urld)
}
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
serialization_refhook()
check_register_everywhere()
} else {
daemons(n = 0L, .compute = .compute)
return(daemons(n = n, url = url, remote = remote, dispatcher = dispatcher, ..., seed = seed, tls = tls, pass = pass, .compute = .compute))
Expand Down Expand Up @@ -489,58 +487,62 @@ status <- function(.compute = "default") {
#' Custom Serialization Functions
#'
#' Registers custom serialization and unserialization functions for sending and
#' receiving external pointer reference objects.
#'
#' @param refhook \strong{either} a list or pairlist of two functions: the
#' signature for the first must accept a reference object inheriting from
#' 'class' (or a list of such objects) and return a raw vector, and the
#' second must accept a raw vector and return reference objects (or a list
#' of such objects), \cr \strong{or else} NULL to reset.
#' @param class [default ""] a character string representing the class of object
#' that these serialization function will be applied to, e.g. 'ArrowTabular'
#' or 'torch_tensor'.
#' @param vec [default FALSE] the serialization functions accept and return
#' reference object individually e.g. \code{arrow::write_to_raw} and
#' \code{arrow::read_ipc_stream}. If TRUE, the serialization functions are
#' vectorized and accept and return a list of reference objects, e.g.
#' receiving reference objects.
#'
#' @param fns \strong{either} a list comprising 2 functions: \cr serialization
#' function: must accept a reference object (or list of objects) inheriting
#' from \sQuote{class} and return a raw vector.\cr unserialization function:
#' must accept a raw vector and return a reference object (or list of
#' reference objects).\cr \strong{or else} NULL to reset.
#' @param class the class of reference object (as a character string) that these
#' functions are applied to, e.g. 'ArrowTabular' or 'torch_tensor'.
#' @param vec [default FALSE] if FALSE the functions must accept and return
#' reference objects individually e.g. \code{arrow::write_to_raw} and
#' \code{arrow::read_ipc_stream}. If TRUE, the functions are vectorized and
#' must accept and return a list of reference objects, e.g.
#' \code{torch::torch_serialize} and \code{torch::torch_load}.
#'
#' @return Invisibly, the pairlist of currently-registered 'refhook' functions.
#' A message is printed to the console when functions are successfully
#' registered or reset.
#' @return Invisibly, a list comprising 'fns', class', and 'vec', or else NULL
#' if supplied to 'fns'.
#'
#' @details Calling without any arguments returns the pairlist of
#' currently-registered 'refhook' functions.
#' @details Registering new functions replaces any existing registered functions.
#'
#' This function may be called prior to or after setting daemons, with the
#' registered functions applying across all compute profiles.
#'
#' Calling without any arguments returns a list comprising the registered
#' values for 'fns', class', and 'vec', or else NULL if not registered.
#'
#' @examples
#' r <- serialization(list(function(x) serialize(x, NULL), unserialize))
#' print(serialization())
#' serialization(r)
#' reg <- serialization(
#' list(function(x) serialize(x, NULL), base::unserialize),
#' class = "example_class"
#' )
#' reg
#'
#' serialization(NULL)
#' print(serialization())
#'
#' @export
#'
serialization <- function(refhook = list(), class = "", vec = FALSE) {
serialization <- function(fns, class, vec = FALSE) {

register <- !missing(refhook)
cfg <- next_config(refhook = refhook, class = class, vec = vec)
missing(fns) && return(.[["serial"]])

if (register) {
if (is.list(refhook) && length(refhook) == 2L && is.function(refhook[[1L]]) && is.function(refhook[[2L]]))
cat("mirai serialization functions registered\n", file = stderr()) else
if (is.null(refhook))
cat("mirai serialization functions cancelled\n", file = stderr()) else
stop(._[["refhook_invalid"]])
`[[<-`(., "refhook", list(refhook, class, vec))
register_everywhere(refhook = refhook, class = class, vec = vec)
if (is.null(fns)) {
serial <- NULL
next_config(NULL)
} else if (length(fns) == 2L && is.function(fns[[1L]]) && is.function(fns[[2L]])) {
is.character(class) || stop(._[["character_class"]])
serial <- list(fns, class, vec)
next_config(fns, class = class, vec = vec)
} else {
stop(._[["serial_invalid"]])
}

invisible(cfg)
`[[<-`(., "serial", serial)
register_everywhere(serial)
invisible(serial)

}

Expand Down Expand Up @@ -584,21 +586,21 @@ parse_dots <- function(...) {
}

parse_tls <- function(tls)
switch(length(tls) + 1L, "", sprintf(",tls='%s'", tls), sprintf(",tls=c('%s','%s')", tls[1L], tls[2L]))
switch(length(tls) + 1L, "", sprintf(",tls=\"%s\"", tls), sprintf(",tls=c(\"%s\",\"%s\")", tls[1L], tls[2L]))

libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]

wa2 <- function(url, dots, tls = NULL)
shQuote(sprintf("mirai::daemon('%s'%s%s)", url, dots, parse_tls(tls)))
shQuote(sprintf("mirai::daemon(\"%s\"%s%s)", url, dots, parse_tls(tls)))

wa3 <- function(url, dots, rs, tls = NULL)
shQuote(sprintf("mirai::daemon('%s'%s%s,rs=c(%s))", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s))", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))

wa4 <- function(urld, dots, rs, n, urlc)
shQuote(sprintf(".libPaths(c('%s',.libPaths()));mirai::dispatcher('%s',n=%d,rs=c(%s),monitor='%s'%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))

wa5 <- function(urld, dots, n, urlc, url)
shQuote(sprintf(".libPaths(c('%s',.libPaths()));mirai::dispatcher('%s',c('%s'),n=%d,monitor='%s'%s)", libp(), urld, paste0(url, collapse = "','"), n, urlc, dots))
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "','"), n, urlc, dots))

launch_daemon <- function(args, output)
system2(command = .command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE)
Expand Down Expand Up @@ -660,13 +662,15 @@ query_status <- function(envir) {
dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete"))))
}

register_everywhere <- function(refhook, class, vec)
register_everywhere <- function(serial)
for (.compute in names(..))
everywhere(mirai::serialization(refhook = refhook, class = class, vec = vec),
refhook = refhook, class = class, vec = vec, .compute = .compute)
everywhere(
mirai::serialization(serial[[1L]], class = serial[[2L]], vec = serial[[3L]]),
.args = list(serial = serial),
.compute = .compute
)

serialization_refhook <- function(refhook = .[["refhook"]])
if (length(refhook[[1L]]))
register_everywhere(refhook = refhook[[1L]], class = refhook[[2L]], vec = refhook[[3L]])
check_register_everywhere <- function(serial = .[["serial"]])
if (length(serial[[1L]])) register_everywhere(serial)

._scm_. <- as.raw(c(0x07, 0x00, 0x00, 0x00, 0x42, 0x0a, 0x03, 0x00, 0x00, 0x00, 0x02, 0x03, 0x04, 0x00, 0x00, 0x05, 0x03, 0x00, 0x05, 0x00, 0x00, 0x00, 0x55, 0x54, 0x46, 0x2d, 0x38, 0xfc, 0x00, 0x00, 0x00))
4 changes: 2 additions & 2 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param retry [default TRUE] if TRUE, then a task where the daemon crashes or
#' @param retry [default FALSE] if TRUE, a task where the daemon crashes or
#' terminates unexpectedly will be automatically re-tried on the next daemon
#' instance to connect. In such a case, the mirai will remain unresolved but
#' \code{\link{status}} will show \sQuote{online} as 0 and \sQuote{assigned}
Expand Down Expand Up @@ -79,7 +79,7 @@
#' @export
#'
dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
retry = TRUE, token = FALSE, tls = NULL, pass = NULL,
retry = FALSE, token = FALSE, tls = NULL, pass = NULL,
rs = NULL, monitor = NULL) {

n <- if (is.numeric(n)) as.integer(n) else length(url)
Expand Down
Loading

0 comments on commit 23dcbff

Please sign in to comment.