Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwellpettit committed Jun 22, 2017
1 parent 23745ca commit 165ece5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
Expand Up @@ -190,7 +190,7 @@ protected enum ExtractMode {
private IClusterService clusterService;

private Map<String, Semaphore> locks = new HashMap<String, Semaphore>();

public DataExtractorService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
this.outgoingBatchService = engine.getOutgoingBatchService();
Expand Down Expand Up @@ -990,7 +990,6 @@ protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
&& remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0) && !cclient;
}

//TODO: Implement stream copy and inject stats
protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode, OutgoingBatch currentBatch, boolean isRetry,
IDataWriter dataWriter, BufferedWriter writer, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
Expand All @@ -1014,6 +1013,10 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
Batch.getStagedLocation(false, sourceNode.getNodeId()), currentBatch.getBatchId());

SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile());

//TODO: Call Copy
//copyFromStaging(extractedBatch, targetResource, currentBatch);

targetResource.setState(State.DONE);

isRetry = true;
Expand Down Expand Up @@ -1057,18 +1060,49 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
return currentBatch;

}

// TODO: Implement Copy and Inject Stats
protected void copyFromStaging(IStagedResource sourceResource, IStagedResource targetResource, OutgoingBatch batch) {
BufferedReader source = sourceResource.getReader();
BufferedWriter target = targetResource.getWriter(0);
String line = null;
try {
while ((line = source.readLine()) != null) {
if (line.startsWith(CsvConstants.BATCH)) {
source.mark(0);
String nextLine = source.readLine();
source.reset();
if (nextLine != null && !nextLine.startsWith(CsvConstants.STATS_COLUMNS)) {
target.write(getBatchStatsColumns());
target.newLine();
target.write(getBatchStats(batch));
target.newLine();
}
} else {
target.write(line);
target.newLine();
}
}
} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
sourceResource.close();
sourceResource.dereference();
targetResource.close();
targetResource.dereference();
}
}

protected void transferFromStaging(ExtractMode mode, BatchType batchType, OutgoingBatch batch, boolean isRetry,
IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) {
final int MAX_WRITE_LENGTH = 32768;
BufferedReader reader = stagedResource.getReader();
try {
// Retry means we've sent this batch before, so let's ask to
// retry the batch from the target's staging
if (isRetry) {
String line = null;
while ((line = reader.readLine()) != null) {
if (line.startsWith(CsvConstants.BATCH)) {
if (line.startsWith(CsvConstants.BATCH)) {
reader.mark(0);
String nextLine = reader.readLine();
reader.reset();
Expand All @@ -1078,7 +1112,6 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
writer.write(getBatchStats(batch));
writer.newLine();
}

writer.write(CsvConstants.RETRY + "," + batch.getBatchId());
writer.newLine();
writer.write(CsvConstants.COMMIT + "," + batch.getBatchId());
Expand All @@ -1095,6 +1128,7 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime;
boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0;
long totalThrottleTime = 0;
final int MAX_WRITE_LENGTH = 32768;
int bufferSize = MAX_WRITE_LENGTH;

if (isThrottled) {
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void process() throws IOException {

while (reader.readRecord()) {
line = reader.getRawRecord();
//System.out.println("line = " + line);
System.out.println("line = " + line);
if (line.startsWith(CsvConstants.CATALOG)) {
catalogLine = line;
writeLine(line);
Expand Down

0 comments on commit 165ece5

Please sign in to comment.