Don't re-extract if a network transfer failed.
chenson42 committed Apr 28, 2011
1 parent 92f24a5 commit 679e360
Showing 3 changed files with 51 additions and 17 deletions.
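In outline, the commit caches the extracted file for each outgoing batch under its batch id, so a batch whose network transfer failed can be re-sent from the cached file instead of being re-extracted from the database. A minimal sketch of that pattern, kept separate from the real service code (ExtractionCacheSketch, extractToFile, and transfer are hypothetical stand-ins, not SymmetricDS API):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;

    public class ExtractionCacheSketch {

        private final Map<Long, File> extractedBatches = new HashMap<Long, File>();

        public void sendBatch(long batchId) throws Exception {
            File file = extractedBatches.get(batchId);
            if (file == null) {
                file = extractToFile(batchId);        // expensive database extraction
                extractedBatches.put(batchId, file);  // remember the handle before sending
            }
            transfer(file);                           // may throw; the handle survives
            extractedBatches.remove(batchId);         // success: forget it and clean up
            file.delete();
        }

        private File extractToFile(long batchId) throws Exception {
            return File.createTempFile("batch-" + batchId, ".csv");
        }

        private void transfer(File file) throws Exception {
            // the network send would happen here
        }
    }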
ThresholdFileWriter.java

@@ -42,17 +42,15 @@
  */
 public class ThresholdFileWriter extends Writer {
 
-    File file;
+    private File file;
 
-    String tempFileCategory;
+    private String tempFileCategory;
 
-    BufferedWriter fileWriter;
-
-    BufferedReader fileReader;
+    private BufferedWriter fileWriter;
 
-    StringBuilder buffer;
+    private StringBuilder buffer;
 
-    long threshhold;
+    private long threshhold;
 
     /**
      * @param threshold The number of bytes at which to start writing to a file
@@ -74,6 +72,14 @@ public ThresholdFileWriter(long threshold, String tempFileCategory) {
         this.buffer = new StringBuilder();
         this.threshhold = threshold;
     }
+
+    public File getFile() {
+        return file;
+    }
+
+    public void setFile(File file) {
+        this.file = file;
+    }
 
     @Override
     public void close() throws IOException {
@@ -115,10 +121,17 @@ public BufferedReader getReader() throws IOException {
         }
     }
 
+    public void reset() {
+        this.file = null;
+        this.fileWriter = null;
+        buffer.setLength(0);
+    }
+
     public void delete() {
         if (file != null && file.exists()) {
             file.delete();
         }
+        file = null;
+        buffer.setLength(0);
     }
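
Taken together, reset(), setFile(), and the new getFile() accessor let a caller re-attach a previously written temp file instead of rewriting it. A rough lifecycle sketch under that reading (the cache field and writeBatch wrapper are illustrative, not code from this commit; the import for ThresholdFileWriter itself is omitted since its package is not shown in the diff):

    import java.io.File;
    import java.io.IOException;

    public class WriterReuseSketch {
        static File cachedFile; // stands in for the caller's own cache

        static void writeBatch(String batchData) throws IOException {
            ThresholdFileWriter writer = new ThresholdFileWriter(1024, "extract");
            writer.reset();                    // drop state from any earlier batch
            if (cachedFile == null) {
                writer.write(batchData);       // spills to a temp file past 1024 bytes
            } else {
                writer.setFile(cachedFile);    // reuse the earlier extraction as-is
            }
            writer.close();
            cachedFile = writer.getFile();     // null while everything fit in memory
        }
    }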

DataExtractorService.java

@@ -21,6 +21,7 @@
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Writer;
@@ -30,8 +31,10 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
@@ -110,7 +113,9 @@ public class DataExtractorService extends AbstractService implements IDataExtrac
 
     private List<IExtractorFilter> extractorFilters;
 
     private IStatisticManager statisticManager;
+
+    private Map<Long, File> extractedBatchesHandle = new HashMap<Long, File>();
 
     /**
      * @see DataExtractorService#extractConfigurationStandalone(Node,
@@ -446,19 +451,31 @@ public List<OutgoingBatch> extract(Node node, IOutgoingTransport targetTransport
         for (OutgoingBatch outgoingBatch : activeBatches) {
             try {
                 currentBatch = outgoingBatch;
-                outgoingBatch.setStatus(OutgoingBatch.Status.QY);
-                outgoingBatch.setExtractCount(outgoingBatch.getExtractCount() + 1);
-                outgoingBatchService.updateOutgoingBatch(outgoingBatch);
 
-                databaseExtract(node, outgoingBatch, handler);
+                fileWriter.reset();
 
+                File previouslyExtracted = extractedBatchesHandle.get(currentBatch.getBatchId());
+                if (previouslyExtracted == null) {
+                    outgoingBatch.setStatus(OutgoingBatch.Status.QY);
+                    outgoingBatch.setExtractCount(outgoingBatch.getExtractCount() + 1);
+                    outgoingBatchService.updateOutgoingBatch(outgoingBatch);
+                    databaseExtract(node, outgoingBatch, handler);
+                } else {
+                    log.info("DataExtractorUsingAlreadyExtractedBatch", currentBatch.getBatchId());
+                    fileWriter.setFile(previouslyExtracted);
+                }
                 outgoingBatch.setStatus(OutgoingBatch.Status.SE);
                 outgoingBatch.setSentCount(outgoingBatch.getSentCount() + 1);
                 outgoingBatchService.updateOutgoingBatch(outgoingBatch);
 
+                File file = fileWriter.getFile();
+                if (file != null) {
+                    extractedBatchesHandle.put(currentBatch.getBatchId(), file);
+                }
+
                 fileWriter.close();
+
                 networkTransfer(fileWriter.getReader(), networkWriter);
-                fileWriter.delete();
 
                 outgoingBatch.setStatus(OutgoingBatch.Status.LD);
                 outgoingBatch.setLoadCount(outgoingBatch.getLoadCount() + 1);
@@ -472,11 +489,14 @@
                         batchesSentCount, bytesSentCount);
                     break;
                 }
+
+                extractedBatchesHandle.remove(currentBatch.getBatchId());
+                fileWriter.delete();
             } finally {
                 // It doesn't hurt anything to call close and delete
                 // a second time
                 fileWriter.close();
                 fileWriter.delete();
             }
         }
     } catch (Exception e) {
@@ -89,6 +89,7 @@ DataPushingFailed=There was an error while pushing data to the server
 DataPushingFailedLock=Did not run the push process because the cluster service has it locked.
 DataPushingFailedNoNodeIdentity=Did not run the push process because the node has no identity and needs to be registered.
 DataExtractorCouldNotFindStreamCommand=Could not find the stream command for event type of %s.
+DataExtractorUsingAlreadyExtractedBatch=We have already extracted batch %d. Using the existing extraction. To force re-extraction, please restart this instance of SymmetricDS.
 DataExtractorMissingRowData=The row data was blank for the data row with a data id of %d.
 DataExtractorMissingPkData=The primary key data was blank for the data row with a data id of %d.
 DataExtractorReachedMaxNumberOfBytesToSync=Interrupted extraction after %d batches and %d bytes. We hit the configured byte threshold to send in one synchronization. Data will continue to be synchronized on the next pull.
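
The %d in the new message is filled with the batch id that DataExtractorService passes to log.info above. Conceptually the lookup works like the following sketch (SymmetricDS routes this through its logging wrapper rather than directly; the bundle name here is an assumption for the example):

    import java.util.ResourceBundle;

    public class MessageLookupSketch {
        public static void main(String[] args) {
            // Hypothetical bundle name; the keyed pattern comes from the
            // properties file changed in this commit.
            ResourceBundle messages = ResourceBundle.getBundle("messages");
            String pattern = messages.getString("DataExtractorUsingAlreadyExtractedBatch");
            long batchId = 42;
            // %d in the message is filled by String.format
            System.out.println(String.format(pattern, batchId));
        }
    }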
