-
Notifications
You must be signed in to change notification settings - Fork 743
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[GOBBLIN-423] Add record count limit to salesforce source #2300
Conversation
// The upper bounds can be removed for last work unit | ||
partitions.add(new Partition(lowWatermark, highWatermark, true, false)); | ||
} else { | ||
// The upper bounds can not be removed for last work unit | ||
// The upper bounds can not be removed for last work unit (even for bootstrapping mode, we want to fix the upperbound so next time we would know if we have catched up) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
catched -> caught
} else { | ||
// The upper bounds can not be removed for last work unit | ||
|
||
if (isBootStrapWithFixedHighWatermark()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be merged with the last else case. So if (!isBootStrapWithFixedHighWatermark() && (isFullDump() || isSnapshot(extractType)))
@@ -322,6 +333,17 @@ protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkT | |||
lowWatermark = this.getAppendLowWatermark(watermarkType, previousWatermark, deltaForNextWatermark); | |||
} | |||
} | |||
|
|||
if (isBootStrapWithFixedHighWatermark()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks the same as the else block above, so it can be merged by changing the if to if (!isBootStrapWithFixedHighWatermark() && (this.isFullDump() || this.isWatermarkOverride())) {
highWatermark = this.state.getPropAsLong(ConfigurationKeys.SOURCE_QUERYBASED_END_VALUE); | ||
LOG.info("Bootstrapping mode: persisted high watermark " + highWatermark); | ||
} else { | ||
this.state.setProp(ConfigurationKeys.SOURCE_QUERYBASED_END_VALUE, highWatermark); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this state getting persisted and propagated to the next run? If it does then what happens when user changes the configuration? Will the config change override the state?
@@ -79,6 +79,11 @@ | |||
public static final boolean DEFAULT_USE_ALL_OBJECTS = false; | |||
|
|||
private static final String ENABLE_DYNAMIC_PARTITIONING = "salesforce.enableDynamicPartitioning"; | |||
private static final String DYNAMIC_PROBING_TOTAL_RECORDS_LIMIT = "salesforce.dynamicProbing.total.records.limit"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Camel case names should be used here.
break; | ||
} | ||
histogramActual.add(group); | ||
if (histogramActual.getGroups().size() >= state.getPropAsLong(DYNAMIC_PROBING_TOTAL_BUCKETS_LIMIT, DYNAMIC_PROBING_DEFAULT_TOTAL_BUCKETS_LIMIT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why have the bucket limit here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@htran1 do we have total query limits? I think each bucket is a single query, if we have total limits for querying count, we can set this bucket number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query limit is enforced by salesforce.dynamicProbingLimit
. The loop here doesn't execute any queries, so the limit is not required here.
@@ -131,7 +136,19 @@ | |||
Partition partition = new Partitioner(state).getGlobalPartition(previousWatermark); | |||
Histogram histogram = getHistogram(sourceEntity.getSourceEntityName(), watermarkColumn, state, partition); | |||
|
|||
String specifiedPartitions = generateSpecifiedPartitions(histogram, minTargetPartitionSize, maxPartitions, | |||
// we should look if the count is too big, cut off early if count exceeds the limit, or bucket size is too large | |||
Histogram histogramActual = new Histogram(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should consider pushing this logic into getHistogram() for early termination of the search or add a note here indicating possible future work.
break; | ||
} | ||
histogramActual.add(group); | ||
if (histogramActual.getGroups().size() >= state.getPropAsLong(DYNAMIC_PROBING_TOTAL_BUCKETS_LIMIT, DYNAMIC_PROBING_DEFAULT_TOTAL_BUCKETS_LIMIT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query limit is enforced by salesforce.dynamicProbingLimit
. The loop here doesn't execute any queries, so the limit is not required here.
@@ -504,6 +534,14 @@ void add(Histogram histogram) { | |||
totalRecordCount += histogram.totalRecordCount; | |||
} | |||
|
|||
HistogramGroup get(int idx) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this required? I don't see anywhere where the null result is checked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use this to get the last group.
public Void call() throws JobException { | ||
try { | ||
while (true) { | ||
currentJobLauncher = buildJobLauncher(jobProps);; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra semi-colon.
@@ -281,18 +271,104 @@ protected void startServices() throws Exception { | |||
@Override | |||
public void runJob(Properties jobProps, JobListener jobListener) throws JobException { | |||
try { | |||
JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps); | |||
runJob(jobProps, jobListener, jobLauncher); | |||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create and call the RetriggeringJobCallable call here.
@@ -139,4 +139,8 @@ | |||
public Extract createExtract(TableType type, String namespace, String table) { | |||
return this.extractFactory.getUniqueExtract(type, namespace, table); | |||
} | |||
|
|||
public boolean isRetriggerRequired() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add @OverRide.
histogramAdjust = new Histogram(); | ||
for (HistogramGroup group : histogram.getGroups()) { | ||
if (histogramAdjust.getTotalRecordCount() + group.count > state | ||
.getPropAsLong(DYNAMIC_PROBING_TOTAL_RECORDS_LIMIT, DYNAMIC_PROBING_DEFAULT_TOTAL_RECORDS_LIMIT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a different name here related to early stop.
List<Partition> partitions = new ArrayList<>(); | ||
|
||
List<String> watermarkPoints = state.getPropAsList(USER_SPECIFIED_PARTITIONS); | ||
boolean isEarlyStop = state.getPropAsBoolean(IS_EARLY_STOP); | ||
|
||
if (isEarlyStop && isEarlyStopEnabled() && isFullDump()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always raise the configuration error and do it earlier.
if (isFullDump() || isSnapshot(extractType)) { | ||
|
||
// If it is early stop, we should not remove upper bounds | ||
if ((isFullDump() || isSnapshot(extractType)) && !(isEarlyStop && isEarlyStopEnabled())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only check isEarlyStop
since it will always be false when isEarlyStopEnabled()
is false.
*/ | ||
@VisibleForTesting | ||
protected long getLowWatermark(ExtractType extractType, WatermarkType watermarkType, long previousWatermark, | ||
int deltaForNextWatermark) { | ||
long lowWatermark = ConfigurationKeys.DEFAULT_WATERMARK_VALUE; | ||
if (this.isFullDump() || this.isWatermarkOverride()) { | ||
|
||
if ((this.isFullDump() || this.isWatermarkOverride()) && !this.isEarlyStopEnabled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove isEarlyStopEnabled()
and disallow this config for full dump and watermark override.
@@ -887,6 +887,10 @@ private void cleanupStagingData(JobState jobState) | |||
} | |||
} | |||
|
|||
public boolean isRetriggerRequired() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should expose only the early stop state and have the retrigger decision made in the scheduler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Closes apache#2300 from yukuai518/limit
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Set fixed upper bound in bootstrapping mode. Don't move the upper bound in the future run.
Tests
Commits