diff --git a/symmetric-assemble/src/asciidoc/configuration/channels.ad b/symmetric-assemble/src/asciidoc/configuration/channels.ad
index 05f149019a..21ce4a87b5 100644
--- a/symmetric-assemble/src/asciidoc/configuration/channels.ad
+++ b/symmetric-assemble/src/asciidoc/configuration/channels.ad
@@ -36,6 +36,7 @@ Channel ID:: Identifier used through the system to identify a given channel.
[[max-batch-size]]Max Batch Size:: Specifies the maximum number of data events to process within a batch for this channel.
[[max-batch-to-send]]Max Batch To Send:: Specifies the maximum number of batches to send for a given channel during a 'synchronization' between two nodes. A 'synchronization' is equivalent to a push or a pull. For example, if there are 12 batches ready to be sent for a channel and max_batch_to_send is equal to 10, then only the first 10 batches will be sent even though 12 batches are ready.
[[max-data-to-route]]Max Data To Route:: Specifies the maximum number of data rows to route for a channel at a time.
+[[max-network-kbps]]Max KB/s:: Specifies the maximum network transfer rate in kilobytes per second. Use zero to indicate unlimited. When throttling the channel, make sure the channel is on its own queue or within a queue of channels that are throttled at the same rate. This is currently only implemented when staging is enabled.
Data Loader Types:: Determines how data will be loaded into the target tables. These are used during an initial load or a reverse initial load. Data loaders do not always
have to load into the target relational database. They can write to a file, a web service, or any other type of non-relational data source.
Data loaders can also use other techniques to increase performance of data loads into the target relation database.
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java
index 16c940c363..d9880df594 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java
@@ -21,6 +21,7 @@
package org.jumpmind.symmetric.model;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.Collection;
import java.util.Date;
@@ -41,6 +42,8 @@ public class Channel implements Serializable {
private int maxBatchToSend = 100;
private int maxDataToRoute = 10000;
+
+ private BigDecimal maxKBytesPerSecond = BigDecimal.ZERO;
private boolean enabled = true;
@@ -269,21 +272,29 @@ public boolean isReloadFlag() {
}
public String getQueue() {
- return queue;
- }
+ return queue;
+ }
- public void setQueue(String queue) {
- this.queue = queue;
- }
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
- @Override
- public int hashCode() {
- if (channelId != null) {
- return channelId.hashCode();
- } else {
- return super.hashCode();
- }
- }
+ public BigDecimal getMaxKBytesPerSecond() {
+ return maxKBytesPerSecond;
+ }
+
+ public void setMaxKBytesPerSecond(BigDecimal maxKBytesPerSecond) {
+ this.maxKBytesPerSecond = maxKBytesPerSecond;
+ }
+
+ @Override
+ public int hashCode() {
+ if (channelId != null) {
+ return channelId.hashCode();
+ } else {
+ return super.hashCode();
+ }
+ }
@Override
public boolean equals(Object obj) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java
index 869448b00b..a754e1e679 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java
@@ -21,6 +21,7 @@
package org.jumpmind.symmetric.model;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.Date;
/**
@@ -238,4 +239,13 @@ public void setQueue(String queue) {
public String getQueue() {
return this.channel.getQueue();
}
+
+ public BigDecimal getMaxKBytesPerSecond() {
+ return this.channel.getMaxKBytesPerSecond();
+ }
+
+ public void setMaxKBytesPerSecond(BigDecimal maxKBytesPerSecond) {
+ this.channel.setMaxKBytesPerSecond(maxKBytesPerSecond);
+ }
+
}
\ No newline at end of file
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java
index 450d21bda8..21725ba268 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java
@@ -252,7 +252,7 @@ public void saveChannel(Channel channel, boolean reloadChannels) {
channel.getExtractPeriodMillis(), channel.getDataLoaderType(),
channel.getLastUpdateTime(), channel.getLastUpdateBy(),
channel.isReloadFlag() ? 1 : 0, channel.isFileSyncFlag() ? 1 : 0,
- channel.getQueue(),
+ channel.getQueue(), channel.getMaxKBytesPerSecond(),
channel.getChannelId() })) {
channel.setCreateTime(new Date());
sqlTemplate.update(
@@ -267,7 +267,7 @@ public void saveChannel(Channel channel, boolean reloadChannels) {
channel.getDataLoaderType(), channel.getLastUpdateTime(),
channel.getLastUpdateBy(), channel.getCreateTime(),
channel.isReloadFlag() ? 1 : 0, channel.isFileSyncFlag() ? 1 : 0,
- channel.getQueue() });
+ channel.getQueue(), channel.getMaxKBytesPerSecond() });
}
if (reloadChannels) {
clearCache();
@@ -386,6 +386,7 @@ public NodeChannel mapRow(Row row) {
.getBoolean("file_sync_flag"));
nodeChannel.setReloadFlag(row.getBoolean("reload_flag"));
nodeChannel.setQueue(row.getString("queue"));
+ nodeChannel.setMaxKBytesPerSecond(row.getBigDecimal("max_network_kbps"));
return nodeChannel;
};
}, nodeId);
@@ -567,6 +568,7 @@ public Channel mapRow(Row row) {
channel.setReloadFlag(row.getBoolean("reload_flag"));
channel.setFileSyncFlag(row.getBoolean("file_sync_flag"));
channel.setQueue(row.getString("queue"));
+ channel.setMaxKBytesPerSecond(row.getBigDecimal("max_network_kbps"));
return channel;
}
});
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java
index 6960dfc868..b30e757093 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java
@@ -75,7 +75,7 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
" c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " +
" c.batch_algorithm, c.extract_period_millis, c.data_loader_type, " +
" c.last_update_time, c.last_update_by, c.create_time, c.reload_flag, c.file_sync_flag, " +
- " c.queue " +
+ " c.queue, c.max_network_kbps " +
" from $(channel) c order by c.processing_order asc, c.channel_id ");
putSql("selectNodeChannelsSql",
@@ -83,7 +83,8 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
+ " c.max_batch_size, c.enabled, c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, "
+ " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, c.batch_algorithm, "
+ " nc.last_extract_time, c.extract_period_millis, c.data_loader_type, "
- + " last_update_time, last_update_by, create_time, c.reload_flag, c.file_sync_flag, c.queue "
+ + " last_update_time, last_update_by, create_time, c.reload_flag, c.file_sync_flag, c.queue, "
+ + " c.max_network_kbps "
+ " from $(channel) c left outer join "
+ " $(node_channel_ctl) nc on c.channel_id = nc.channel_id and nc.node_id = ? "
+ " order by c.processing_order asc, c.channel_id ");
@@ -98,14 +99,15 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
+ " max_batch_to_send, max_data_to_route, use_old_data_to_route, use_row_data_to_route, "
+ " use_pk_data_to_route, contains_big_lob, enabled, batch_algorithm, description, "
+ " extract_period_millis, data_loader_type, last_update_time, last_update_by, "
- + " create_time, reload_flag, file_sync_flag,queue) "
- + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?, ?, ?, ?) ");
+ + " create_time, reload_flag, file_sync_flag, queue, max_network_kbps) "
+ + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
putSql("updateChannelSql",
"update $(channel) set processing_order=?, max_batch_size=?, "
+ " max_batch_to_send=?, max_data_to_route=?, use_old_data_to_route=?, use_row_data_to_route=?, "
+ " use_pk_data_to_route=?, contains_big_lob=?, enabled=?, batch_algorithm=?, extract_period_millis=?, "
- + " data_loader_type=?, last_update_time=?, last_update_by=?, reload_flag=?, file_sync_flag=?, queue=? "
+ + " data_loader_type=?, last_update_time=?, last_update_by=?, reload_flag=?, file_sync_flag=?, queue=?, "
+ + " max_network_kbps = ? "
+ " where channel_id=? ");
putSql("deleteNodeGroupLinkSql",
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
index 8bc43e375b..4003b3bc19 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
@@ -906,9 +906,11 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
+ Channel channel = configurationService.getChannel(currentBatch.getChannelId());
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
- currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx);
+ currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx,
+ channel.getMaxKBytesPerSecond());
dataReader.process();
} else {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml
index f6dc77a546..cb006dfdbd 100644
--- a/symmetric-core/src/main/resources/symmetric-schema.xml
+++ b/symmetric-core/src/main/resources/symmetric-schema.xml
@@ -43,6 +43,7 @@
+
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
index 705691c8c2..acef6a34c8 100644
--- a/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
+++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
@@ -171,6 +171,30 @@ public long getLong(String columnName) {
}
}
+ public float getFloat(String columnName) {
+ Object obj = this.get(columnName);
+ if (obj instanceof Number) {
+ return ((Number) obj).floatValue();
+ } else if (obj instanceof String) {
+ return Float.parseFloat(obj.toString());
+ } else {
+ checkForColumn(columnName);
+ return 0;
+ }
+ }
+
+ public BigDecimal getBigDecimal(String columnName) {
+ Object obj = this.get(columnName);
+ if (obj instanceof BigDecimal) {
+ return (BigDecimal) obj;
+ } else if (obj instanceof String) {
+ return new BigDecimal(obj.toString());
+ } else {
+ checkForColumn(columnName);
+ return null;
+ }
+ }
+
public boolean getBoolean(String columnName) {
Object obj = this.get(columnName);
if ("1".equals(obj)) {
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java
index e5aa98943d..88567d744e 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/SimpleStagingDataReader.java
@@ -22,6 +22,7 @@
import java.io.BufferedReader;
import java.io.BufferedWriter;
+import java.math.BigDecimal;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
@@ -43,9 +44,10 @@ public class SimpleStagingDataReader {
protected IStagedResource stagedResource;
protected BufferedWriter writer;
protected DataContext context;
+ protected BigDecimal maxKBytesPerSec;
public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, boolean isRetry,
- IStagedResource stagedResource, BufferedWriter writer, DataContext context) {
+ IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) {
this.batchType = batchType;
this.batchId = batchId;
this.targetNodeId = targetNodeId;
@@ -53,6 +55,7 @@ public SimpleStagingDataReader(BatchType batchType, long batchId, String targetN
this.stagedResource = stagedResource;
this.writer = writer;
this.context = context;
+ this.maxKBytesPerSec = maxKBytesPerSec;
}
public void process() {
@@ -74,10 +77,17 @@ public void process() {
}
}
} else {
- char[] buffer = new char[MAX_WRITE_LENGTH];
- long totalCharsRead = 0;
- int numCharsRead = 0;
- long startTime = System.currentTimeMillis(), ts = startTime;
+ long totalCharsRead = 0, totalBytesRead = 0;
+ int numCharsRead = 0, numBytesRead = 0;
+ long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime;
+ boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0;
+ long totalThrottleTime = 0;
+ int bufferSize = MAX_WRITE_LENGTH;
+
+ if (isThrottled) {
+ bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue();
+ }
+ char[] buffer = new char[bufferSize];
while ((numCharsRead = reader.read(buffer)) != -1) {
writer.write(buffer, 0, numCharsRead);
@@ -86,15 +96,35 @@ public void process() {
if (Thread.currentThread().isInterrupted()) {
throw new IoException("This thread was interrupted");
}
-
+
if (System.currentTimeMillis() - ts > 60000) {
log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " +
"The following stats have been gathered: {}",
new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000,
- "BYTES=" + totalCharsRead });
+ "CHARS=" + totalCharsRead });
ts = System.currentTimeMillis();
}
- }
+
+ if (isThrottled) {
+ numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length;
+ totalBytesRead += numBytesRead;
+ if (numBytesRead >= bufferSize) {
+ long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000);
+ long actualMillis = System.currentTimeMillis() - bts;
+ if (actualMillis < expectedMillis) {
+ totalThrottleTime += expectedMillis - actualMillis;
+ Thread.sleep(expectedMillis - actualMillis);
+ }
+ numBytesRead = 0;
+ bts = System.currentTimeMillis();
+ }
+ }
+ }
+ if (log.isDebugEnabled() && totalThrottleTime > 0) {
+ log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s",
+ batchId, targetNodeId, (System.currentTimeMillis() - startTime), totalBytesRead, totalThrottleTime,
+ maxKBytesPerSec);
+ }
}
} catch (Throwable t) {
throw new RuntimeException(t);