diff --git a/R/bulk-query.R b/R/bulk-query.R
index 0ff5ea90..147f8065 100644
--- a/R/bulk-query.R
+++ b/R/bulk-query.R
@@ -193,7 +193,7 @@ sf_query_result_bulk_v1 <- function(job_id,
   content_type <- httr_response$headers$`content-type`
   if (grepl('xml', content_type)) {
     response_text <- content(httr_response, type="text/plain", encoding="UTF-8")
-    resultset <- as_tibble(xmlToList(response_text))
+    resultset <- as_tibble(xmlToList(response_text), .name_repair = "unique")
   } else if(grepl('text/csv', content_type)) {
     response_text <- content(httr_response, type="text", encoding="UTF-8")
     if(response_text == "Records not found for this query"){
@@ -407,30 +407,33 @@ sf_query_bulk_v1 <- function(soql,
       message(paste0("Attempt #", z))
     }
     Sys.sleep(interval_seconds)
-    batch_query_status <- sf_batch_status_bulk(job_id = batch_query_info$jobId,
-                                               batch_id = batch_query_info$id,
-                                               api_type = api_type,
-                                               verbose = verbose)
-    if(batch_query_status$state == 'Failed'){
-      stop(batch_query_status$stateMessage)
-    } else if(batch_query_status$state == "Completed"){
+
+    # PKChunked queries create multiple batches (one per chunk), so we need
+    # to check the status of each one
+    job_batches <- sf_job_batches_bulk(batch_query_info$jobId,
+                                       api_type = api_type,
+                                       verbose = verbose)
+    # Successfully submitted PKChunked queries return one batch with the status
+    # "NotProcessed" which should be ignored
+    batch_query_statuses <- job_batches %>%
+      # drop the placeholder batch left in the "NotProcessed" state
+      filter(if_all(any_of("state"), ~(.x != 'NotProcessed')))
+
+    if(all(batch_query_statuses$state == "Completed")){
       status_complete <- TRUE
-    } else if(batch_query_status$state == "NotProcessed") {
-      # this means it was a successfully submitted PKChunked query, now check
-      # for the child batches of this job (i.e. all others related to the job
-      # that are not the initial batch query info record)
-      # check that all batches have been completed before declaring the job done
-      job_batches <- sf_job_batches_bulk(batch_query_status$jobId,
-                                         api_type = api_type,
-                                         verbose = verbose)
-      # remove the initial batch
-      batch_query_info <- job_batches %>%
-        # ignore record ids that could not be matched
-        filter(if_all(any_of("state"), ~(.x != 'NotProcessed')))
-
-      if(all(batch_query_info$state == "Completed")){
-        status_complete <- TRUE
+    } else if(any(batch_query_statuses$state == 'Failed')){
+      batch_error_str <- character(0)
+      for(i in 1:length(batch_query_statuses$state)){
+        if(batch_query_statuses$state[i] == 'Failed'){
+          this_error_str <- sprintf(
+            '[Batch %s] %s',
+            batch_query_statuses$id[i],
+            batch_query_statuses$stateMessage[i]
+          )
+          batch_error_str <- c(batch_error_str, this_error_str)
+        }
       }
+      stop(paste(batch_error_str, collapse="\n "))
     } else {
       # continue checking the status until done or max attempts
       z <- z + 1
@@ -442,13 +445,13 @@ sf_query_bulk_v1 <- function(soql,
     resultset <- sf_abort_job_bulk(job_info$id, api_type=api_type, verbose=verbose)
   } else {
     resultset <- list()
-    for(i in 1:nrow(batch_query_info)){
-      batch_query_details <- sf_batch_details_bulk(job_id = batch_query_info$jobId[i],
-                                                   batch_id = batch_query_info$id[i],
+    for(i in 1:nrow(batch_query_statuses)){
+      batch_query_details <- sf_batch_details_bulk(job_id = batch_query_statuses$jobId[i],
+                                                   batch_id = batch_query_statuses$id[i],
                                                    api_type = api_type,
                                                    verbose = verbose)
-      resultset[[i]] <- sf_query_result_bulk(job_id = batch_query_info$jobId[i],
-                                             batch_id = batch_query_info$id[i],
+      resultset[[i]] <- sf_query_result_bulk(job_id = batch_query_statuses$jobId[i],
+                                             batch_id = batch_query_statuses$id[i],
                                              result_id = batch_query_details$result,
                                              guess_types = guess_types,
                                              api_type = api_type,
@@ -459,8 +462,8 @@ sf_query_bulk_v1 <- function(soql,
                                            api_type = api_type,
                                            verbose = verbose)
   }
-  # needed in case the result was PKChunked which returns the records in multiple
-  # batches that we must bind
+  # needed in case the result was PKChunked which returns
+  # the records in multiple batches that we must bind
   if(is.tbl(resultset)){
     resultset <- resultset %>%
       sf_reorder_cols() %>%
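
Two follow-up sketches for reviewers. First, on the `.name_repair = "unique"` change in the first hunk: `XML::xmlToList()` represents repeated child nodes as list elements that share a name, and `tibble::as_tibble()`'s default name check (`.name_repair = "check_unique"`) errors on duplicated names. A minimal illustration; the `result` node name and Id values are made up:

```r
library(tibble)

# Repeated XML nodes come back from XML::xmlToList() as list elements
# sharing the same name, which the default as_tibble() name check rejects
x <- list(result = "752x000000000001", result = "752x000000000002")

# as_tibble(x)                          # errors: column name `result` is duplicated
as_tibble(x, .name_repair = "unique")   # repairs to `result...1`, `result...2`
```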
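Second, the revised polling decision in `sf_query_bulk_v1()` reduces to: drop the placeholder batch, succeed when every remaining batch is `Completed`, and fail fast with all batch errors when any is `Failed`. Here is a standalone sketch of that logic against a hypothetical batch-status tibble shaped like the output of `sf_job_batches_bulk()` (the Ids and the error message are invented):

```r
library(dplyr)

# Hypothetical snapshot of a PK-chunked job: the original batch stays
# "NotProcessed" while the per-chunk child batches do the actual work
job_batches <- tibble(
  jobId        = "750xx0000000001AAA",
  id           = c("751xx0000000001AAA", "751xx0000000002AAA", "751xx0000000003AAA"),
  state        = c("NotProcessed", "Completed", "Failed"),
  stateMessage = c(NA, NA, "InvalidBatch: field 'Foo' does not exist")
)

# drop the placeholder batch, mirroring the filter in the patch
batch_query_statuses <- job_batches %>%
  filter(if_all(any_of("state"), ~(.x != "NotProcessed")))

if (all(batch_query_statuses$state == "Completed")) {
  message("job complete; pull down each batch's results")
} else if (any(batch_query_statuses$state == "Failed")) {
  failed <- filter(batch_query_statuses, state == "Failed")
  # vectorized equivalent of the patch's loop over failed batches
  stop(paste(sprintf("[Batch %s] %s", failed$id, failed$stateMessage),
             collapse = "\n "))
} else {
  message("still processing; poll again")
}
```

The only deliberate difference from the patch is building the error string with a vectorized `sprintf()` rather than a `for` loop; the resulting message is identical.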