Skip to content

Commit

Permalink
[GOBBLIN-1568] Exponential backoff for Salesforce bulk api polling (#…
Browse files Browse the repository at this point in the history
…3420)

* Exponential backoff for Salesforce bulk api polling

* Read min and max wait time from prop with default
  • Loading branch information
williamwjs committed Oct 28, 2021
1 parent 94a777a commit 7fe8f97
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Expand Up @@ -318,6 +318,12 @@ public class ConfigurationKeys {
public static final boolean DEFAULT_EXTRACT_LIMIT_ENABLED = false;
public static final String EXTRACT_ID_TIME_ZONE = "extract.extractIdTimeZone";
public static final String DEFAULT_EXTRACT_ID_TIME_ZONE = "UTC";
public static final String EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS_KEY =
"extract.salesforce.bulkApi.minWaitTimeInMillis";
public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS = 60 * 1000L; // 1 min
public static final String EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY =
"extract.salesforce.bulkApi.maxWaitTimeInMillis";
public static final long DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS = 10 * 60 * 1000L; // 10 min

/**
* Converter configuration properties.
Expand Down
Expand Up @@ -835,17 +835,25 @@ private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate

BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);

int waitMilliSeconds = 60 * 1000; // wait 1 minute

// Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());

// wait for completion, failure, or formation of PK chunking batches
// if it is InProgress or Queued, continue to wait.
int count = 0;
long minWaitTimeInMilliSeconds = super.workUnitState.getPropAsLong(
ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS_KEY,
ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MIN_WAIT_TIME_IN_MILLIS);
long maxWaitTimeInMilliSeconds = super.workUnitState.getPropAsLong(
ConfigurationKeys.EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS_KEY,
ConfigurationKeys.DEFAULT_EXTRACT_SALESFORCE_BULK_API_MAX_WAIT_TIME_IN_MILLIS);
while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
log.info("Waiting for bulk resultSetIds");
// Exponential backoff
long waitMilliSeconds = Math.min((long) (Math.pow(2, count) * minWaitTimeInMilliSeconds), maxWaitTimeInMilliSeconds);
Thread.sleep(waitMilliSeconds);
bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
log.info("Waiting for bulk resultSetIds");
count++;
}

// Wait for pk chunking batches
Expand Down

0 comments on commit 7fe8f97

Please sign in to comment.