Skip to content

Commit

Permalink
Fix Issue with PKChunking
Browse files Browse the repository at this point in the history
Attempt to fix the error that occurs when a query using the PKChunking option creates more than one query batch (#124)

This commit also changes the call to as_tibble() so that the `.name_repair` argument is passed explicitly.
  • Loading branch information
StevenMMortimer committed Sep 17, 2022
1 parent b296b30 commit cb6784f
Showing 1 changed file with 33 additions and 30 deletions.
63 changes: 33 additions & 30 deletions R/bulk-query.R
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ sf_query_result_bulk_v1 <- function(job_id,
content_type <- httr_response$headers$`content-type`
if (grepl('xml', content_type)) {
response_text <- content(httr_response, type="text/plain", encoding="UTF-8")
resultset <- as_tibble(xmlToList(response_text))
resultset <- as_tibble(xmlToList(response_text), .name_repair = "unique")
} else if(grepl('text/csv', content_type)) {
response_text <- content(httr_response, type="text", encoding="UTF-8")
if(response_text == "Records not found for this query"){
Expand Down Expand Up @@ -407,30 +407,33 @@ sf_query_bulk_v1 <- function(soql,
message(paste0("Attempt #", z))
}
Sys.sleep(interval_seconds)
batch_query_status <- sf_batch_status_bulk(job_id = batch_query_info$jobId,
batch_id = batch_query_info$id,
api_type = api_type,
verbose = verbose)
if(batch_query_status$state == 'Failed'){
stop(batch_query_status$stateMessage)
} else if(batch_query_status$state == "Completed"){

# PKChunked queries create multiple batches (one per chunk) so need
# to check each of the statuses
job_batches <- sf_job_batches_bulk(batch_query_info$jobId,
api_type = api_type,
verbose = verbose)
# Successfully submitted PKChunked queries return one batch with the status
# "NotProcessed" which should be ignored
batch_query_statuses <- job_batches %>%
# ignore record ids that could not be matched
filter(if_all(any_of("state"), ~(.x != 'NotProcessed')))

if(all(batch_query_statuses$state == "Completed")){
status_complete <- TRUE
} else if(batch_query_status$state == "NotProcessed") {
# this means it was a successfully submitted PKChunked query, now check
# for the child batches of this job (i.e. all others related to the job
# that are not the initial batch query info record)
# check that all batches have been completed before declaring the job done
job_batches <- sf_job_batches_bulk(batch_query_status$jobId,
api_type = api_type,
verbose = verbose)
# remove the initial batch
batch_query_info <- job_batches %>%
# ignore record ids that could not be matched
filter(if_all(any_of("state"), ~(.x != 'NotProcessed')))

if(all(batch_query_info$state == "Completed")){
status_complete <- TRUE
} else if(any(batch_query_statuses$state == 'Failed')){
batch_error_str <- character(0)
for(i in 1:length(batch_query_statuses$state)){
if(batch_query_statuses$state[i] == 'Failed'){
this_error_str <- sprintf(
'[Batch %s] %s',
batch_query_statuses$id[i],
batch_query_statuses$stateMessage[i]
)
batch_error_str <- c(batch_error_str, this_error_str)
}
}
stop(paste(batch_error_str, collapse="\n "))
} else {
# continue checking the status until done or max attempts
z <- z + 1
Expand All @@ -442,13 +445,13 @@ sf_query_bulk_v1 <- function(soql,
resultset <- sf_abort_job_bulk(job_info$id, api_type=api_type, verbose=verbose)
} else {
resultset <- list()
for(i in 1:nrow(batch_query_info)){
batch_query_details <- sf_batch_details_bulk(job_id = batch_query_info$jobId[i],
batch_id = batch_query_info$id[i],
for(i in 1:nrow(batch_query_statuses)){
batch_query_details <- sf_batch_details_bulk(job_id = batch_query_statuses$jobId[i],
batch_id = batch_query_statuses$id[i],
api_type = api_type,
verbose = verbose)
resultset[[i]] <- sf_query_result_bulk(job_id = batch_query_info$jobId[i],
batch_id = batch_query_info$id[i],
resultset[[i]] <- sf_query_result_bulk(job_id = batch_query_statuses$jobId[i],
batch_id = batch_query_statuses$id[i],
result_id = batch_query_details$result,
guess_types = guess_types,
api_type = api_type,
Expand All @@ -459,8 +462,8 @@ sf_query_bulk_v1 <- function(soql,
api_type = api_type,
verbose = verbose)
}
# needed in case the result was PKChunked which returns the records in multiple
# batches that we must bind
# needed in case the result was PKChunked which returns
# the records in multiple batches that we must bind
if(is.tbl(resultset)){
resultset <- resultset %>%
sf_reorder_cols() %>%
Expand Down

0 comments on commit cb6784f

Please sign in to comment.