0002574: Improve performance of the transfer to staging
erilong committed Apr 26, 2016
1 parent eaf0a1f commit eab5677
Showing 3 changed files with 223 additions and 19 deletions.
@@ -37,10 +37,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.jumpmind.db.model.Table;
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.ISqlTransaction;
@@ -60,7 +58,6 @@
 import org.jumpmind.symmetric.io.data.DataEventType;
 import org.jumpmind.symmetric.io.data.DataProcessor;
 import org.jumpmind.symmetric.io.data.IDataProcessorListener;
-import org.jumpmind.symmetric.io.data.IDataReader;
 import org.jumpmind.symmetric.io.data.IDataWriter;
 import org.jumpmind.symmetric.io.data.reader.ProtocolDataReader;
 import org.jumpmind.symmetric.io.data.transform.TransformPoint;
@@ -75,7 +72,7 @@
 import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
 import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
 import org.jumpmind.symmetric.io.data.writer.ResolvedData;
-import org.jumpmind.symmetric.io.data.writer.StagingDataWriter;
+import org.jumpmind.symmetric.io.data.writer.SimpleStagingDataWriter;
 import org.jumpmind.symmetric.io.data.writer.TransformWriter;
 import org.jumpmind.symmetric.io.stage.IStagedResource;
 import org.jumpmind.symmetric.io.stage.IStagedResource.State;
@@ -460,20 +457,14 @@ protected List<IncomingBatch> loadDataFromTransport(final ProcessInfo processInfo
                     l.syncStarted(ctx);
                 }
 
-                long memoryThresholdInBytes = parameterService
-                        .getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
-                long totalNetworkMillis = System.currentTimeMillis();
+                long memoryThresholdInBytes = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
                 String targetNodeId = nodeService.findIdentityNodeId();
                 if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
                     processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
-                    IDataReader dataReader = new ProtocolDataReader(BatchType.LOAD, targetNodeId,
-                            transport.openReader());
-                    IDataWriter dataWriter = new StagingDataWriter(memoryThresholdInBytes,
-                            sourceNode.getNodeId(), Constants.STAGING_CATEGORY_INCOMING,
-                            stagingManager, new LoadIntoDatabaseOnArrivalListener(processInfo,
-                                    sourceNode.getNodeId(), listener));
-                    new DataProcessor(dataReader, dataWriter, "transfer to stage").process(ctx);
-                    totalNetworkMillis = System.currentTimeMillis() - totalNetworkMillis;
+                    LoadIntoDatabaseOnArrivalListener loadListener = new LoadIntoDatabaseOnArrivalListener(processInfo,
+                            sourceNode.getNodeId(), listener);
+                    new SimpleStagingDataWriter(transport.openReader(), stagingManager, Constants.STAGING_CATEGORY_INCOMING,
+                            memoryThresholdInBytes, BatchType.LOAD, targetNodeId, ctx, loadListener).process();
                 } else {
                     DataProcessor processor = new DataProcessor(new ProtocolDataReader(BatchType.LOAD,
                             targetNodeId, transport.openReader()), null, listener, "data load") {
@@ -178,7 +178,7 @@ public void test02Statistics() throws Exception {
         assertNotNull(batch);
         assertEquals(batch.getStatus(), IncomingBatch.Status.ER, "Wrong status. " + printDatabase());
         assertEquals(batch.getFailedRowNumber(), 8l, "Wrong failed row number. " + batch.getSqlMessage() + ". " + printDatabase());
-        assertEquals(batch.getByteCount(), 496l, "Wrong byte count. " + printDatabase());
+        assertEquals(batch.getByteCount(), 483l, "Wrong byte count. " + printDatabase());
         assertEquals(batch.getStatementCount(), 8l, "Wrong statement count. " + printDatabase());
         assertEquals(batch.getFallbackInsertCount(), 1l, "Wrong fallback insert count. "
                 + printDatabase());
@@ -196,9 +196,9 @@ public void test03UpdateCollision() throws Exception {
         insertValues[0] = getNextId();
         insertValues[2] = insertValues[4] = "inserted row for testUpdateCollision";
 
-        String[] updateValues = new String[TEST_COLUMNS.length];
+        String[] updateValues = new String[TEST_COLUMNS.length + 1];
         updateValues[0] = getId();
-        updateValues[TEST_COLUMNS.length - 1] = getNextId();
+        updateValues[TEST_COLUMNS.length] = getNextId();
         updateValues[2] = updateValues[4] = "update will become an insert that violates PK";
 
         ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -284,7 +284,7 @@ public void test04SqlStatistics() throws Exception {
         assertNotNull(batch);
         assertEquals(batch.getStatus(), IncomingBatch.Status.ER, "Wrong status. " + printDatabase());
         assertEquals(batch.getFailedRowNumber(), 3l, "Wrong failed row number. " + printDatabase());
-        Assert.assertEquals("Wrong byte count: " + batch.getByteCount() + ". " + printDatabase(), 407,
+        Assert.assertEquals("Wrong byte count: " + batch.getByteCount() + ". " + printDatabase(), 394l,
                 batch.getByteCount());
         assertEquals(batch.getStatementCount(), 3l, "Wrong statement count. " + printDatabase());
         assertEquals(batch.getFallbackInsertCount(), 0l, "Wrong fallback insert count. "
@@ -0,0 +1,213 @@
package org.jumpmind.symmetric.io.data.writer;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

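/**
 * Writes a stream of CSV protocol data straight to staging files, copying
 * lines as-is instead of parsing every row through the full reader/writer
 * pipeline, which is what speeds up the transfer-to-stage step.
 */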
public class SimpleStagingDataWriter {

    final static int MAX_WRITE_LENGTH = 262144;

    protected final Logger log = LoggerFactory.getLogger(getClass());

    protected BufferedReader reader;
    protected IStagingManager stagingManager;
    protected IProtocolDataWriterListener[] listeners;
    protected long memoryThresholdInBytes;
    protected String category;
    protected BatchType batchType;
    protected String targetNodeId;
    protected DataContext context;

    protected BufferedWriter writer;
    protected Batch batch;

    public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes,
            BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) {
        this.reader = reader;
        this.stagingManager = stagingManager;
        this.memoryThresholdInBytes = memoryThresholdInBytes;
        this.category = category;
        this.batchType = batchType;
        this.targetNodeId = targetNodeId;
        this.listeners = listeners;
        this.context = context;
    }

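    /**
     * Reads the protocol CSV line by line, routing each batch to its own
     * staged resource and rewriting table metadata only when necessary.
     */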
    public void process() throws IOException {
        String catalogLine = null, schemaLine = null, nodeLine = null, binaryLine = null, channelLine = null;
        TableLine tableLine = null;
        Map<TableLine, TableLine> syncTableLines = new HashMap<TableLine, TableLine>();
        Map<TableLine, TableLine> batchTableLines = new HashMap<TableLine, TableLine>();
        IStagedResource resource = null;
        String line = null;
        long startTime = System.currentTimeMillis(), ts = startTime, lineCount = 0;

        while ((line = reader.readLine()) != null) {
            if (line.startsWith(CsvConstants.CATALOG)) {
                catalogLine = line;
                writeLine(line);
            } else if (line.startsWith(CsvConstants.SCHEMA)) {
                schemaLine = line;
                writeLine(line);
            } else if (line.startsWith(CsvConstants.TABLE)) {
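                // Table metadata is cached for the whole sync (syncTableLines) and per
                // batch (batchTableLines): a table already seen in this batch needs only
                // its table line repeated, while one seen in an earlier batch gets its
                // cached catalog, schema, keys, and columns lines replayed.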
                tableLine = new TableLine(catalogLine, schemaLine, line);
                TableLine batchTableLine = batchTableLines.get(tableLine);

                if (batchTableLine != null) {
                    tableLine = batchTableLine;
                    writeLine(line);
                } else {
                    TableLine syncTableLine = syncTableLines.get(tableLine);
                    if (syncTableLine != null) {
                        tableLine = syncTableLine;
                        writeLine(tableLine.catalogLine);
                        writeLine(tableLine.schemaLine);
                        writeLine(line);
                        writeLine(tableLine.keysLine);
                        writeLine(tableLine.columnsLine);
                    } else {
                        syncTableLines.put(tableLine, tableLine);
                        batchTableLines.put(tableLine, tableLine);
                        writeLine(line);
                    }
                }
            } else if (line.startsWith(CsvConstants.KEYS)) {
                tableLine.keysLine = line;
                writeLine(line);
            } else if (line.startsWith(CsvConstants.COLUMNS)) {
                tableLine.columnsLine = line;
                writeLine(line);
            } else if (line.startsWith(CsvConstants.BATCH)) {
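                // A batch line starts a new staged file; an existing resource is reused
                // only if it is not already DONE, and listeners are notified of the start.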
                batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine),
                        getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false);
                String location = batch.getStagedLocation();
                resource = stagingManager.find(category, location, batch.getBatchId());
                if (resource == null || resource.getState() == State.DONE) {
                    log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
                    resource = stagingManager.create(memoryThresholdInBytes, category, location, batch.getBatchId());
                }
                writer = resource.getWriter();
                writeLine(nodeLine);
                writeLine(binaryLine);
                writeLine(channelLine);
                writeLine(line);

                if (listeners != null) {
                    for (IProtocolDataWriterListener listener : listeners) {
                        listener.start(context, batch);
                    }
                }
            } else if (line.startsWith(CsvConstants.COMMIT)) {
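                // Commit closes the staged file and marks it READY so listeners (such as
                // the load-on-arrival listener) can hand it off; per-batch state is reset.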
                writeLine(line);
                resource.close();
                resource.setState(State.READY);
                batchTableLines.clear();

                if (listeners != null) {
                    for (IProtocolDataWriterListener listener : listeners) {
                        listener.end(context, batch, resource);
                    }
                }
            } else if (line.startsWith(CsvConstants.NODEID)) {
                nodeLine = line;
            } else if (line.startsWith(CsvConstants.BINARY)) {
                binaryLine = line;
            } else if (line.startsWith(CsvConstants.CHANNEL)) {
                channelLine = line;
            } else {
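                // Anything else is row data: copy it through, splitting very long lines
                // into MAX_WRITE_LENGTH chunks so a single write stays bounded.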
                int size = line.length();
                if (size > MAX_WRITE_LENGTH) {
                    log.debug("Exceeded max line length with {}", size);
                    for (int i = 0; i < size; i = i + MAX_WRITE_LENGTH) {
                        int end = i + MAX_WRITE_LENGTH;
                        writer.append(line, i, end < size ? end : size);
                    }
                    writer.newLine();
                } else {
                    writeLine(line);
                }
            }

            lineCount++;
            if (System.currentTimeMillis() - ts > 60000) {
                log.info("Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
                        new Object[] { batch.getBatchId(), batch.getTargetNodeId(), (System.currentTimeMillis() - startTime) / 1000,
                                "LINES=" + lineCount + ", BYTES=" + resource.getSize() });
                ts = System.currentTimeMillis();
            }
        }
    }

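    /** Returns the text after the first comma of a token line; a null line yields null. */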
    protected String getArgLine(String line) throws IOException {
        if (line != null) {
            int i = line.indexOf(",");
            if (i >= 0) {
                return line.substring(i + 1).trim();
            }
            throw new IOException("Invalid token line in CSV: " + line);
        }
        return null;
    }

    protected BinaryEncoding getBinaryEncoding(String line) throws IOException {
        String value = getArgLine(line);
        if (value != null) {
            return BinaryEncoding.valueOf(value);
        }
        return null;
    }

    protected void writeLine(String line) throws IOException {
        if (line != null) {
            if (log.isDebugEnabled()) {
                log.debug("Writing staging data: {}", line);
            }
            writer.write(line);
            writer.newLine();
        }
    }

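    // Composite key of the catalog, schema, and table lines, used to detect when
    // a table's metadata has already been written for the current sync or batch.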
    class TableLine {
        String catalogLine;
        String schemaLine;
        String tableLine;
        String keysLine;
        String columnsLine;

        public TableLine(String catalogLine, String schemaLine, String tableLine) {
            this.catalogLine = catalogLine;
            this.schemaLine = schemaLine;
            this.tableLine = tableLine;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || !(o instanceof TableLine)) {
                return false;
            }
            TableLine t = (TableLine) o;
            return StringUtils.equals(catalogLine, t.catalogLine) && StringUtils.equals(schemaLine, t.schemaLine)
                    && StringUtils.equals(tableLine, t.tableLine);
        }

        @Override
        public int hashCode() {
            return (catalogLine + "." + schemaLine + "." + tableLine).hashCode();
        }
    }
}

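For reference, a minimal usage sketch of the new writer (hypothetical, not part of this commit): it assumes a StagingManager built over a local directory, a protocol CSV payload saved to a file, and placeholder category and node id values.

    import java.io.BufferedReader;
    import java.io.FileReader;

    import org.jumpmind.symmetric.io.data.Batch.BatchType;
    import org.jumpmind.symmetric.io.data.DataContext;
    import org.jumpmind.symmetric.io.data.writer.SimpleStagingDataWriter;
    import org.jumpmind.symmetric.io.stage.IStagingManager;
    import org.jumpmind.symmetric.io.stage.StagingManager;

    public class StagingTransferSketch {
        public static void main(String[] args) throws Exception {
            IStagingManager stagingManager = new StagingManager("/tmp/staging"); // assumed constructor and path
            BufferedReader reader = new BufferedReader(new FileReader("/tmp/batch.csv")); // placeholder payload
            long memoryThresholdInBytes = 32 * 1024 * 1024; // spill from memory to disk above 32 MB
            // "incoming" mirrors Constants.STAGING_CATEGORY_INCOMING; "server" is a
            // placeholder target node id; no listeners are registered here.
            new SimpleStagingDataWriter(reader, stagingManager, "incoming",
                    memoryThresholdInBytes, BatchType.LOAD, "server", new DataContext()).process();
            reader.close();
        }
    }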