SYMMETRICDS-333 - Allow a parameter to control the number of bytes that will be sent during one synchronization.
chenson42 committed Sep 3, 2010
1 parent a5dc1e9 commit ebaebd9
Showing 13 changed files with 53 additions and 28 deletions.
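
At a high level, the commit threads a byte count from the lowest-level CSV writer up to the sync loop: CsvUtils.write(...) now reports how much it wrote, every caller feeds that figure into the current OutgoingBatch, and DataExtractorService stops handing out batches once the running total crosses the new transport.max.bytes.to.sync parameter. A minimal sketch of the resulting control flow, condensed from the hunks below (extractAndSend is a hypothetical stand-in for the extract-and-send steps elided here):

    // Condensed from the DataExtractorService hunk in this commit; not the full method.
    long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
    long bytesSentCount = 0;
    int batchesSentCount = 0;
    for (OutgoingBatch outgoingBatch : activeBatches) {
        extractAndSend(outgoingBatch);                  // hypothetical stand-in for the real steps
        bytesSentCount += outgoingBatch.getByteCount(); // bytes tallied while the batch was written
        batchesSentCount++;
        if (bytesSentCount >= maxBytesToSync) {
            // Remaining batches stay queued and go out on the next synchronization.
            log.info("DataExtractorReachedMaxNumberOfBytesToSync", batchesSentCount, bytesSentCount);
            break;
        }
    }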
@@ -101,6 +101,7 @@ private ParameterConstants() {
     public final static String TRANSPORT_HTTP_BASIC_AUTH_USERNAME = "http.basic.auth.username";
     public final static String TRANSPORT_HTTP_BASIC_AUTH_PASSWORD = "http.basic.auth.password";
     public final static String TRANSPORT_TYPE = "transport.type";
+    public final static String TRANSPORT_MAX_BYTES_TO_SYNC = "transport.max.bytes.to.sync";
     public final static String TRANSPORT_HTTPS_VERIFIED_SERVERS = "https.verified.server.names";
     public final static String TRANSPORT_HTTPS_ALLOW_SELF_SIGNED_CERTS = "https.allow.self.signed.certs";

@@ -73,5 +73,11 @@ public void incrementDataEventCount() {
             batch.incrementDataEventCount();
         }
     }
+
+    public void incrementByteCount(int size) {
+        if (batch != null) {
+            batch.incrementByteCount(size);
+        }
+    }
 
 }
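
The new context method makes DataExtractorContext a null-safe proxy for the active batch, so CSV-writing code can report sizes without checking whether a batch is in flight. The hunks below all use it in the same wrap-the-write idiom; a representative fragment (mirroring the insert-command change further down):

    // Wrap-the-write idiom used throughout this commit: CsvUtils.write(...) now
    // returns the number of characters it wrote, and the context forwards that
    // to the current batch (silently dropping it when no batch is active).
    context.incrementByteCount(CsvUtils.write(out, CsvConstants.INSERT, DELIMITER, data.getRowData()));
    CsvUtils.writeLineFeed(out); // line feeds are written but not counted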
@@ -68,37 +68,37 @@ public void preprocessTable(Data data, String routerId, Writer out, DataExtracto
         }
         String triggerHistoryId = Integer.toString(data.getTriggerHistory().getTriggerHistoryId()).intern();
         if (!context.getHistoryRecordsWritten().contains(triggerHistoryId)) {
-            writeTable(data, routerId, out);
-            CsvUtils.write(out, CsvConstants.KEYS, ", ", data.getTriggerHistory().getPkColumnNames());
+            writeTable(data, routerId, out, context);
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.KEYS, ", ", data.getTriggerHistory().getPkColumnNames()));
             CsvUtils.writeLineFeed(out);
-            CsvUtils.write(out, CsvConstants.COLUMNS, ", ", data.getTriggerHistory().getColumnNames());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.COLUMNS, ", ", data.getTriggerHistory().getColumnNames()));
             CsvUtils.writeLineFeed(out);
             context.getHistoryRecordsWritten().add(triggerHistoryId);
         } else if (!context.isLastTable(data.getTableName())) {
-            writeTable(data, routerId, out);
+            writeTable(data, routerId, out, context);
         }
 
         if (data.getEventType() == DataEventType.UPDATE && data.getOldData() != null && parameterService.is(ParameterConstants.DATA_EXTRACTOR_OLD_DATA_ENABLED)) {
-            CsvUtils.write(out, CsvConstants.OLD, ", ", data.getOldData());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.OLD, ", ", data.getOldData()));
             CsvUtils.writeLineFeed(out);
         }
         context.setLastTableName(data.getTableName());
     }
 
-    protected void writeTable(Data data, String routerId, Writer out) throws IOException {
+    protected void writeTable(Data data, String routerId, Writer out, DataExtractorContext context) throws IOException {
         // TODO Add property and write the source schema and the source catalog if set
         Router router = triggerRouterService.getActiveRouterByIdForCurrentNode(routerId, false);
         String schemaName = (router == null || router.getTargetSchemaName() == null) ? "" : router
                 .getTargetSchemaName();
-        CsvUtils.write(out, CsvConstants.SCHEMA, ", ", schemaName);
+        context.incrementByteCount(CsvUtils.write(out, CsvConstants.SCHEMA, ", ", schemaName));
         CsvUtils.writeLineFeed(out);
         String catalogName = (router == null || router.getTargetCatalogName() == null) ? "" : router
                 .getTargetCatalogName();
-        CsvUtils.write(out, CsvConstants.CATALOG, ", ", catalogName);
+        context.incrementByteCount(CsvUtils.write(out, CsvConstants.CATALOG, ", ", catalogName));
         CsvUtils.writeLineFeed(out);
         String tableName = (router == null || router.getTargetTableName() == null) ? data.getTableName() : router
                 .getTargetTableName();
-        CsvUtils.write(out, CsvConstants.TABLE, ", ", tableName);
+        context.incrementByteCount(CsvUtils.write(out, CsvConstants.TABLE, ", ", tableName));
         CsvUtils.writeLineFeed(out);
     }

@@ -58,19 +58,19 @@ public void init(Writer writer, DataExtractorContext context) throws IOException
         Node nodeIdentity = nodeService.findIdentity();
         String nodeId = (nodeIdentity == null) ? parameterService.getString(ParameterConstants.EXTERNAL_ID)
                 : nodeIdentity.getNodeId();
-        CsvUtils.write(writer, CsvConstants.NODEID, CsvUtils.DELIMITER, nodeId);
+        context.incrementByteCount(CsvUtils.write(writer, CsvConstants.NODEID, CsvUtils.DELIMITER, nodeId));
         CsvUtils.writeLineFeed(writer);
     }
 
     public void begin(OutgoingBatch batch, Writer writer) throws IOException {
-        CsvUtils.write(writer, CsvConstants.BATCH, CsvUtils.DELIMITER, Long.toString(batch.getBatchId()));
+        batch.incrementByteCount(CsvUtils.write(writer, CsvConstants.BATCH, CsvUtils.DELIMITER, Long.toString(batch.getBatchId())));
         CsvUtils.writeLineFeed(writer);
-        CsvUtils.write(writer, CsvConstants.BINARY, CsvUtils.DELIMITER, dbDialect.getBinaryEncoding().name());
+        batch.incrementByteCount(CsvUtils.write(writer, CsvConstants.BINARY, CsvUtils.DELIMITER, dbDialect.getBinaryEncoding().name()));
         CsvUtils.writeLineFeed(writer);
     }
 
     public void commit(OutgoingBatch batch, Writer writer) throws IOException {
-        CsvUtils.write(writer, CsvConstants.COMMIT, CsvUtils.DELIMITER, Long.toString(batch.getBatchId()));
+        batch.incrementByteCount(CsvUtils.write(writer, CsvConstants.COMMIT, CsvUtils.DELIMITER, Long.toString(batch.getBatchId())));
         CsvUtils.writeLineFeed(writer);
     }

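Note the asymmetry above: init() goes through the context, while begin() and commit() receive the OutgoingBatch as a parameter and increment its counter directly. Both routes feed the same per-batch tally:

    // Two routes into the same counter, as wired in this commit:
    batch.incrementByteCount(n);   // begin()/commit(): the batch is at hand
    context.incrementByteCount(n); // row-level commands: the context proxies to the active batch
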
@@ -70,15 +70,15 @@ public void preprocessTable(Data data, String routerId, Writer out, DataExtracto
         String triggerHistId = Integer.toString(data.getTriggerHistory().getTriggerHistoryId()).intern();
         if (!context.getHistoryRecordsWritten().contains(triggerHistId)) {
 
-            CsvUtils.write(out, CsvConstants.TABLE, ", ", data.getTableName());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.TABLE, ", ", data.getTableName()));
             CsvUtils.writeLineFeed(out);
-            CsvUtils.write(out, CsvConstants.KEYS, ", ", data.getTriggerHistory().getPkColumnNames());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.KEYS, ", ", data.getTriggerHistory().getPkColumnNames()));
             CsvUtils.writeLineFeed(out);
-            CsvUtils.write(out, CsvConstants.COLUMNS, ", ", data.getTriggerHistory().getColumnNames());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.COLUMNS, ", ", data.getTriggerHistory().getColumnNames()));
             CsvUtils.writeLineFeed(out);
             context.getHistoryRecordsWritten().add(triggerHistId);
         } else if (!context.isLastTable(data.getTableName())) {
-            CsvUtils.write(out, CsvConstants.TABLE, ", ", data.getTableName());
+            context.incrementByteCount(CsvUtils.write(out, CsvConstants.TABLE, ", ", data.getTableName()));
             CsvUtils.writeLineFeed(out);
         }
 
@@ -11,7 +11,7 @@
 public class StreamCreateDataCommand extends AbstractStreamDataCommand {
 
     public void execute(Writer writer, Data data, String routerId, DataExtractorContext context) throws IOException {
-        CsvUtils.write(writer, CsvConstants.CREATE, DELIMITER, data.getRowData());
+        context.incrementByteCount(CsvUtils.write(writer, CsvConstants.CREATE, DELIMITER, data.getRowData()));
         CsvUtils.writeLineFeed(writer);
         context.incrementDataEventCount();
     }
@@ -32,7 +32,7 @@
 class StreamDeleteDataCommand extends AbstractStreamDataCommand {
 
     public void execute(Writer out, Data data, String routerId, DataExtractorContext context) throws IOException {
-        CsvUtils.write(out, CsvConstants.DELETE, DELIMITER, data.getPkData());
+        context.incrementByteCount(CsvUtils.write(out, CsvConstants.DELETE, DELIMITER, data.getPkData()));
         CsvUtils.writeLineFeed(out);
         context.incrementDataEventCount();
     }
@@ -32,7 +32,7 @@
 class StreamInsertDataCommand extends AbstractStreamDataCommand {
 
     public void execute(Writer writer, Data data, String routerId, DataExtractorContext context) throws IOException {
-        CsvUtils.write(writer, CsvConstants.INSERT, DELIMITER, data.getRowData());
+        context.incrementByteCount(CsvUtils.write(writer, CsvConstants.INSERT, DELIMITER, data.getRowData()));
         CsvUtils.writeLineFeed(writer);
         context.incrementDataEventCount();
     }
@@ -32,7 +32,7 @@
 class StreamUpdateDataCommand extends AbstractStreamDataCommand {
 
     public void execute(Writer out, Data data, String routerId, DataExtractorContext context) throws IOException {
-        CsvUtils.write(out, CsvConstants.UPDATE, DELIMITER, data.getRowData(), DELIMITER, data.getPkData());
+        context.incrementByteCount(CsvUtils.write(out, CsvConstants.UPDATE, DELIMITER, data.getRowData(), DELIMITER, data.getPkData()));
         CsvUtils.writeLineFeed(out);
         context.incrementDataEventCount();
     }
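
For orientation, the stream these four commands contribute to looks roughly like the following, reconstructed from the CsvConstants tokens and write calls visible in this diff (a hedged sketch; the exact delimiters, quoting, and header ordering may differ):

    nodeid, 00001
    batch, 42
    binary, NONE
    schema,
    catalog,
    table, customer
    keys, customer_id
    columns, customer_id, name
    insert, 1, "joe"
    update, 1, "joseph", 1
    delete, 2
    commit, 42

With this commit, each of those payload lines is measured as it is written, so a batch's byte count tracks the CSV volume actually put on the wire.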
@@ -397,6 +397,9 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
 
         OutgoingBatch currentBatch = null;
         try {
+            final long MAX_BYTES_TO_SYNC = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
+            long bytesSentCount = 0;
+            int batchesSentCount = 0;
             for (OutgoingBatch outgoingBatch : activeBatches) {
                 currentBatch = outgoingBatch;
                 outgoingBatch.setStatus(OutgoingBatch.Status.QY);
@@ -416,6 +419,14 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
                 outgoingBatch.setStatus(OutgoingBatch.Status.LD);
                 outgoingBatch.setLoadCount(outgoingBatch.getLoadCount()+1);
                 outgoingBatchService.updateOutgoingBatch(outgoingBatch);
+
+                bytesSentCount += outgoingBatch.getByteCount();
+                batchesSentCount++;
+
+                if (bytesSentCount >= MAX_BYTES_TO_SYNC) {
+                    log.info("DataExtractorReachedMaxNumberOfBytesToSync", batchesSentCount, bytesSentCount);
+                    break;
+                }
             }
         } catch (RuntimeException e) {
             SQLException se = unwrapSqlException(e);
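
Two properties of the cutoff are worth noting: the comparison runs after a batch has already been sent, so the cap is a soft limit that one large batch can overshoot, and because the check sits at the tail of the loop, at least one batch always goes out per connection. A small walk-through with hypothetical numbers:

    // Hypothetical accounting against the shipped 1048576-byte (1 MiB) default,
    // with three pending batches of 600000, 700000, and 50000 bytes.
    long maxBytesToSync = 1048576;
    long bytesSentCount = 0;
    int batchesSentCount = 0;
    for (long byteCount : new long[] { 600000, 700000, 50000 }) {
        bytesSentCount += byteCount; // this batch is already on the wire
        batchesSentCount++;
        if (bytesSentCount >= maxBytesToSync) {
            break; // stops after batch 2 with 1300000 bytes sent; batch 3 waits
        }
    }
    // Result: batchesSentCount == 2, bytesSentCount == 1300000. The parameter is
    // an advisory threshold, not a hard ceiling on bytes per connection.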
@@ -518,12 +529,12 @@ protected void networkTransfer(BufferedReader reader, BufferedWriter writer) thr
      */
     protected void databaseExtract(Node node, OutgoingBatch batch, final IExtractListener handler)
             throws IOException {
-        batch.resetStats();
-        long ts = System.currentTimeMillis();
-        handler.startBatch(batch);
-        selectEventDataToExtract(handler, batch);
-        handler.endBatch(batch);
-        batch.setExtractMillis(System.currentTimeMillis() - ts);
+        batch.resetStats();
+        long ts = System.currentTimeMillis();
+        handler.startBatch(batch);
+        selectEventDataToExtract(handler, batch);
+        handler.endBatch(batch);
+        batch.setExtractMillis(System.currentTimeMillis() - ts);
     }
 
     public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
@@ -58,14 +58,15 @@ public static String escapeCsvData(String data) {
         return out.toString();
     }
 
-    public static void write(Writer writer, String... data) throws IOException {
+    public static int write(Writer writer, String... data) throws IOException {
         StringBuilder buffer = new StringBuilder();
         for (String string : data) {
             buffer.append(string);
         }
 
         writer.write(buffer.toString());
         log.debug("BufferWriting", buffer);
+        return buffer.length();
     }
 
     public static void writeSql(String sql, Writer writer) throws IOException {
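
Returning buffer.length() is what enables the counting everywhere else, with one caveat worth keeping in mind: StringBuilder.length() counts UTF-16 chars rather than encoded bytes, so the tally is exact for single-byte data and approximate when rows contain multi-byte characters. A quick illustration of the new contract:

    // CsvUtils.write(...) now reports the length of what it wrote:
    java.io.StringWriter out = new java.io.StringWriter();
    int written = CsvUtils.write(out, "table", ", ", "customer");
    // written == "table, customer".length() == 15; callers hand this value to
    // incrementByteCount(...) so the batch total grows as the CSV is emitted.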
@@ -468,6 +468,11 @@ start.watchdog.job=true
 # Specify the transport type. Supported values currently include: http, internal
 transport.type=http
 
+# This is the maximum number of bytes to synchronize in one connection.
+#
+# This property can be overridden in the database
+transport.max.bytes.to.sync=1048576
+
 # If using the HsqlDbDialect, this property indicates whether Symmetric should setup the embedded database properties or if an
 # external application will be doing so.
 hsqldb.initialize.db=true
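
The shipped default of 1048576 bytes is 1 MiB (1024 * 1024). The extractor reads the value through the parameter service, so a database-level override (per the comment above) takes effect without touching the properties file:

    // Mirrors the DataExtractorService hunk earlier in this commit:
    final long MAX_BYTES_TO_SYNC = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);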
@@ -86,6 +86,7 @@ DataPushingFailed=There was an error while pushing data to the server
 DataPushingFailedLock=Did not run the push process because the cluster service has it locked.
 DataPushingFailedNoNodeIdentity=Did not run the push process because the node has no identity and needs to be registered.
 DataExtractorCouldNotFindStreamCommand=Could not find the stream command for event type of %s.
+DataExtractorReachedMaxNumberOfBytesToSync=Stopped extraction after %d batches and %d bytes. We hit the configured byte threshold to send in one synchronization.
 DataEventInsertFailed=Could not insert a data event: data_id=%d batch_id=%d router_id=%s
 DataSent=Push data sent to %s
 DataAckExtendedReading=Reading extend ack: %s