Skip to content

Commit

Permalink
0004545: Data Extractor gets "java.lang.IllegalStateException: There is
Browse files Browse the repository at this point in the history
no content to read" from staging
  • Loading branch information
erilong committed Sep 14, 2020
1 parent 1e3d8dc commit f84f66d
Showing 1 changed file with 9 additions and 7 deletions.
Expand Up @@ -52,6 +52,7 @@ public long clean(long ttlInMs) {
// during setup or un-install, it's possible sym_lock table isn't available yet
}
try {
long startTime = System.currentTimeMillis();
boolean purgeBasedOnTTL = engine.getParameterService().is(ParameterConstants.STREAM_TO_FILE_PURGE_ON_TTL_ENABLED, false);
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS,600000);
Expand All @@ -60,6 +61,7 @@ public long clean(long ttlInMs) {
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);

StagingPurgeContext context = new StagingPurgeContext();
context.putContextValue("startTime", startTime);
context.putContextValue("purgeBasedOnTTL", purgeBasedOnTTL);
context.putContextValue("recordIncomingBatchesEnabled", recordIncomingBatchesEnabled);
context.putContextValue("minTtlInMs", minTtlInMs);
Expand Down Expand Up @@ -87,10 +89,11 @@ protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, Stagin
String[] path = resource.getPath().split("/");

boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > context.getLong("minTtlInMs");
boolean resourceClearsMinTimeHurdle = resource.getLastUpdateTime() < context.getLong("startTime")
&& (System.currentTimeMillis() - resource.getLastUpdateTime()) > context.getLong("minTtlInMs");

if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld);
return shouldCleanOutgoingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle);
} else if (path[0].equals(STAGING_CATEGORY_INCOMING)) {
return shouldCleanIncomingPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle);
} else if (path[0].equals(STAGING_CATEGORY_LOG_MINER)) {
Expand All @@ -102,12 +105,12 @@ protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, Stagin
}

protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path,
boolean resourceIsOld) {
boolean resourceIsOld, boolean resourceClearsMinTimeHurdle) {
@SuppressWarnings("unchecked")
Set<Long> outgoingBatches = (Set<Long>) context.getContextValue("outgoingBatches");
try {
Long batchId = Long.valueOf(path[path.length - 1]);
if (!outgoingBatches.contains(batchId) || ttlInMs == 0) {
if ((resourceClearsMinTimeHurdle && !outgoingBatches.contains(batchId)) || ttlInMs == 0) {
return true;
}
} catch (NumberFormatException e) {
Expand All @@ -129,9 +132,8 @@ protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs
try {
BatchId batchId = new BatchId(Long.valueOf(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
resourceClearsMinTimeHurdle)
if ((recordIncomingBatchesEnabled && resourceClearsMinTimeHurdle && biggestBatchId != null
&& biggestBatchId > batchId.getBatchId() && !incomingBatches.contains(batchId))
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
return true;
}
Expand Down

0 comments on commit f84f66d

Please sign in to comment.