Skip to content

Commit

Permalink
0003097: Common batches can get an error if they are in memory and two
Browse files Browse the repository at this point in the history
nodes try to pull with just the right timing.
  • Loading branch information
chenson42 committed May 8, 2017
1 parent dd02204 commit e1fcd11
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExt
if (extractBatch.isExtractJobFlag() && extractBatch.getStatus() != Status.IG) {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
if (extractBatch.getStatus() != Status.RQ && extractBatch.getStatus() != Status.IG
&& !isPreviouslyExtracted(extractBatch)) {
&& !isPreviouslyExtracted(extractBatch, false)) {
/*
* the batch must have been purged. it needs to be
* re-extracted
Expand Down Expand Up @@ -846,7 +846,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe

if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
} else if (!isPreviouslyExtracted(currentBatch)) {
} else if (!isPreviouslyExtracted(currentBatch, true)) {
String semaphoreKey = useStagingDataWriter ? Long.toString(currentBatch
.getBatchId()) : currentBatch.getNodeBatchId();
Semaphore lock = null;
Expand All @@ -864,7 +864,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
}
}

if (!isPreviouslyExtracted(currentBatch)) {
if (!isPreviouslyExtracted(currentBatch, true)) {
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);

if (currentBatch.getExtractStartTime() == null) {
Expand Down Expand Up @@ -978,7 +978,7 @@ protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode,
transformExtractWriter = createTransformDataWriter(
sourceNode,
targetNode,
new ProcessInfoDataWriter(new StagingDataWriter(memoryThresholdInBytes, nodeService
new ProcessInfoDataWriter(new StagingDataWriter(memoryThresholdInBytes, true, nodeService
.findIdentityNodeId(), Constants.STAGING_CATEGORY_OUTGOING,
stagingManager), processInfo));
} else {
Expand Down Expand Up @@ -1015,14 +1015,17 @@ protected IStagedResource getStagedResource(OutgoingBatch currentBatch) {
currentBatch.getStagedLocation(), currentBatch.getBatchId());
}

protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch, boolean acquireReference) {
IStagedResource previouslyExtracted = getStagedResource(currentBatch);
if (previouslyExtracted != null && previouslyExtracted.exists()
&& previouslyExtracted.getState() != State.CREATE) {
if (log.isDebugEnabled()) {
log.debug("We have already extracted batch {}. Using the existing extraction: {}",
currentBatch.getBatchId(), previouslyExtracted);
}
if (acquireReference) {
previouslyExtracted.reference();
}
return true;
} else {
return false;
Expand Down Expand Up @@ -1201,7 +1204,8 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
throw new RuntimeException(t);
} finally {
stagedResource.close();
if (!stagedResource.isFileResource()) {
stagedResource.dereference();
if (!stagedResource.isFileResource() && !stagedResource.isInUse()) {
stagedResource.delete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void open(DataContext context) {
}

protected IDataWriter buildWriter(long memoryThresholdInBytes) {
return new StagingDataWriter(memoryThresholdInBytes, sourceNodeId,
return new StagingDataWriter(memoryThresholdInBytes, false, sourceNodeId,
Constants.STAGING_CATEGORY_OUTGOING, stagingManager,
(IProtocolDataWriterListener[]) null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ public class StagingDataWriter extends AbstractProtocolDataWriter {
private Map<Batch, IStagedResource> stagedResources = new HashMap<Batch, IStagedResource>();

private long memoryThresholdInBytes;

private boolean acquireReference = false;

public StagingDataWriter(long memoryThresholdInBytes, String sourceNodeId, String category, IStagingManager stagingManager,
public StagingDataWriter(long memoryThresholdInBytes, boolean acquireReference, String sourceNodeId, String category, IStagingManager stagingManager,
IProtocolDataWriterListener... listeners) {
this(sourceNodeId, category, stagingManager, toList(listeners));
this.memoryThresholdInBytes = memoryThresholdInBytes;
this.acquireReference = acquireReference;
}

public StagingDataWriter(String sourceNodeId, String category, IStagingManager stagingManager,
Expand Down Expand Up @@ -84,6 +87,9 @@ protected IStagedResource getStagedResource(Batch batch) {
if (resource == null || resource.getState() == State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(category, location, batch.getBatchId());
if (acquireReference) {
resource.reference();
}
}
stagedResources.put(batch, resource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ public String getExtensionName() {

public boolean isInUse();

public void dereference();

public void reference();

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
public class StagedResource implements IStagedResource {

static final Logger log = LoggerFactory.getLogger(StagedResource.class);

private int references = 0;

private File directory;

Expand Down Expand Up @@ -95,8 +97,18 @@ protected static String toPath(File directory, File file) {
return path;
}

@Override
public void reference() {
references++;
}

@Override
public void dereference() {
references--;
}

public boolean isInUse() {
return (readers != null && readers.size() > 0) || writer != null ||
return references > 0 || (readers != null && readers.size() > 0) || writer != null ||
(inputStreams != null && inputStreams.size() > 0) ||
outputStream != null;
}
Expand Down Expand Up @@ -210,7 +222,7 @@ public void close() {
closeInternal();
if (isFileResource()) {
stagingManager.inUse.remove(path);
}
}
}

private void closeInternal() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public long clean(long ttlInMs) {
*/
public IStagedResource create(Object... path) {
String filePath = buildFilePath(path);
StagedResource resource = new StagedResource(directory, filePath,
IStagedResource resource = new StagedResource(directory, filePath,
this);
if (resource.exists()) {
resource.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.jumpmind.symmetric.io.data.DataProcessor;
import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.StagedResource;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -81,14 +80,14 @@ public void readThenWrite(long threshold) throws Exception {

StagingManager stagingManager = new StagingManager(DIR.getAbsolutePath());
ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, "test", origCsv);
StagingDataWriter writer = new StagingDataWriter(threshold, "aaa", "test", stagingManager, new BatchListener());
StagingDataWriter writer = new StagingDataWriter(threshold, false, "aaa", "test", stagingManager, new BatchListener());
DataProcessor processor = new DataProcessor(reader, writer, "test");
processor.process(new DataContext());

assertEquals(1, batchesWritten.size());
assertEquals(convertEol(origCsv), convertEol(batchesWritten.get(0)));

StagedResource resource = (StagedResource) stagingManager.find("test", "aaa", 1);
IStagedResource resource = (IStagedResource) stagingManager.find("test", "aaa", 1);
assertNotNull(resource);
if (threshold > origCsv.length()) {
assertFalse(resource.getFile().exists());
Expand Down

0 comments on commit e1fcd11

Please sign in to comment.