diff --git a/CHANGELOG.md b/CHANGELOG.md
index 24603514..caaf9ab4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,14 @@
 # Change Log
-## [0.6.1] 2017-11-13
+
+## [0.6.1] 2017-12-05
 ### Added
 - Support for users to use programmatically generated credentials and cluster config
+- Support for users to delete and terminate jobs
+### Changed
+- [BREAKING CHANGE] When wait = TRUE, both the job and its results are deleted at the end of the run; set autoDeleteJob to FALSE to keep them
+- Add retry logic to getJobResult
+- Add errorHandling and wait options to job metadata
+- Save job metadata to the job result storage blob
 
 ## [0.6.0] 2017-11-03
 ### Added
diff --git a/NAMESPACE b/NAMESPACE
index f10cbb06..0e0d3134 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -1,6 +1,7 @@
 # Generated by roxygen2: do not edit by hand
 
 export(createOutputFile)
+export(deleteJob)
 export(deleteStorageContainer)
 export(deleteStorageFile)
 export(generateClusterConfig)
@@ -16,11 +17,13 @@ export(listStorageFiles)
 export(makeCluster)
 export(registerDoAzureParallel)
 export(resizeCluster)
+export(setAutoDeleteJob)
 export(setChunkSize)
 export(setCredentials)
 export(setHttpTraffic)
 export(setReduce)
 export(setVerbose)
 export(stopCluster)
+export(terminateJob)
 export(waitForNodesToComplete)
 export(waitForTasksToComplete)
diff --git a/R/doAzureParallel.R b/R/doAzureParallel.R
index e862ba51..d0841370 100644
--- a/R/doAzureParallel.R
+++ b/R/doAzureParallel.R
@@ -73,6 +73,20 @@ setChunkSize <- function(value = 1) {
   assign("chunkSize", value, envir = .doAzureBatchGlobals)
 }
 
+#' Specify whether to delete the job and its result after a synchronous job completes.
+#'
+#' @param value A logical value: TRUE or FALSE
+#'
+#' @examples
+#' setAutoDeleteJob(FALSE)
+#' @export
+setAutoDeleteJob <- function(value = TRUE) {
+  if (!is.logical(value))
+    stop("setAutoDeleteJob requires a boolean argument")
+
+  assign("autoDeleteJob", value, envir = .doAzureBatchGlobals)
+}
+
 #' Apply reduce function on a group of iterations of the foreach loop together per task.
 #'
 #' @param fun The number of iterations to group
@@ -232,6 +246,18 @@ setHttpTraffic <- function(value = FALSE) {
     wait <- obj$options$azure$wait
   }
 
+  # by default, delete both the job and its result after a synchronous job completes
+  autoDeleteJob <- TRUE
+
+  if (exists("autoDeleteJob", envir = .doAzureBatchGlobals)) {
+    autoDeleteJob <- get("autoDeleteJob", envir = .doAzureBatchGlobals)
+  }
+
+  if (!is.null(obj$options$azure$autoDeleteJob) &&
+      is.logical(obj$options$azure$autoDeleteJob)) {
+    autoDeleteJob <- obj$options$azure$autoDeleteJob
+  }
+
   inputs <- FALSE
   if (!is.null(obj$options$azure$inputs)) {
     storageCredentials <- rAzureBatch::getStorageCredentials()
@@ -282,6 +308,10 @@ setHttpTraffic <- function(value = FALSE) {
 
   chunkSize <- 1
 
+  if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
+    chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
+  }
+
   if (!is.null(obj$options$azure$chunkSize)) {
     chunkSize <- obj$options$azure$chunkSize
   }
@@ -290,25 +320,34 @@ setHttpTraffic <- function(value = FALSE) {
     chunkSize <- obj$options$azure$chunksize
   }
 
-  if (exists("chunkSize", envir = .doAzureBatchGlobals)) {
-    chunkSize <- get("chunkSize", envir = .doAzureBatchGlobals)
-  }
-
   chunkSizeKeyValuePair <-
     list(name = "chunkSize", value = as.character(chunkSize))
 
-  if (is.null(obj$packages)) {
-    metadata <-
-      list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
-  } else {
+  metadata <-
+    list(enableCloudCombineKeyValuePair, chunkSizeKeyValuePair)
+
+  if (!is.null(obj$packages)) {
     packagesKeyValuePair <-
       list(name = "packages",
            value = paste(obj$packages, collapse = ";"))
 
-    metadata <-
-      list(enableCloudCombineKeyValuePair,
-           chunkSizeKeyValuePair,
-           packagesKeyValuePair)
+    metadata[[length(metadata) + 1]] <- packagesKeyValuePair
+  }
+
+  if (!is.null(obj$errorHandling)) {
+    errorHandlingKeyValuePair <-
+      list(name = "errorHandling",
+           value = as.character(obj$errorHandling))
+
+    metadata[[length(metadata) + 1]] <- errorHandlingKeyValuePair
+  }
+
+  if (!is.null(obj$options$azure$wait)) {
+    waitKeyValuePair <-
+      list(name = "wait",
+           value = as.character(obj$options$azure$wait))
+
+    metadata[[length(metadata) + 1]] <- waitKeyValuePair
   }
 
   retryCounter <- 0
@@ -454,6 +493,10 @@ setHttpTraffic <- function(value = FALSE) {
   job <- rAzureBatch::getJob(id)
   cat(sprintf("Id: %s", job$id), fill = TRUE)
 
+  if (!is.null(job$id)) {
+    saveMetadataBlob(job$id, metadata)
+  }
+
   ntasks <- length(argsList)
   startIndices <- seq(1, length(argsList), chunkSize)
@@ -542,7 +585,7 @@ setHttpTraffic <- function(value = FALSE) {
 
     numberOfFailedTasks <- sum(unlist(failTasks))
 
-    if (numberOfFailedTasks > 0) {
+    if (numberOfFailedTasks > 0 && autoDeleteJob == FALSE) {
       .createErrorViewerPane(id, failTasks)
     }
 
@@ -563,13 +606,22 @@ setHttpTraffic <- function(value = FALSE) {
     cat(sprintf("Number of errors: %i", numberOfFailedTasks),
         fill = TRUE)
 
-    rAzureBatch::deleteJob(id)
+    # delete the job from the Batch service and its result from the storage blob
+    if (autoDeleteJob) {
+      deleteJob(id)
+    }
 
     if (identical(obj$errorHandling, "stop") && !is.null(errorValue)) {
-      msg <- sprintf("task %d failed - '%s'",
-                     errorIndex,
-                     conditionMessage(errorValue))
+      msg <-
+        sprintf(
+          paste0(
+            "task %d failed - '%s'.\r\nBy default, the job and its result are deleted after the run is over; use",
+            " setAutoDeleteJob(FALSE) or the autoDeleteJob = FALSE option to keep them for investigation."
+          ),
+          errorIndex,
+          conditionMessage(errorValue)
+        )
 
       stop(simpleError(msg, call = expr))
     } else {
diff --git a/R/jobUtilities.R b/R/jobUtilities.R
index 837bb383..ac4c3d64 100644
--- a/R/jobUtilities.R
+++ b/R/jobUtilities.R
@@ -19,7 +19,9 @@ getJob <- function(jobId, verbose = TRUE) {
     list(
       chunkSize = 1,
       enableCloudCombine = "TRUE",
-      packages = ""
+      packages = "",
+      errorHandling = "stop",
+      wait = "TRUE"
     )
 
   if (!is.null(job$metadata)) {
@@ -37,6 +39,10 @@ getJob <- function(jobId, verbose = TRUE) {
         fill = TRUE)
     cat(sprintf("\tpackages: %s", metadata$packages),
         fill = TRUE)
+    cat(sprintf("\terrorHandling: %s", metadata$errorHandling),
+        fill = TRUE)
+    cat(sprintf("\twait: %s", metadata$wait),
+        fill = TRUE)
   }
 
   taskCounts <- rAzureBatch::getJobTaskCounts(jobId = jobId)
@@ -63,11 +69,13 @@ getJob <- function(jobId, verbose = TRUE) {
       ),
       fill = TRUE
     )
+    cat(sprintf("\njob state: %s", job$state), fill = TRUE)
   }
 
   jobObj <- list(jobId = job$id,
                  metadata = metadata,
-                 tasks = tasks)
+                 tasks = tasks,
+                 jobState = job$state)
 
   return(jobObj)
 }
@@ -160,20 +168,132 @@ getJobResult <- function(jobId) {
     stop("jobId must contain at least 3 characters.")
   }
 
-  tempFile <- tempFile <- tempfile("getJobResult", fileext = ".rds")
+  metadata <- readMetadataBlob(jobId)
 
-  results <- rAzureBatch::downloadBlob(
-    jobId,
-    paste0("result/", jobId, "-merge-result.rds"),
-    downloadPath = tempFile,
-    overwrite = TRUE
-  )
+  if (!is.null(metadata)) {
+    if (metadata$enableCloudCombine == "FALSE") {
+      cat("enableCloudCombine is set to FALSE, no merged job result is available",
+          fill = TRUE)
+
+      return()
+    }
 
-  if (is.vector(results)) {
-    results <- readRDS(tempFile)
+    if (metadata$wait == "FALSE") {
+      job <- getJob(jobId, verbose = FALSE)
+
+      if (job$jobState == "active") {
+        stop(sprintf(
+          "job %s is not finished yet, please try again later",
+          job$jobId
+        ))
+      } else if (job$jobState != "completed") {
+        stop(sprintf(
+          "job %s is in %s state, no job result is available",
+          job$jobId,
+          job$jobState
+        ))
+      }
+
+      # if the job has failed tasks
+      if (job$tasks$failed > 0) {
+        if (metadata$errorHandling == "stop") {
+          stop(
+            sprintf(
+              "job %s has failed tasks and error handling is set to 'stop', no result will be available",
+              job$jobId
+            )
+          )
+        } else {
+          if (job$tasks$succeeded == 0) {
+            stop(sprintf(
+              "all tasks failed for job %s, no result will be available",
+              job$jobId
+            ))
+          }
+        }
+      }
+    }
   }
 
-  return(results)
+  tempFile <- tempfile("getJobResult", fileext = ".rds")
+
+  retryCounter <- 0
+  maxRetryCount <- 3
+  repeat {
+    if (retryCounter > maxRetryCount) {
+      stop(
+        sprintf(
+          "Error getting job result: Maximum number of retries (%d) reached\r\n%s",
+          maxRetryCount,
+          paste0(results, "\r\n")
+        )
+      )
+    } else {
+      retryCounter <- retryCounter + 1
+    }
+
+    results <- rAzureBatch::downloadBlob(
+      jobId,
+      paste0("result/", jobId, "-merge-result.rds"),
+      downloadPath = tempFile,
+      overwrite = TRUE
+    )
+
+    if (is.vector(results)) {
+      results <- readRDS(tempFile)
+      return(results)
+    }
+
+    # wait 5 seconds for the result to become available
+    Sys.sleep(5)
+  }
+}
+
+#' Delete a job
+#'
+#' @param jobId A job id
+#'
+#' @examples
+#' \dontrun{
+#' deleteJob("job-001")
+#' }
+#' @export
+deleteJob <- function(jobId) {
+  deleteStorageContainer(jobId)
+
+  response <- rAzureBatch::deleteJob(jobId, content = "response")
+
+  if (response$status_code == 202) {
+    cat(sprintf("Your job '%s' has been deleted.", jobId),
+        fill = TRUE)
+  } else if (response$status_code == 404) {
+    cat(sprintf("Job '%s' does not exist.", jobId),
+        fill = TRUE)
} +} + +#' Terminate a job +#' +#' @param jobId A job id +#' +#' @examples +#' \dontrun{ +#' terminateJob("job-001") +#' } +#' @export +terminateJob <- function(jobId) { + response <- rAzureBatch::terminateJob(jobId, content = "response") + + if (response$status_code == 202) { + cat(sprintf("Your job '%s' has been terminated.", jobId), + fill = TRUE) + } else if (response$status_code == 404) { + cat(sprintf("Job '%s' does not exist.", jobId), + fill = TRUE) + } else if (response$status_code == 409) { + cat(sprintf("Job '%s' has already completed.", jobId), + fill = TRUE) + } } #' Wait for current tasks to complete diff --git a/R/storage_management.R b/R/storage_management.R index dd8b7eec..ca82bf64 100644 --- a/R/storage_management.R +++ b/R/storage_management.R @@ -39,7 +39,10 @@ deleteStorageContainer <- function(container) { rAzureBatch::deleteContainer(container, content = "response") if (response$status_code == 202) { - cat(sprintf("Your container '%s' has been deleted.", container), + cat(sprintf("Your storage container '%s' has been deleted.", container), + fill = TRUE) + } else if (response$status_code == 404) { + cat(sprintf("storage container '%s' does not exist.", container), fill = TRUE) } diff --git a/R/utility.R b/R/utility.R index 82413f5f..6328407d 100644 --- a/R/utility.R +++ b/R/utility.R @@ -179,6 +179,67 @@ getXmlValues <- function(xmlResponse, xmlPath) { xml2::xml_text(xml2::xml_find_all(xmlResponse, xmlPath)) } +saveMetadataBlob <- function(jobId, metadata) { + xmlNode <- "" + if (length(metadata) > 0) { + for (i in 1:length(metadata)) { + xmlNode <- + paste0( + xmlNode, + sprintf( + "<%s>%s", + metadata[[i]]$name, + metadata[[i]]$value, + metadata[[i]]$name + ) + ) + } + } + xmlNode <- paste0(xmlNode, "") + saveXmlBlob(jobId, xmlNode, "metadata") +} + +saveXmlBlob <- function(jobId, xmlBlock, name) { + xmlFile <- paste0(jobId, "-", name, ".rds") + saveRDS(xmlBlock, file = xmlFile) + rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", xmlFile)) + file.remove(xmlFile) +} + +readMetadataBlob <- function(jobId) { + tempFile <- tempfile(paste0(jobId, "-metadata"), fileext = ".rds") + result <- rAzureBatch::downloadBlob( + jobId, + paste0(jobId, "-metadata.rds"), + downloadPath = tempFile, + overwrite = TRUE + ) + + if (is.vector(result)) { + result <- readRDS(tempFile) + result <- xml2::as_xml_document(result) + chunkSize <- getXmlValues(result, ".//chunkSize") + packages <- getXmlValues(result, ".//packages") + errorHandling <- getXmlValues(result, ".//errorHandling") + wait <- getXmlValues(result, ".//wait") + enableCloudCombine <- + getXmlValues(result, ".//enableCloudCombine") + + metadata <- + list( + chunkSize = chunkSize, + packages = packages, + errorHandling = errorHandling, + enableCloudCombine = enableCloudCombine, + wait = wait + ) + + return(metadata) + } else { + return(NULL) + } +} + areShallowEqual <- function(a, b) { !is.null(a) && !is.null(b) && a == b } diff --git a/docs/23-persistent-storage.md b/docs/23-persistent-storage.md index 1e21b428..8c4c60d6 100644 --- a/docs/23-persistent-storage.md +++ b/docs/23-persistent-storage.md @@ -22,7 +22,7 @@ By default, *wait* is set to TRUE. This blocks the R session. 
By setting *wait* ## Getting results from storage -When the user is ready to get their results in a new session, the user use the following command: +When the user is ready to get their results in a new session, the user uses the following command: ```R my_job_id <- "my_unique_job_id" diff --git a/docs/31-long-running-job.md b/docs/31-long-running-job.md index 58bd383d..f9a7ebe4 100644 --- a/docs/31-long-running-job.md +++ b/docs/31-long-running-job.md @@ -43,6 +43,8 @@ getJob returns job metadata, such as chunk size, whether cloud combine is enable succeeded: 0 failed: 5 total: 6 + + job state: completed ``` ## Get job list @@ -81,9 +83,14 @@ Once job is completed successfully, you can call getJobResult to retrieve the jo ### Clean up -Once you get the job result, you can delete the job. +Once you get the job result, you can delete the job and its result. +```R + deleteJob(jobId) +``` + +Please note deleteJob will delete the job at batch service and the storage container holding the job result. ```R - rAzureBatch::deleteJob(jobId) + deleteJob(jobId) ``` A [working sample](../samples/long_running_job/long_running_job.R) can be found in the samples directory. diff --git a/docs/40-troubleshooting.md b/docs/40-troubleshooting.md index e105d01c..bee49c3f 100644 --- a/docs/40-troubleshooting.md +++ b/docs/40-troubleshooting.md @@ -67,7 +67,18 @@ This issue is due to certain compiler flags not available in the default version Since doAzureParallel uses Microsoft R Open version 3.3 as the default version of R, it will automatically try to pull package from [MRAN](https://mran.microsoft.com/) rather than CRAN. This is a big benefit when wanting to use a constant version of a package but does not always contain references to the latest versions. To use a specific version from CRAN or a different MRAN snapshot date, use the [command line](./30-customize-cluster.md#running-commands-when-the-cluster-starts) in the cluster configuration to manually install the packages you need. ## Viewing files from Azure Storage -In every foreach run, the job will push its logs into Azure Storage that can be fetched by the user. For more information on reading log files, check out [managing storage](./41-managing-storage-via-R.md). +In every foreach run, the job will push its logs into Azure Storage that can be fetched by the user. For more information on reading log files, check out [managing storage](./41-managing-storage-via-R.md). + +By default, when wait is set to TRUE, job and its result is automatically deleted after the run is completed. To keep the job and its result for investigation purpose, you can set a global environment setting or specify an option in foreach loop to keep it. + +```R +# This will set a global setting to keep job and its result after run is completed. +setAutoDeleteJob(FALSE) + +# This will keep job and its result at each job level after run is completed. +options <- list(autoDeleteJob = FALSE) +foreach::foreach(i = 1:4, .options.azure = opt) %dopar% { ... } +``` ## Viewing files directly from compute node Cluster setup logs are not persisted. `getClusterFile` function will fetch any files including stdout and stderr log files in the cluster. This is particularly useful for users that utilizing [customize script](./30-customize-cluster.md) on their nodes and installing specific [packages](./20-package-management.md). diff --git a/docs/42-faq.md b/docs/42-faq.md index afbb63ea..71d2d162 100644 --- a/docs/42-faq.md +++ b/docs/42-faq.md @@ -19,4 +19,10 @@ Yes. 
The [command line](./30-customize-cluster.md#running-commands-when-the-clus No. doAzureParallel is built on top of the Linux CentOS distribution and will not work with Windows-specific packages. ## Why am I getting the error: could not find function "startsWith"? -doAzureParallel requires you to run R 3.3 or greater on you local machine. \ No newline at end of file +doAzureParallel requires you to run R 3.3 or greater on you local machine. + +## My job failed but I can't find my job and its result? +if you set wait = TRUE, job and its result is automatically deleted, to keep them for investigation purpose, you can set global option using setAutoDeleteJob(FALSE), or use autoDeleteJob option at foreach level. + +## How do I cancel a job? +You can call terminateJob(jobId) to cancel a job. diff --git a/man/deleteJob.Rd b/man/deleteJob.Rd new file mode 100644 index 00000000..4635a8e2 --- /dev/null +++ b/man/deleteJob.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/jobUtilities.R +\name{deleteJob} +\alias{deleteJob} +\title{Delete a job} +\usage{ +deleteJob(jobId) +} +\arguments{ +\item{jobId}{A job id} +} +\description{ +Delete a job +} +\examples{ +\dontrun{ +deleteJob("job-001") +} +} diff --git a/man/setAutoDeleteJob.Rd b/man/setAutoDeleteJob.Rd new file mode 100644 index 00000000..1aec0d62 --- /dev/null +++ b/man/setAutoDeleteJob.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/doAzureParallel.R +\name{setAutoDeleteJob} +\alias{setAutoDeleteJob} +\title{Specify whether to delete job and its result after asychronous job is completed.} +\usage{ +setAutoDeleteJob(value = TRUE) +} +\arguments{ +\item{value}{boolean of TRUE or FALSE} +} +\description{ +Specify whether to delete job and its result after asychronous job is completed. 
+}
+\examples{
+setAutoDeleteJob(FALSE)
+}
diff --git a/man/terminateJob.Rd b/man/terminateJob.Rd
new file mode 100644
index 00000000..8f6aaedd
--- /dev/null
+++ b/man/terminateJob.Rd
@@ -0,0 +1,19 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/jobUtilities.R
+\name{terminateJob}
+\alias{terminateJob}
+\title{Terminate a job}
+\usage{
+terminateJob(jobId)
+}
+\arguments{
+\item{jobId}{A job id}
+}
+\description{
+Terminate a job
+}
+\examples{
+\dontrun{
+terminateJob("job-001")
+}
+}
diff --git a/samples/long_running_job/long_running_job.R b/samples/long_running_job/long_running_job.R
index e2002be8..c98b003d 100644
--- a/samples/long_running_job/long_running_job.R
+++ b/samples/long_running_job/long_running_job.R
@@ -65,4 +65,4 @@ jobResult <- getJobResult(jobId)
 doAzureParallel::stopCluster(cluster)
 
 # delete the job
-rAzureBatch::deleteJob(jobId)
+deleteJob(jobId)
diff --git a/tests/testthat/test-autodeletejob.R b/tests/testthat/test-autodeletejob.R
new file mode 100644
index 00000000..6b142c02
--- /dev/null
+++ b/tests/testthat/test-autodeletejob.R
@@ -0,0 +1,69 @@
+# Run this test for users to make sure the autodeletejob feature
+# of doAzureParallel is still working
+context("auto delete job scenario test")
+test_that("auto delete job as foreach option test", {
+  testthat::skip("Live test")
+  testthat::skip_on_travis()
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  # use the autoDeleteJob flag to keep the job and its result
+  '%dopar%' <- foreach::'%dopar%'
+  res <-
+    foreach::foreach(i = 1:10,
+                     .options.azure = list(autoDeleteJob = FALSE)) %dopar% {
+                       i
+                     }
+
+  testthat::expect_equal(length(res),
+                         10)
+
+  for (i in 1:10) {
+    testthat::expect_equal(res[[i]],
+                           i)
+  }
+
+  # find the job id in the output of the command above and call
+  # deleteJob(jobId) when you no longer need the job and its result
+})
+
+test_that("auto delete job as global setting test", {
+  testthat::skip("Live test")
+  testthat::skip_on_travis()
+  credentialsFileName <- "credentials.json"
+  clusterFileName <- "cluster.json"
+
+  doAzureParallel::generateCredentialsConfig(credentialsFileName)
+  doAzureParallel::generateClusterConfig(clusterFileName)
+
+  doAzureParallel::setCredentials(credentialsFileName)
+  cluster <- doAzureParallel::makeCluster(clusterFileName)
+  doAzureParallel::registerDoAzureParallel(cluster)
+
+  # set the autoDeleteJob flag to FALSE to keep the job and its result
+  setAutoDeleteJob(FALSE)
+
+  '%dopar%' <- foreach::'%dopar%'
+  res <-
+    foreach::foreach(i = 1:10) %dopar% {
+      i
+    }
+
+  testthat::expect_equal(length(res),
+                         10)
+
+  for (i in 1:10) {
+    testthat::expect_equal(res[[i]],
+                           i)
+  }
+
+  # find the job id in the output of the command above and call
+  # deleteJob(jobId) when you no longer need the job and its result
+})
diff --git a/tests/testthat/test-long-running-job.R b/tests/testthat/test-long-running-job.R
index 7cd8edb9..461ddf2a 100644
--- a/tests/testthat/test-long-running-job.R
+++ b/tests/testthat/test-long-running-job.R
@@ -15,33 +15,34 @@ test_that("Long Running Job Test", {
   cluster <- doAzureParallel::makeCluster(clusterFileName)
   doAzureParallel::registerDoAzureParallel(cluster)
 
-  options <- list(wait = FALSE, job = 'myjob')
+  options <- list(wait = FALSE,
+                  enableCloudCombine = TRUE)
   '%dopar%' <- foreach::'%dopar%'
   jobId <-
     foreach::foreach(
       i = 1:4,
       .packages = c('httr'),
+      .errorhandling = "remove",
       .options.azure = options
     ) %dopar% {
       mean(1:3)
     }
 
-  job <- getJob(jobId)
+  job <- doAzureParallel::getJob(jobId)
 
   # get active/running job list
-  filter <- filter <- list()
+  filter <- list()
   filter$state <- c("active", "completed")
-  getJobList(filter)
+  doAzureParallel::getJobList(filter)
 
   # get job list for all jobs
-  getJobList()
+  doAzureParallel::getJobList()
 
   # wait 2 minutes for job to finish
   Sys.sleep(120)
 
   # get job result
-  jobResult <- getJobResult(jobId)
-
+  jobResult <- doAzureParallel::getJobResult(jobId)
   doAzureParallel::stopCluster(cluster)
 
   # verify the job result is correct
@@ -51,6 +52,6 @@ test_that("Long Running Job Test", {
   testthat::expect_equal(jobResult,
                          list(2, 2, 2, 2))
 
-  # delete the job
-  rAzureBatch::deleteJob(jobId)
+  # delete the job and its result
+  doAzureParallel::deleteJob(jobId)
 })
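Taken together, the changes above add a full job-management surface: setAutoDeleteJob, deleteJob, and terminateJob, plus metadata-aware getJob and getJobResult. The sketch below shows how these exports are intended to compose for a non-blocking run. It is illustrative only and not part of the patch: the credentials.json and cluster.json file names, the sleep duration, and the loop body are placeholders, while the retry and status-code behavior described in the comments follows the implementations added in this diff.

```R
# Minimal end-to-end sketch of the 0.6.1 job-management flow.
# Assumes credentials.json / cluster.json exist (placeholders).
library(doAzureParallel)

setCredentials("credentials.json")
cluster <- makeCluster("cluster.json")
registerDoAzureParallel(cluster)

# Opt out of the new default that deletes the job and its result
# at the end of a blocking (wait = TRUE) run.
setAutoDeleteJob(FALSE)

# Submit asynchronously; job metadata (chunkSize, packages, errorHandling,
# wait, enableCloudCombine) is saved to the job's storage container.
'%dopar%' <- foreach::'%dopar%'
jobId <-
  foreach::foreach(i = 1:4,
                   .options.azure = list(wait = FALSE)) %dopar% {
    sqrt(i)  # placeholder workload
  }

getJob(jobId)    # prints metadata, task counts, and the new job state field

# wait for the job to finish (the long-running-job test sleeps 2 minutes)
Sys.sleep(120)

results <- getJobResult(jobId)  # retries the download up to 3 times, 5 s apart

# terminateJob(jobId)  # optional: cancel a running job (409 once completed)

deleteJob(jobId)  # deletes the Batch job and its storage container
stopCluster(cluster)
```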