Skip to content

Commit

Permalink
0004095: Unrouted count should check current sequence instead of running
Browse files Browse the repository at this point in the history
max(data_id)
  • Loading branch information
erilong committed Sep 23, 2019
1 parent bef58b2 commit 51f0b5a
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Expand Up @@ -260,6 +260,14 @@ public List<DataGap> getDataGaps() {
return dataService.findDataGaps();
}

public DataGap getLastDataGap() {
List<DataGap> gaps = getDataGaps();
if (gaps.size() > 0) {
return gaps.get(gaps.size() - 1);
}
return null;
}

public void addDataIds(List<Long> dataIds) {
}

Expand Down
Expand Up @@ -67,6 +67,8 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe
protected IContextService contextService;

protected List<DataGap> gaps;

protected DataGap lastGap;

protected List<Long> dataIds;

Expand Down Expand Up @@ -291,6 +293,9 @@ public void afterRouting() {
}

printStats = saveDataGaps(ts, printStats);
if (gaps.size() > 0) {
lastGap = gaps.get(gaps.size() - 1);
}

setFullGapAnalysis(false);
if (isBusyExpire) {
Expand Down Expand Up @@ -604,17 +609,25 @@ public Long mapRow(Row row) {
return row.getLong("data_id");
}

@Override
public List<DataGap> getDataGaps() {
return gaps;
}

@Override
public DataGap getLastDataGap() {
return lastGap;
}

@Override
public void addDataIds(List<Long> dataIds) {
this.dataIds.addAll(dataIds);
}

/**
* This method is called for each channel that is routed. Once it is set for a routing pass it should remain set until the routing pass is done.
*/
@Override
public void setIsAllDataRead(boolean isAllDataRead) {
this.isAllDataRead &= isAllDataRead;
}
Expand All @@ -626,6 +639,7 @@ public boolean isFullGapAnalysis() {
return isFullGapAnalysis;
}

@Override
public void setFullGapAnalysis(boolean isFullGapAnalysis) {
setFullGapAnalysis(null, isFullGapAnalysis);
}
Expand Down
Expand Up @@ -1292,9 +1292,16 @@ protected List<TriggerRouter> getTriggerRoutersForData(Data data) {
}

public long getUnroutedDataCount() {
long maxDataIdAlreadyRouted = sqlTemplateDirty
.queryForLong(getSql("selectLastDataIdRoutedUsingDataGapSql"));
long leftToRoute = engine.getDataService().findMaxDataId() - maxDataIdAlreadyRouted;
long maxDataIdAlreadyRouted = 0;
if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
maxDataIdAlreadyRouted = sqlTemplateDirty.queryForLong(getSql("selectLastDataIdRoutedUsingDataGapSql"));
} else {
DataGap lastGap = gapDetector.getLastDataGap();
if (lastGap != null) {
maxDataIdAlreadyRouted = lastGap.getStartId();
}
}
long leftToRoute = (engine.getDataService().findMaxDataId() - maxDataIdAlreadyRouted) + 1;

if (leftToRoute > 0) {
return leftToRoute;
Expand Down

0 comments on commit 51f0b5a

Please sign in to comment.