BETA: Global progress handler #95

Closed
1 of 2 tasks
HenrikBengtsson opened this issue Nov 17, 2020 · 13 comments
Labels
help wanted Extra attention is needed

Comments

@HenrikBengtsson
Collaborator

HenrikBengtsson commented Nov 17, 2020

I've implemented support for registering a global condition handler (requires R >= 4.0.0) that will listen to progress updates (formally 'progression' conditions) and render them. When used, there is no need for with_progress(). As the below example illustrates, it buffers and flushes standard output and messages (and warnings). Just like before, it also works when parallelizing using the future framework.

Here's an example:

library(progressr)

slow_fcn <- function(xs) {
  p <- progressor(along = xs)
  p("Starting", amount = 0)
  y <- lapply(xs, function(x) {
    cat(sprintf("[stdout]: x = %g\n", x))
    Sys.sleep(0.5)
    z <- sqrt(x)
    message(sprintf("[message]: z = %g", z))
    p(sprintf("Processed x = %g", x))
    z
  })
  sum(unlist(y))
}
handlers(global = TRUE)
y <- slow_fcn(1:3)
# [stdout]: x = 1
# [message]: z = 1
# [stdout]: x = 2
  |==========              |  33%

To disable the global handlers, use:

handlers(global = FALSE)
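
For readers unfamiliar with the mechanism, here is a minimal base-R sketch of condition-based progress reporting. It is not progressr's implementation: the condition class `demo_progress`, the restart `muffleDemoProgress`, and the helpers `signal_step()` and `slow_sqrt()` are hypothetical names made up for this illustration. The handler is attached with `withCallingHandlers()`; `globalCallingHandlers()`, which exists in R >= 4.0.0, registers the same kind of handler at the top level, which is presumably where the version requirement comes from.

```r
# Signal a custom condition; the class and restart names are hypothetical.
signal_step <- function(msg) {
  cond <- structure(
    list(message = msg, call = NULL),
    class = c("demo_progress", "condition")
  )
  # The restart lets a handler stop the condition from propagating further.
  withRestarts(signalCondition(cond), muffleDemoProgress = function() NULL)
  invisible(NULL)
}

# The worker only signals; it does not know how progress is rendered.
slow_sqrt <- function(xs) {
  vapply(xs, function(x) {
    signal_step(sprintf("Processed x = %g", x))
    sqrt(x)
  }, numeric(1))
}

# A calling handler collects the updates while the computation carries on.
seen <- character(0)
y <- withCallingHandlers(
  slow_sqrt(c(1, 4, 9)),
  demo_progress = function(cond) {
    seen <<- c(seen, conditionMessage(cond))
    invokeRestart("muffleDemoProgress")
  }
)

print(y)
print(seen)
```

If no handler is established, `signalCondition()` simply returns and the computation proceeds without any progress output, which is why a worker written this way runs unchanged with or without a listener.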

Please give it a spin by installing the develop branch:

remotes::install_github("HenrikBengtsson/progressr", ref = "develop")

Tasks

  • Test it in real-world scenarios:

  • The name register_global_progression_handler() is a bit much and should be renamed to something shorter. If anyone has a better name, please let me know. UPDATE 2020-12-04: We're now using handlers(global = TRUE) and handlers(global = FALSE) for this.

An alternative is to make handlers("void") the default, which will also unregister any global progression handler. When setting any other progression handler(s), say handlers("progress"), then the global progression handler will be automatically registered as well. This way the end-user interface will be to use handlers().

@Bisaloo
Contributor

Bisaloo commented Nov 18, 2020

I tried to follow your instructions but I get

Error: ‘is.null(stdout_file)’ is not TRUE

which was not the case before. Any idea what went wrong and how to fix it?

@HenrikBengtsson
Collaborator Author

@Bisaloo, thanks for the feedback. That error message comes from an internal sanity check. I'd like to be able to reproduce this. Can you give me a few more details, e.g.

  1. Do you get any output at all?

  2. After you get that error message, what does traceback() give?

  3. What's your sessionInfo() after you get the error?

  4. Are you running R in the terminal, in RStudio, or elsewhere?

@Bisaloo
Contributor

Bisaloo commented Nov 18, 2020

Do you get any output at all?

No, I get NULL. There is a small chance this NULL actually comes from my own code after encountering the error though.

After you get that error message, what does traceback() give?

traceback
37: stop(sQuote(call), " is not TRUE", call. = FALSE, domain = NA)
36: stop_if_not(is.null(stdout_file))
35: finish(debug = debug)
34: handle_progression(condition)
33: (function (condition) 
    {
        if (inherits(condition, c("interrupt", "error"))) {
            progression <- control_progression("shutdown")
            finished <- finish()
            stop_if_not(is.null(stdout_file), length(conditions) == 
                0L, is.na(capture_conditions), isTRUE(finished))
            return()
        }
        if (inherits(condition, "progression")) {
            return(handle_progression(condition))
        }
        if (is.na(capture_conditions) || !isTRUE(capture_conditions)) 
            return()
        if (is.null(delays) || !inherits(condition, delays$conditions)) 
            return()
        conditions[[length(conditions) + 1L]] <<- condition
        if (inherits(condition, "message")) {
            invokeRestart("muffleMessage")
        }
        else if (inherits(condition, "warning")) {
            invokeRestart("muffleWarning")
        }
        else if (inherits(condition, "condition")) {
            restarts <- computeRestarts(condition)
            for (restart in restarts) {
                name <- restart$name
                if (is.null(name)) 
                    next
                if (!grepl("^muffle", name)) 
                    next
                invokeRestart(restart)
                break
            }
        }
    })(structure(list(owner_session_uuid = structure("8e009b4d-e27d-afe9-161d-873bb7f267a3", source = list(
        host = "discosura", info = c(sysname = "Linux", release = "5.4.0-54-generic", 
        version = "#60-Ubuntu SMP Fri Nov 6 10:37:59 UTC 2020", nodename = "discosura", 
        machine = "x86_64", login = "unknown", user = "hugo", effective_user = "hugo"
        ), time = structure(1605693317.5908, class = c("POSIXct", 
        "POSIXt")), tempdir = "/tmp/RtmpaqfQxS", pid = 11792L, random = 1557582705L)), 
        progressor_uuid = "636d2548-8682-245d-c72c-63fd10bdfa27", 
        session_uuid = "8e009b4d-e27d-afe9-161d-873bb7f267a3", progression_index = 2L, 
        progression_time = "2020-11-18 19:37:33.661 +0100", type = "update", 
        message = character(0), amount = 1, step = NULL, time = structure(1605724653.66199, class = c("POSIXct", 
        "POSIXt")), call = progress(type = type, ..., progressor_uuid = progressor_uuid, 
            progression_index = progression_index, owner_session_uuid = owner_session_uuid)), class = c("progression", 
    "immediateCondition", "condition")))
32: signalCondition(cond)
31: doWithOneRestart(return(expr), restart)
30: withOneRestart(expr, restarts[[1L]])
29: withRestarts(signalCondition(cond), muffleProgression = function(p) NULL)
28: progress(type = type, ..., progressor_uuid = progressor_uuid, 
        progression_index = progression_index, owner_session_uuid = owner_session_uuid)
27: p() at get_spec.R#118
26: ...future.FUN(...future.X_jj, ...)
25: FUN(X[[i]], ...)
24: lapply(seq_along(...future.elements_ii), FUN = function(jj) {
        ...future.X_jj <- ...future.elements_ii[[jj]]
        NULL
        ...future.FUN(...future.X_jj, ...)
    })
23: (function (...) 
    {
        ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
        if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
            oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
            on.exit(options(oopts), add = TRUE)
        }
        {
            lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                ...future.X_jj <- ...future.elements_ii[[jj]]
                NULL
                ...future.FUN(...future.X_jj, ...)
            })
        }
    })()
22: do.call(function(...) {
        ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
        if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
            oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
            on.exit(options(oopts), add = TRUE)
        }
        {
            lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                ...future.X_jj <- ...future.elements_ii[[jj]]
                NULL
                ...future.FUN(...future.X_jj, ...)
            })
        }
    }, args = future.call.arguments)
21: eval(quote({
        do.call(function(...) {
            ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
            if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
                oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                on.exit(options(oopts), add = TRUE)
            }
            {
                lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                    ...future.X_jj <- ...future.elements_ii[[jj]]
                    NULL
                    ...future.FUN(...future.X_jj, ...)
                })
            }
        }, args = future.call.arguments)
    }), new.env())
20: eval(quote({
        do.call(function(...) {
            ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
            if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
                oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                on.exit(options(oopts), add = TRUE)
            }
            {
                lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                    ...future.X_jj <- ...future.elements_ii[[jj]]
                    NULL
                    ...future.FUN(...future.X_jj, ...)
                })
            }
        }, args = future.call.arguments)
    }), new.env())
19: eval(expr, p)
18: eval(expr, p)
17: eval.parent(substitute(eval(quote(expr), envir)))
16: base::local({
        do.call(function(...) {
            ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
            if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
                oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                on.exit(options(oopts), add = TRUE)
            }
            {
                lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                    ...future.X_jj <- ...future.elements_ii[[jj]]
                    NULL
                    ...future.FUN(...future.X_jj, ...)
                })
            }
        }, args = future.call.arguments)
    })
15: base::withVisible(base::local({
        do.call(function(...) {
            ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
            if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
                oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                on.exit(options(oopts), add = TRUE)
            }
            {
                lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                    ...future.X_jj <- ...future.elements_ii[[jj]]
                    NULL
                    ...future.FUN(...future.X_jj, ...)
                })
            }
        }, args = future.call.arguments)
    }))
14: base::withCallingHandlers({
        ...future.value <- base::withVisible(base::local({
            do.call(function(...) {
                ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
                if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) {
                    oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                    on.exit(options(oopts), add = TRUE)
                }
                {
                    lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                      ...future.X_jj <- ...future.elements_ii[[jj]]
                      NULL
                      ...future.FUN(...future.X_jj, ...)
                    })
                }
            }, args = future.call.arguments)
        }))
        future::FutureResult(value = ...future.value$value, visible = ...future.value$visible, 
            rng = !identical(base::globalenv()$.Random.seed, ...future.rng), 
            started = ...future.startTime, version = "1.8")
    }, condition = base::local({
        c <- base::c
        inherits <- base::inherits
        invokeRestart <- base::invokeRestart
        length <- base::length
        list <- base::list
        seq.int <- base::seq.int
        signalCondition <- base::signalCondition
        sys.calls <- base::sys.calls
        Sys.time <- base::Sys.time
        `[[` <- base::`[[`
        `+` <- base::`+`
        `<<-` <- base::`<<-`
        sysCalls <- function(calls = sys.calls(), from = 1L) {
            calls[seq.int(from = from + 12L, to = length(calls) - 
                3L)]
        }
        function(cond) {
            if (inherits(cond, "error")) {
                sessionInformation <- function() {
                    list(r = base::R.Version(), locale = base::Sys.getlocale(), 
                      rngkind = base::RNGkind(), namespaces = base::loadedNamespaces(), 
                      search = base::search(), system = base::Sys.info())
                }
                ...future.conditions[[length(...future.conditions) + 
                    1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...future.frame), 
                    cond$call), session = sessionInformation(), timestamp = Sys.time(), 
                    signaled = 0L)
                signalCondition(cond)
            }
            else if (inherits(cond, c("condition", "immediateCondition"
            ))) {
                signal <- TRUE && inherits(cond, "immediateCondition")
                ...future.conditions[[length(...future.conditions) + 
                    1L]] <<- list(condition = cond, signaled = base::as.integer(signal))
                if (!signal) {
                    muffleCondition <- function (cond) 
                    {
                      inherits <- base::inherits
                      invokeRestart <- base::invokeRestart
                      muffled <- FALSE
                      if (inherits(cond, "message")) {
                        invokeRestart("muffleMessage")
                        muffled <- TRUE
                      }
                      else if (inherits(cond, "warning")) {
                        invokeRestart("muffleWarning")
                        muffled <- TRUE
                      }
                      else if (inherits(cond, "condition")) {
                        computeRestarts <- base::computeRestarts
                        grepl <- base::grepl
                        is.null <- base::is.null
                        restarts <- computeRestarts(cond)
                        for (restart in restarts) {
                          name <- restart$name
                          if (is.null(name)) 
                            next
                          if (!grepl("^muffle", name)) 
                            next
                          invokeRestart(restart)
                          muffled <- TRUE
                          break
                        }
                      }
                      invisible(muffled)
                    }
                    muffleCondition(cond)
                }
            }
            else {
                muffleCondition <- function (cond) 
                {
                    inherits <- base::inherits
                    invokeRestart <- base::invokeRestart
                    muffled <- FALSE
                    if (inherits(cond, "message")) {
                      invokeRestart("muffleMessage")
                      muffled <- TRUE
                    }
                    else if (inherits(cond, "warning")) {
                      invokeRestart("muffleWarning")
                      muffled <- TRUE
                    }
                    else if (inherits(cond, "condition")) {
                      computeRestarts <- base::computeRestarts
                      grepl <- base::grepl
                      is.null <- base::is.null
                      restarts <- computeRestarts(cond)
                      for (restart in restarts) {
                        name <- restart$name
                        if (is.null(name)) 
                          next
                        if (!grepl("^muffle", name)) 
                          next
                        invokeRestart(restart)
                        muffled <- TRUE
                        break
                      }
                    }
                    invisible(muffled)
                }
                muffleCondition(cond)
            }
        }
    }))
13: doTryCatch(return(expr), name, parentenv, handler)
12: tryCatchOne(expr, names, parentenv, handlers[[1L]])
11: tryCatchList(expr, classes, parentenv, handlers)
10: base::tryCatch({
        base::withCallingHandlers({
            ...future.value <- base::withVisible(base::local({
                do.call(function(...) {
                    ...future.globals.maxSize.org <- getOption("future.globals.maxSize")
                    if (!identical(...future.globals.maxSize.org, 
                      ...future.globals.maxSize)) {
                      oopts <- options(future.globals.maxSize = ...future.globals.maxSize)
                      on.exit(options(oopts), add = TRUE)
                    }
                    {
                      lapply(seq_along(...future.elements_ii), FUN = function(jj) {
                        ...future.X_jj <- ...future.elements_ii[[jj]]
                        NULL
                        ...future.FUN(...future.X_jj, ...)
                      })
                    }
                }, args = future.call.arguments)
            }))
            future::FutureResult(value = ...future.value$value, visible = ...future.value$visible, 
                rng = !identical(base::globalenv()$.Random.seed, 
                    ...future.rng), started = ...future.startTime, 
                version = "1.8")
        }, condition = base::local({
            c <- base::c
            inherits <- base::inherits
            invokeRestart <- base::invokeRestart
            length <- base::length
            list <- base::list
            seq.int <- base::seq.int
            signalCondition <- base::signalCondition
            sys.calls <- base::sys.calls
            Sys.time <- base::Sys.time
            `[[` <- base::`[[`
            `+` <- base::`+`
            `<<-` <- base::`<<-`
            sysCalls <- function(calls = sys.calls(), from = 1L) {
                calls[seq.int(from = from + 12L, to = length(calls) - 
                    3L)]
            }
            function(cond) {
                if (inherits(cond, "error")) {
                    sessionInformation <- function() {
                      list(r = base::R.Version(), locale = base::Sys.getlocale(), 
                        rngkind = base::RNGkind(), namespaces = base::loadedNamespaces(), 
                        search = base::search(), system = base::Sys.info())
                    }
                    ...future.conditions[[length(...future.conditions) + 
                      1L]] <<- list(condition = cond, calls = c(sysCalls(from = ...future.frame), 
                      cond$call), session = sessionInformation(), 
                      timestamp = Sys.time(), signaled = 0L)
                    signalCondition(cond)
                }
                else if (inherits(cond, c("condition", "immediateCondition"
                ))) {
                    signal <- TRUE && inherits(cond, "immediateCondition")
                    ...future.conditions[[length(...future.conditions) + 
                      1L]] <<- list(condition = cond, signaled = base::as.integer(signal))
                    if (!signal) {
                      muffleCondition <- function (cond) 
                      {
                        inherits <- base::inherits
                        invokeRestart <- base::invokeRestart
                        muffled <- FALSE
                        if (inherits(cond, "message")) {
                          invokeRestart("muffleMessage")
                          muffled <- TRUE
                        }
                        else if (inherits(cond, "warning")) {
                          invokeRestart("muffleWarning")
                          muffled <- TRUE
                        }
                        else if (inherits(cond, "condition")) {
                          computeRestarts <- base::computeRestarts
                          grepl <- base::grepl
                          is.null <- base::is.null
                          restarts <- computeRestarts(cond)
                          for (restart in restarts) {
                            name <- restart$name
                            if (is.null(name)) 
                              next
                            if (!grepl("^muffle", name)) 
                              next
                            invokeRestart(restart)
                            muffled <- TRUE
                            break
                          }
                        }
                        invisible(muffled)
                      }
                      muffleCondition(cond)
                    }
                }
                else {
                    muffleCondition <- function (cond) 
                    {
                      inherits <- base::inherits
                      invokeRestart <- base::invokeRestart
                      muffled <- FALSE
                      if (inherits(cond, "message")) {
                        invokeRestart("muffleMessage")
                        muffled <- TRUE
                      }
                      else if (inherits(cond, "warning")) {
                        invokeRestart("muffleWarning")
                        muffled <- TRUE
                      }
                      else if (inherits(cond, "condition")) {
                        computeRestarts <- base::computeRestarts
                        grepl <- base::grepl
                        is.null <- base::is.null
                        restarts <- computeRestarts(cond)
                        for (restart in restarts) {
                          name <- restart$name
                          if (is.null(name)) 
                            next
                          if (!grepl("^muffle", name)) 
                            next
                          invokeRestart(restart)
                          muffled <- TRUE
                          break
                        }
                      }
                      invisible(muffled)
                    }
                    muffleCondition(cond)
                }
            }
        }))
    }, error = function(ex) {
        base::structure(base::list(value = NULL, visible = NULL, 
            conditions = ...future.conditions, rng = !identical(base::globalenv()$.Random.seed, 
                ...future.rng), version = "1.8"), class = "FutureResult")
    }, finally = {
        {
            NULL
            future::plan(structure(list(structure(function (expr, 
                envir = parent.frame(), substitute = TRUE, lazy = FALSE, 
                seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, 
                label = NULL, ...) 
            {
                if (substitute) 
                    expr <- substitute(expr)
                local <- as.logical(local)
                future <- SequentialFuture(expr = expr, envir = envir, 
                    substitute = FALSE, lazy = lazy, seed = seed, 
                    globals = globals, local = local, earlySignal = earlySignal, 
                    label = label, ...)
                if (!future$lazy) 
                    future <- run(future)
                invisible(future)
            }, class = c("sequential", "uniprocess", "future", "function"
            ))), class = c("FutureStrategyList", "list")), .cleanup = FALSE, 
                .init = FALSE)
        }
        base::options(...future.oldOptions)
    })
9: eval(expr, envir = envir, enclos = baseenv())
8: eval(expr, envir = envir, enclos = baseenv())
7: run.UniprocessFuture(future)
6: run(future)
5: makeFuture(expr, substitute = FALSE, envir = envir, globals = globals, 
       packages = packages, seed = seed, lazy = lazy, ...)
4: future(expr, substitute = FALSE, envir = envir, stdout = future.stdout, 
       conditions = future.conditions, globals = globals_ii, packages = packages_ii, 
       seed = future.seed, lazy = future.lazy, label = labels[ii])
3: future_xapply(FUN = FUN, nX = nX, chunk_args = X, args = list(...), 
       get_chunk = `[`, expr = expr, envir = envir, future.globals = future.globals, 
       future.packages = future.packages, future.scheduling = future.scheduling, 
       future.chunk.size = future.chunk.size, future.stdout = future.stdout, 
       future.conditions = future.conditions, future.seed = future.seed, 
       future.lazy = future.lazy, future.label = future.label, fcn_name = fcn_name, 
       args_name = args_name, debug = debug)
2: future_lapply(files, function(x) {
       p()
       tryCatch(gsp(x), error = function(e) NULL)
   }) at get_spec.R#117
1: lr_get_spec(system.file("testdata", package = "lightr"), ext = "jdx")

What's your sessionInfo() after you get the error?

sessionInfo
R version 4.0.3 (2020-10-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.1 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=fr_FR.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=fr_FR.UTF-8    LC_MESSAGES=en_US.UTF-8    LC_PAPER=fr_FR.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=fr_FR.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] lightr_1.3           progressr_0.6.0-9000 testthat_3.0.0.9000 

loaded via a namespace (and not attached):
 [1] compiler_4.0.3     prettyunits_1.1.1  remotes_2.2.0      tools_4.0.3        progress_1.2.2     digest_0.6.27     
 [7] pkgbuild_1.1.0     pkgload_1.1.0      memoise_1.1.0      pkgconfig_2.0.3    rlang_0.4.8        cli_2.1.0         
[13] rstudioapi_0.13    parallel_4.0.3     withr_2.3.0        xml2_1.3.2         desc_1.2.0         fs_1.5.0          
[19] vctrs_0.3.4        globals_0.13.1     devtools_2.3.1     hms_0.5.3          rprojroot_2.0.2    glue_1.4.2        
[25] listenv_0.8.0      R6_2.5.0           processx_3.4.4     future.apply_1.6.0 fansi_0.4.1        sessioninfo_1.1.1 
[31] callr_3.5.1        magrittr_1.5       ps_1.4.0           codetools_0.2-18   ellipsis_0.3.1     usethis_1.6.1     
[37] assertthat_0.2.1   future_1.19.1      crayon_1.3.4     

Are you running R in the terminal, in RStudio, or elsewhere?

RStudio


You can probably reproduce this issue with:

remotes::install_github("ropensci/lightr@progressr-globalhandlers")

library(progressr)
register_global_progression_handler()

library(lightr)

lr_get_spec(system.file("testdata", package = "lightr"), ext = "jdx")

@HenrikBengtsson
Collaborator Author

Thanks, I misunderstood your initial "I tried to follow your instructions" to mean that the example in #95 (comment) failed. Do you mind confirming that at least that one works for you?

@Bisaloo
Contributor

Bisaloo commented Nov 18, 2020

Ah yes, this was confusing indeed. I confirm the example in the OP works perfectly for me.

@HenrikBengtsson
Collaborator Author

Thxs. I can reproduce the error with your package example.

@HenrikBengtsson
Collaborator Author

I can reproduce the problem with:

library(progressr)
options(progressr.demo.delay = 0.2)
options(progressr.clear = FALSE)

foo <- function(steps = 3L) {
  p <- progressor(steps)
  y <- future.apply::future_lapply(1:steps, function(n) {
    p()
    slow_sum(1:2, stdout=TRUE, message=FALSE)
  })
}

register_global_progression_handler()
foo()

##  |========================================| 100%
## Error: 'is.null(stdout_file)' is not TRUE

with the end of the traceback being:

34: handle_progression(condition)
35: finish(debug = debug)
36: stop_if_not(is.null(stdout_file))

The problem goes away if I allow for one more progressor step, e.g. p <- progressor(steps + 1L). It also goes away if I drop the p(). Also, replacing future.apply::future_lapply() with a plain lapply() works.

This means I've got something to work with to fix this.

HenrikBengtsson added a commit that referenced this issue Nov 18, 2020
…se active sinks and the stdout_file connection due to other, blocking sinks [#95]
@HenrikBengtsson
Collaborator Author

HenrikBengtsson commented Nov 18, 2020

I think I fixed it. @Bisaloo, please re-install the develop version and retry.

Details: The problem was that the internal sanity check assumed that the sink and connection used to buffer stdout could be closed at any time, e.g. when the progressor reaches 100%. This was not possible when it reached 100% inside a future, because the future had another sink active, which blocked the progressor's sink from being closed. Here's a simpler reproducible example of this problem without futures:

library(progressr)
options(progressr.demo.delay = 0.2)

register_global_progression_handler()

local({
  p <- progressor(1L)  ## this sets up a sink to capture and buffer stdout
  bfr <- utils::capture.output({
    p()  ## this completes and closes the progressor but it can't close 
         ## its sink because of the active capture.output()
    slow_sum(1:2, stdout=TRUE, message=FALSE)
  })
})
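
The blocking behaviour can also be seen in base R alone, without progressr. The sketch below is only an illustration under assumptions: `outer` is a hypothetical stand-in for the buffer a progressor sets up, and `capture.output()` plays the role of the future's own sink. Output diversions form a stack, so the outer sink cannot be removed while the inner one is still on top of it.

```r
# Stand-in for the buffer a progressor sets up to hold back stdout
# (hypothetical; not progressr's actual code).
outer <- rawConnection(raw(0L), open = "w")
sink(outer, type = "output")
depth_outer <- sink.number()
cat("held back by the outer sink\n")

# capture.output() pushes its own sink on top of the outer one, much like
# a future that captures the stdout of the code it evaluates.
captured <- utils::capture.output({
  depth_inner <- sink.number()
  cat("caught by the inner sink\n")
})

# Only now is the outer sink on top of the stack again, so it can be removed.
sink(type = "output")
buffered <- rawToChar(rawConnectionValue(outer))
close(outer)

cat(sprintf("outer depth = %d, inner depth = %d\n", depth_outer, depth_inner))
```

Inside the `capture.output()` block the sink depth is one higher than outside, and a plain `sink()` call there would remove the inner diversion rather than the outer one. That is the situation the sanity check did not anticipate.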

@HenrikBengtsson HenrikBengtsson added the help wanted Extra attention is needed label Nov 18, 2020
@Bisaloo
Contributor

Bisaloo commented Nov 19, 2020

Yes, I can confirm it is fixed for me. Thank you very much for the quick fix!

@HenrikBengtsson
Collaborator Author

HenrikBengtsson commented Dec 2, 2020

Instead of the end-user having to call:

register_global_progression_handler()

I'm thinking of providing

# Enable
handlers(global = TRUE)

# Disable
handlers(global = FALSE)

@HenrikBengtsson
Collaborator Author

I've pushed support for:

# Enable
> handlers(global = TRUE)

# Disable
> handlers(global = FALSE)

# Query
> handlers(global = NA)
[1] FALSE

Next, I'll remove register_global_progression_handler() from the public API.
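
As a rough illustration of the enable/disable/query convention, a three-state logical argument can be sketched in a few lines of base R. This is not how handlers() is implemented: `make_global_switch()` is a hypothetical helper, and returning the previous state invisibly when setting is an assumption of this sketch only.

```r
# Hypothetical helper, not part of progressr: one argument, three meanings.
make_global_switch <- function() {
  enabled <- FALSE
  function(global = NA) {
    stopifnot(is.logical(global), length(global) == 1L)
    if (is.na(global)) return(enabled)  # NA: query the current state
    previous <- enabled
    enabled <<- global                  # TRUE/FALSE: enable or disable
    invisible(previous)                 # assumption: hand back the old state
  }
}

global_switch <- make_global_switch()
global_switch(TRUE)
print(global_switch(NA))
global_switch(FALSE)
print(global_switch(NA))
```

Using NA for the query keeps enabling, disabling, and inspecting behind a single argument, at the cost of NA being a slightly unusual way to ask a question.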

HenrikBengtsson added a commit that referenced this issue Dec 4, 2020
@HenrikBengtsson
Collaborator Author

I've updated the README and the vignette (they're the same) to mainly use global progress handling in all of the examples.

@HenrikBengtsson
Collaborator Author

Finally managed to run revdep checks where all package tests are run with `progressr::handlers(global = TRUE)`. It took some hacking. The good news is that there is no negative impact from enabling the global progression handler.
