
[GOBBLIN-288] Add finer-grain dynamic partition generation for Salesforce #2140

Closed
wants to merge 2 commits

Conversation


@htran1 htran1 commented Oct 13, 2017


Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Here are some details about my PR, including screenshots (if applicable):
Added refinement of the daily-bucket histogram: buckets larger than a target size are split into smaller time ranges until every bucket falls under the target. This avoids timeout errors when a partition is too large.
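The refinement described above can be sketched as a self-contained Java example. Bucket, Counter, countBetween, and the parameter order are illustrative stand-ins, not the actual Gobblin classes or signatures:

```java
import java.util.ArrayList;
import java.util.List;

class BucketRefinement {
  static class Bucket {
    final long start, end, count;
    Bucket(long start, long end, long count) {
      this.start = start; this.end = end; this.count = count;
    }
  }

  // Stand-in for the probe query that counts records in [start, end).
  interface Counter {
    long countBetween(long start, long end);
  }

  // Split any bucket whose count exceeds the limit at its midpoint,
  // recursing until every emitted bucket is at or under the limit or the
  // time range can no longer be split.
  static void refine(long start, long end, long count, long limit,
                     Counter counter, List<Bucket> out) {
    if (count <= limit || end - start <= 1) {
      out.add(new Bucket(start, end, count));
      return;
    }
    long mid = start + (end - start) / 2;
    long leftCount = counter.countBetween(start, mid); // one probe per split
    refine(start, mid, leftCount, limit, counter, out);
    refine(mid, end, count - leftCount, limit, counter, out);
  }
}
```

Note that only the left half of each split is probed; the right half's count is derived by subtraction, halving the number of probe queries.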

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:
    Updated partitioning test and ran Salesforce Contact flow.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"


htran1 commented Oct 13, 2017

@zxcware please review.

/**
* Split a histogram bucket along the midpoint if it is larger than the bucket size limit.
*/
private int getHistogramRecursively(TableCountProbingContext probingContext, Histogram histogram, StrSubstitutor sub,
@zxcware zxcware Oct 13, 2017

We can simplify the recursion as:

getHistogramRecursively(context, count, start, end, sub, values, outputHistogram) {
  if (count <= bucketSizeLimit || range cannot be split further) {  // base case
    outputHistogram.add(new HistogramGroup(...));
    return;
  }
  mid = midpoint of [start, end);
  int leftCount = queryCount(start, mid);  // one probe query per split
  getHistogramRecursively(context, leftCount, start, mid, sub, values, outputHistogram);
  getHistogramRecursively(context, count - leftCount, mid, end, sub, values, outputHistogram);
}

@htran1 (author):

Sure. The method was written to let the caller optionally provide the count, but since removing that flexibility simplifies the code, I'll drop that option.


log.info("Refining histogram with bucket size limit {}.", bucketSizeLimit);

final Iterator<HistogramGroup> it = histogram.getGroups().iterator();
@zxcware (reviewer):

We can simplify the logic by appending HistogramGroup(partition.highwatermark, 0).

@htran1 (author):

I'm going to copy the list and append the item, since I want to avoid mutating the input histogram; that still lets us avoid the special handling of the last group.
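A minimal sketch of that copy-and-append approach, with a simplified stand-in HistogramGroup (not the actual Gobblin class):

```java
import java.util.ArrayList;
import java.util.List;

class SentinelExample {
  static class HistogramGroup {
    final String key;
    final int count;
    HistogramGroup(String key, int count) { this.key = key; this.count = count; }
  }

  // Copy the input so the caller's histogram is never mutated, then append a
  // zero-count group at the high watermark; a loop that pairs adjacent group
  // boundaries then needs no special case for the last group.
  static List<HistogramGroup> withSentinel(List<HistogramGroup> groups, String highWatermark) {
    List<HistogramGroup> copy = new ArrayList<>(groups);
    copy.add(new HistogramGroup(highWatermark, 0));
    return copy;
  }
}
```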

while (elements.hasNext()) {
  element = elements.next().getAsJsonObject();
  histogram.add(new HistogramGroup(element.get("time").getAsString(), element.get("cnt").getAsInt()));
  String time = element.get("time").getAsString() + ZERO_TIME_SUFFIX;
@zxcware (reviewer):

In terms of the function itself, whether to append ZERO_TIME_SUFFIX really depends on the value of element.get("time"). My concern is that the function won't be correct if the time already carries a proper suffix.

The solution would be:

  1. Leverage Utils.toDateTimeFormat, given the input time format
2. Remove the function and move its logic into getHistogramByDayBucketing, since it's only used there.

@htran1 (author):

I'll change the name to make it clearer that this is only for parsing results from the day bucketing.
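One way to address the suffix concern, as a sketch: guard the append so a value that already carries a time-of-day component is left intact. The " 00:00:00" value for ZERO_TIME_SUFFIX is an assumption here; the real constant and the renamed helper live in the Gobblin source.

```java
class DayBucketTime {
  // Assumed value; the actual constant is defined in the Salesforce source.
  static final String ZERO_TIME_SUFFIX = " 00:00:00";

  // Append the zero-time suffix only when the day-bucket key has no
  // time-of-day component, so values that already have one are unchanged.
  static String toDayTimestamp(String time) {
    return time.contains(":") ? time : time + ZERO_TIME_SUFFIX;
  }
}
```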

String startTimeStr = Utils.dateToString(new Date(startTime), SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
String endTimeStr = Utils.dateToString(new Date(endTime), SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);

subValues.put("start", startTimeStr);
@zxcware (reviewer):

We need to consider whether to include the start by comparing it with the global partition, which can be stored in the probingContext.

@htran1 (author):

Distinguishing inclusive/exclusive is not required for correctness here and would complicate the logic for no real gain.
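For illustration, formatting the probe range bounds and substituting them into a count query might look like the sketch below. The SOQL template, object and field names, and the ISO-8601 timestamp format are all assumptions; the real code uses Utils.dateToString with SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT and StrSubstitutor:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

class ProbeQueryExample {
  // Build a count query for one probed time range. The template and the
  // timestamp format below are illustrative assumptions only.
  static String probeQuery(long startMillis, long endMillis) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
    String template = "SELECT COUNT() FROM Contact"
        + " WHERE SystemModstamp >= ${start} AND SystemModstamp < ${end}";
    return template.replace("${start}", fmt.format(new Date(startMillis)))
                   .replace("${end}", fmt.format(new Date(endMillis)));
  }
}
```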

}

// exchange the first partition point with the global low watermark
partitionPoints.set(0, Long.toString(lowWatermark));
@zxcware zxcware Oct 13, 2017

Instead of doing this, we can set the first HistogramGroup key to lowWatermark, which also filters out-of-scope records during fine probing.

@htran1 (author):

Sure, I'll move it to the point between the initial histogram generation (where the watermark is lost) and the refinement step.

private static final String MIN_TARGET_PARTITION_SIZE = "salesforce.minTargetPartitionSize";
private static final int DEFAULT_MIN_TARGET_PARTITION_SIZE = 250000;
private static final String PROBE_TARGET_RATIO = "salesforce.probeTargetRatio";
private static final double DEFAULT_PROBE_TARGET_RATIO = 0.60;
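Read back with their defaults, the two settings above would look like this sketch. The java.util.Properties pattern is illustrative (Gobblin reads configuration through its State API), but the key names and default values match the constants in the diff:

```java
import java.util.Properties;

class PartitionConfig {
  // Minimum record count to aim for per generated partition.
  static int minTargetPartitionSize(Properties props) {
    return Integer.parseInt(props.getProperty("salesforce.minTargetPartitionSize", "250000"));
  }

  // Ratio relating probed bucket size to the target partition size; the
  // exact semantics are documented in the source, per the review below.
  static double probeTargetRatio(Properties props) {
    return Double.parseDouble(props.getProperty("salesforce.probeTargetRatio", "0.60"));
  }
}
```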
@zxcware (reviewer):

This is a great idea! We should document it so that readers can understand its meaning.

@htran1 (author):

Added a comment.

@zxcware zxcware left a comment

LGTM

@abti abti left a comment

+1

@asfgit asfgit closed this in 626d312 Oct 14, 2017
zxliucmu pushed a commit to zxliucmu/incubator-gobblin that referenced this pull request Nov 16, 2017