[GOBBLIN-284] Add retry in SalesforceExtractor to handle transient ne… #2137

Closed
wants to merge 3 commits into from

@htran1 htran1 commented Oct 11, 2017

…twork errors

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
  • Add retry to handle transient network errors. The BufferedReader is recreated to reconnect before reattempting the fetch. The Salesforce HTTP connection is sometimes dropped from the server side, so this retry is required to avoid job failure.
  • Skip empty result sets, since they can cause the job to terminate before all data has been pulled: returning a null record terminates the pull.
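
The retry described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `RetryingLineFetcher`, `ReaderFactory`, and `fetchAll` are hypothetical names, and it assumes that reopening the reader restarts the stream from its beginning, so lines that were already read are skipped after each reconnect.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; these names are illustrative and not part of Gobblin. */
class RetryingLineFetcher {

  /** Opens a fresh reader positioned at the start of the stream (assumption). */
  interface ReaderFactory {
    BufferedReader open() throws IOException;
  }

  /** Reads all lines, reopening the reader after an IOException, up to maxRetries times. */
  static List<String> fetchAll(ReaderFactory factory, int maxRetries) throws IOException {
    List<String> lines = new ArrayList<>();
    int failures = 0;
    while (true) {
      try (BufferedReader reader = factory.open()) {
        // The new connection starts over, so skip what was already collected.
        for (int i = 0; i < lines.size(); i++) {
          if (reader.readLine() == null) {
            throw new IOException("Stream ended before the previously read position");
          }
        }
        String line;
        while ((line = reader.readLine()) != null) {
          lines.add(line);
        }
        return lines;
      } catch (IOException e) {
        failures++;
        if (failures > maxRetries) {
          throw e; // give up once the retry budget is exhausted
        }
        // Otherwise loop again; the next iteration opens a new reader.
      }
    }
  }
}
```

Counting total failures rather than consecutive ones is a simplification chosen for the sketch.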

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Tested with a read pull of the Contact table.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Contributor Author

htran1 commented Oct 11, 2017

@zxcware please review.

this.bulkRecordCount++;

// Insert records in record set until it reaches the batch size
if (recordCount >= batchSize) {
Contributor

while loop already has a check, is it necessary to have this extra check?

Contributor Author

This is the existing logic moved into a new method. The check here is required to chunk the stream into batches since the while loop is only checking for end of stream. This check could be moved into the while, but I think it is here for the extra logging when the condition is true.
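
The distinction drawn in this reply, where the loop condition detects end of stream and a separate check caps the batch, might look like the sketch below. `BatchChunker` and `nextBatch` are hypothetical names, and a plain `Iterator` stands in for the record stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch; not the extractor's actual method. */
class BatchChunker {

  /** Pulls up to batchSize records; returns fewer only when the stream ends. */
  static <T> List<T> nextBatch(Iterator<T> stream, int batchSize) {
    List<T> batch = new ArrayList<>();
    // The loop condition only checks for end of stream...
    while (stream.hasNext()) {
      batch.add(stream.next());
      // ...so a separate check stops the loop once the batch is full.
      if (batch.size() >= batchSize) {
        break;
      }
    }
    return batch;
  }
}
```

Calling `nextBatch` repeatedly on the same iterator yields full batches followed by one shorter final batch.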

// skip header
reader.nextRecord();

int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount;
Contributor

If batches are processed one after another, wouldn't it be enough to clear the result set instead of skipping?

Contributor Author

No, there are two reasons for this.

  1. Clearing and recreating records would be more expensive than skipping.
  2. The reset moves to the beginning of the result set. There can be many 2K record batches in a result set and all except the last batch have been processed and given out of the extractor, so we need to skip over at least all batches prior to the last one.
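
To make the second point concrete, here is a small sketch of repositioning after a reset. It assumes one record per line and borrows the counter names from the diff as parameters; `ResultSetRepositioner` and `reopen` are hypothetical, and a `StringReader` stands in for the HTTP stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

/** Hypothetical sketch; assumes one record per line with a single header line. */
class ResultSetRepositioner {

  /**
   * Reopens a result set from its beginning and advances past the records
   * that were already read from it.
   *
   * @param bulkRecordCount total records read so far across all result sets
   * @param prevBulkRecordCount total records read before this result set started
   */
  static BufferedReader reopen(String resultSet, int bulkRecordCount, int prevBulkRecordCount)
      throws IOException {
    BufferedReader reader = new BufferedReader(new StringReader(resultSet));
    reader.readLine(); // skip header

    // Only records from the current result set need to be skipped.
    int recordsToSkip = bulkRecordCount - prevBulkRecordCount;
    for (int i = 0; i < recordsToSkip; i++) {
      if (reader.readLine() == null) {
        throw new IOException("Result set has fewer than " + recordsToSkip + " records");
      }
    }
    return reader;
  }
}
```

For example, if 10 records came from earlier result sets and 13 have been read in total, 3 records of the current result set are skipped and reading resumes at its fourth record.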

if (!this.isBulkJobFinished()) {
rs = getBulkData();
}
// Skip empty result sets since they will cause the extractor to terminate early
Contributor

Would it be better to do the skip loop in getBulkData?

Contributor Author

getBulkData has a couple of places where it returns, so I thought that this was cleaner and less error-prone.

if (!this.isBulkJobFinished()) {
rs = getBulkData();
}
} while (rs != null && rs.isEmpty() && !this.isBulkJobFinished());
Contributor

while ((rs == null || rs.isEmpty()) && !this.isBulkJobFinished()) {
  rs = getBulkData();
}
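
The suggested loop shape can be tried out in isolation with the sketch below. `EmptyResultSetSkipper` is a hypothetical stand-in: a queue of lists plays the role of the bulk job, `fetchNext` plays the role of `getBulkData`, and returning null for a trailing empty result set is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch; a queue of lists stands in for the bulk job's result sets. */
class EmptyResultSetSkipper {
  private final Queue<List<String>> pending;

  EmptyResultSetSkipper(List<List<String>> resultSets) {
    this.pending = new ArrayDeque<>(resultSets);
  }

  /** Stand-in for isBulkJobFinished(): true once every result set was fetched. */
  boolean isFinished() {
    return pending.isEmpty();
  }

  /** Stand-in for getBulkData(): returns the next result set, possibly empty. */
  List<String> fetchNext() {
    return pending.poll();
  }

  /** Returns the next non-empty result set, or null when none is left. */
  List<String> nextNonEmpty() {
    List<String> rs = null;
    // Keep fetching while nothing usable has arrived and the job has more data.
    while ((rs == null || rs.isEmpty()) && !isFinished()) {
      rs = fetchNext();
    }
    // Assumption: a trailing empty result set is reported as "no more data".
    return (rs == null || rs.isEmpty()) ? null : rs;
  }
}
```

Unlike the do/while form in the diff, this loop keeps fetching when a fetch returns null and the job is not yet finished, so the two forms are not strictly equivalent.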

}

/**
* Fetch a batch of records
Contributor

It would be better to make it clearer that this batch isn't the chunk batch, but rather the extractor's own grouping of records.

Contributor

@zxcware zxcware left a comment

LGTM

Member

@abti abti left a comment

+1

@asfgit asfgit closed this in c5e83a3 Oct 11, 2017
zxliucmu pushed a commit to zxliucmu/incubator-gobblin that referenced this pull request Nov 16, 2017