Skip to content

Commit

Permalink
0002618: Throttle network bandwidth usage on channel
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 29, 2016
1 parent bb27da4 commit deea3ba
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 29 deletions.
1 change: 1 addition & 0 deletions symmetric-assemble/src/asciidoc/configuration/channels.ad
Expand Up @@ -36,6 +36,7 @@ Channel ID:: Identifier used through the system to identify a given channel.
[[max-batch-size]]Max Batch Size:: Specifies the maximum number of data events to process within a batch for this channel.
[[max-batch-to-send]]Max Batch To Send:: Specifies the maximum number of batches to send for a given channel during a 'synchronization' between two nodes. A 'synchronization' is equivalent to a push or a pull. For example, if there are 12 batches ready to be sent for a channel and max_batch_to_send is equal to 10, then only the first 10 batches will be sent even though 12 batches are ready.
[[max-data-to-route]]Max Data To Route:: Specifies the maximum number of data rows to route for a channel at a time.
[[max-network-kbps]]Max KB/s:: Specifies the maximum network transfer rate in kilobytes per second. Use zero or a negative value to indicate unlimited. When throttling the channel, make sure the channel is on its own queue or within a queue of channels that are throttled at the same rate. This is currently only implemented when staging is enabled.
Data Loader Types:: Determines how data will be loaded into the target tables. These are used during an initial load or a reverse initial load. Data loaders do not always
have to load into the target relational database. They can write to a file, a web service, or any other type of non-relational data source.
Data loaders can also use other techniques to increase performance of data loads into the target relation database.
Expand Down
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.Date;

Expand All @@ -41,6 +42,8 @@ public class Channel implements Serializable {
private int maxBatchToSend = 100;

private int maxDataToRoute = 10000;

private BigDecimal maxKBytesPerSecond = BigDecimal.ZERO;

private boolean enabled = true;

Expand Down Expand Up @@ -269,21 +272,29 @@ public boolean isReloadFlag() {
}

public String getQueue() {
return queue;
}
return queue;
}

/**
 * Assigns this channel to a named queue (used for multi-threaded channels).
 *
 * @param queue the queue name to operate on
 */
public void setQueue(String queue) {
    this.queue = queue;
}

/**
 * Hashes on the channel identifier when one is set, so channels that are
 * equal by id hash alike; falls back to the identity hash otherwise.
 */
@Override
public int hashCode() {
    return channelId != null ? channelId.hashCode() : super.hashCode();
}
/**
 * Returns the maximum network transfer rate for this channel in
 * kilobytes per second; zero indicates no throttling.
 *
 * @return the configured KB/s limit
 */
public BigDecimal getMaxKBytesPerSecond() {
    return this.maxKBytesPerSecond;
}

/**
 * Sets the maximum network transfer rate for this channel in kilobytes
 * per second.
 *
 * @param maxKBytesPerSecond the KB/s limit; zero means unlimited. A null
 *            value (e.g. from a NULL or missing max_network_kbps column —
 *            see Row.getBigDecimal, which returns null in that case) is
 *            normalized to ZERO so this property never stores null.
 */
public void setMaxKBytesPerSecond(BigDecimal maxKBytesPerSecond) {
    // Normalize null to the field default of ZERO ("unlimited") so that
    // downstream consumers and the required DB column never see null.
    this.maxKBytesPerSecond = maxKBytesPerSecond != null ? maxKBytesPerSecond : BigDecimal.ZERO;
}

/**
 * Hashes on the channel identifier when one is set, so channels that are
 * equal by id hash alike; falls back to the identity hash otherwise.
 */
@Override
public int hashCode() {
    if (channelId == null) {
        return super.hashCode();
    }
    return channelId.hashCode();
}

@Override
public boolean equals(Object obj) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

/**
Expand Down Expand Up @@ -238,4 +239,13 @@ public void setQueue(String queue) {
/**
 * Returns the queue name, delegating to the wrapped channel.
 *
 * @return the queue name of the underlying channel
 */
public String getQueue() {
    return channel.getQueue();
}

/**
 * Returns the KB/s throttle limit, delegating to the wrapped channel.
 *
 * @return the maximum network transfer rate in KB/s
 */
public BigDecimal getMaxKBytesPerSecond() {
    return channel.getMaxKBytesPerSecond();
}

/**
 * Sets the KB/s throttle limit, delegating to the wrapped channel.
 *
 * @param maxKBytesPerSecond the maximum network transfer rate in KB/s
 */
public void setMaxKBytesPerSecond(BigDecimal maxKBytesPerSecond) {
    channel.setMaxKBytesPerSecond(maxKBytesPerSecond);
}

}
Expand Up @@ -252,7 +252,7 @@ public void saveChannel(Channel channel, boolean reloadChannels) {
channel.getExtractPeriodMillis(), channel.getDataLoaderType(),
channel.getLastUpdateTime(), channel.getLastUpdateBy(),
channel.isReloadFlag() ? 1 : 0, channel.isFileSyncFlag() ? 1 : 0,
channel.getQueue(),
channel.getQueue(), channel.getMaxKBytesPerSecond(),
channel.getChannelId() })) {
channel.setCreateTime(new Date());
sqlTemplate.update(
Expand All @@ -267,7 +267,7 @@ public void saveChannel(Channel channel, boolean reloadChannels) {
channel.getDataLoaderType(), channel.getLastUpdateTime(),
channel.getLastUpdateBy(), channel.getCreateTime(),
channel.isReloadFlag() ? 1 : 0, channel.isFileSyncFlag() ? 1 : 0,
channel.getQueue() });
channel.getQueue(), channel.getMaxKBytesPerSecond() });
}
if (reloadChannels) {
clearCache();
Expand Down Expand Up @@ -386,6 +386,7 @@ public NodeChannel mapRow(Row row) {
.getBoolean("file_sync_flag"));
nodeChannel.setReloadFlag(row.getBoolean("reload_flag"));
nodeChannel.setQueue(row.getString("queue"));
nodeChannel.setMaxKBytesPerSecond(row.getBigDecimal("max_network_kbps"));
return nodeChannel;
};
}, nodeId);
Expand Down Expand Up @@ -567,6 +568,7 @@ public Channel mapRow(Row row) {
channel.setReloadFlag(row.getBoolean("reload_flag"));
channel.setFileSyncFlag(row.getBoolean("file_sync_flag"));
channel.setQueue(row.getString("queue"));
channel.setMaxKBytesPerSecond(row.getBigDecimal("max_network_kbps"));
return channel;
}
});
Expand Down
Expand Up @@ -75,15 +75,16 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
" c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " +
" c.batch_algorithm, c.extract_period_millis, c.data_loader_type, " +
" c.last_update_time, c.last_update_by, c.create_time, c.reload_flag, c.file_sync_flag, " +
" c.queue " +
" c.queue, c.max_network_kbps " +
" from $(channel) c order by c.processing_order asc, c.channel_id ");

putSql("selectNodeChannelsSql",
"select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, "
+ " c.max_batch_size, c.enabled, c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, "
+ " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, c.batch_algorithm, "
+ " nc.last_extract_time, c.extract_period_millis, c.data_loader_type, "
+ " last_update_time, last_update_by, create_time, c.reload_flag, c.file_sync_flag, c.queue "
+ " last_update_time, last_update_by, create_time, c.reload_flag, c.file_sync_flag, c.queue, "
+ " c.max_network_kbps "
+ " from $(channel) c left outer join "
+ " $(node_channel_ctl) nc on c.channel_id = nc.channel_id and nc.node_id = ? "
+ " order by c.processing_order asc, c.channel_id ");
Expand All @@ -98,14 +99,15 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
+ " max_batch_to_send, max_data_to_route, use_old_data_to_route, use_row_data_to_route, "
+ " use_pk_data_to_route, contains_big_lob, enabled, batch_algorithm, description, "
+ " extract_period_millis, data_loader_type, last_update_time, last_update_by, "
+ " create_time, reload_flag, file_sync_flag,queue) "
+ " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?, ?, ?, ?) ");
+ " create_time, reload_flag, file_sync_flag, queue, max_network_kbps) "
+ " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

putSql("updateChannelSql",
"update $(channel) set processing_order=?, max_batch_size=?, "
+ " max_batch_to_send=?, max_data_to_route=?, use_old_data_to_route=?, use_row_data_to_route=?, "
+ " use_pk_data_to_route=?, contains_big_lob=?, enabled=?, batch_algorithm=?, extract_period_millis=?, "
+ " data_loader_type=?, last_update_time=?, last_update_by=?, reload_flag=?, file_sync_flag=?, queue=? "
+ " data_loader_type=?, last_update_time=?, last_update_by=?, reload_flag=?, file_sync_flag=?, queue=?, "
+ " max_network_kbps = ? "
+ " where channel_id=? ");

putSql("deleteNodeGroupLinkSql",
Expand Down
Expand Up @@ -906,9 +906,11 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
Channel channel = configurationService.getChannel(currentBatch.getChannelId());
DataContext ctx = new DataContext();
SimpleStagingDataReader dataReader = new SimpleStagingDataReader(BatchType.EXTRACT, currentBatch.getBatchId(),
currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx);
currentBatch.getNodeId(), isRetry(currentBatch, targetNode), extractedBatch, writer, ctx,
channel.getMaxKBytesPerSecond());
dataReader.process();
} else {
IDataReader dataReader = new ProtocolDataReader(BatchType.EXTRACT,
Expand Down
1 change: 1 addition & 0 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -43,6 +43,7 @@
<column name="data_loader_type" type="VARCHAR" size="50" required="true" default="default" description="Identify the type of data loader this channel should use. Allows for the default dataloader to be swapped out via configuration for more efficient platform specific data loaders."/>
<column name="description" type="VARCHAR" size="255" description="Description on the type of data carried in this channel."/>
<column name="queue" type="VARCHAR" size="25" default="default" required="true" description="User provided queue name for channel to operate on. Creates multi-threaded channels. Defaults to 'default' thread"/>
<column name="max_network_kbps" type="DECIMAL" size="10,3" default="0" required="true" description="The maximum network transfer rate in kilobytes per second. Zero or negative means unlimited. When throttling the channel, make sure the channel is on its own queue or within a queue of channels that are throttled at the same rate. This is currently only implemented when staging is enabled."/>
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" description="Timestamp when a user last updated this entry." />
Expand Down
24 changes: 24 additions & 0 deletions symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java
Expand Up @@ -171,6 +171,30 @@ public long getLong(String columnName) {
}
}

/**
 * Returns the named column's value as a float. Numeric values are
 * narrowed via Number.floatValue(); strings are parsed. Any other value
 * triggers the missing-column check and yields 0.
 *
 * @param columnName the column to read
 * @return the float value, or 0 when the value is neither numeric nor a string
 */
public float getFloat(String columnName) {
    Object value = this.get(columnName);
    if (value instanceof Number) {
        return ((Number) value).floatValue();
    }
    if (value instanceof String) {
        return Float.parseFloat((String) value);
    }
    checkForColumn(columnName);
    return 0;
}

/**
 * Returns the named column's value as a BigDecimal. BigDecimal values are
 * returned as-is; other Number types (JDBC drivers commonly return Double,
 * Long, or BigInteger for DECIMAL columns) and strings are converted.
 * Any other value triggers the missing-column check and yields null.
 *
 * @param columnName the column to read
 * @return the BigDecimal value, or null when the value is neither numeric
 *         nor a string (e.g. the column is absent or NULL)
 */
public BigDecimal getBigDecimal(String columnName) {
    Object obj = this.get(columnName);
    if (obj instanceof BigDecimal) {
        return (BigDecimal) obj;
    } else if (obj instanceof Number) {
        // Fix: previously any non-BigDecimal Number fell through and
        // returned null. Convert via toString() to preserve the exact
        // decimal representation (BigDecimal(double) would not).
        return new BigDecimal(((Number) obj).toString());
    } else if (obj instanceof String) {
        return new BigDecimal(obj.toString());
    } else {
        checkForColumn(columnName);
        return null;
    }
}

public boolean getBoolean(String columnName) {
Object obj = this.get(columnName);
if ("1".equals(obj)) {
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.math.BigDecimal;

import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
Expand All @@ -43,16 +44,18 @@ public class SimpleStagingDataReader {
protected IStagedResource stagedResource;
protected BufferedWriter writer;
protected DataContext context;
protected BigDecimal maxKBytesPerSec;

/**
 * Creates a reader that streams a staged, already-extracted batch to the
 * given writer, optionally throttled to a maximum network rate.
 *
 * @param batchType       whether this is an extract or load batch
 * @param batchId         identifier of the batch being sent
 * @param targetNodeId    node the batch is being sent to
 * @param isRetry         whether this send is a retry of a previous attempt
 * @param stagedResource  the staged content to stream
 * @param writer          destination for the batch content
 * @param context         data context carried through processing
 * @param maxKBytesPerSec maximum transfer rate in KB/s; null or a
 *                        non-positive value disables throttling
 */
public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, boolean isRetry,
        IStagedResource stagedResource, BufferedWriter writer, DataContext context, BigDecimal maxKBytesPerSec) {
    this.batchType = batchType;
    this.batchId = batchId;
    this.targetNodeId = targetNodeId;
    this.isRetry = isRetry;
    this.stagedResource = stagedResource;
    this.writer = writer;
    this.context = context;
    this.maxKBytesPerSec = maxKBytesPerSec;
}

/**
 * Backward-compatible constructor preserving the pre-throttling signature;
 * creates an unthrottled reader. Kept so existing callers compiled against
 * the old API continue to work.
 */
public SimpleStagingDataReader(BatchType batchType, long batchId, String targetNodeId, boolean isRetry,
        IStagedResource stagedResource, BufferedWriter writer, DataContext context) {
    this(batchType, batchId, targetNodeId, isRetry, stagedResource, writer, context, BigDecimal.ZERO);
}

public void process() {
Expand All @@ -74,10 +77,17 @@ public void process() {
}
}
} else {
char[] buffer = new char[MAX_WRITE_LENGTH];
long totalCharsRead = 0;
int numCharsRead = 0;
long startTime = System.currentTimeMillis(), ts = startTime;
long totalCharsRead = 0, totalBytesRead = 0;
int numCharsRead = 0, numBytesRead = 0;
long startTime = System.currentTimeMillis(), ts = startTime, bts = startTime;
boolean isThrottled = maxKBytesPerSec != null && maxKBytesPerSec.compareTo(BigDecimal.ZERO) > 0;
long totalThrottleTime = 0;
int bufferSize = MAX_WRITE_LENGTH;

if (isThrottled) {
bufferSize = maxKBytesPerSec.multiply(new BigDecimal(1024)).intValue();
}
char[] buffer = new char[bufferSize];

while ((numCharsRead = reader.read(buffer)) != -1) {
writer.write(buffer, 0, numCharsRead);
Expand All @@ -86,15 +96,35 @@ public void process() {
if (Thread.currentThread().isInterrupted()) {
throw new IoException("This thread was interrupted");
}

if (System.currentTimeMillis() - ts > 60000) {
log.info("Batch '{}', for node '{}', for process 'send from stage' has been processing for {} seconds. " +
"The following stats have been gathered: {}",
new Object[] { batchId, targetNodeId, (System.currentTimeMillis() - startTime) / 1000,
"BYTES=" + totalCharsRead });
"CHARS=" + totalCharsRead });
ts = System.currentTimeMillis();
}
}

if (isThrottled) {
numBytesRead += new String(buffer, 0, numCharsRead).getBytes().length;
totalBytesRead += numBytesRead;
if (numBytesRead >= bufferSize) {
long expectedMillis = (long) (((numBytesRead / 1024f) / maxKBytesPerSec.floatValue()) * 1000);
long actualMillis = System.currentTimeMillis() - bts;
if (actualMillis < expectedMillis) {
totalThrottleTime += expectedMillis - actualMillis;
Thread.sleep(expectedMillis - actualMillis);
}
numBytesRead = 0;
bts = System.currentTimeMillis();
}
}
}
if (log.isDebugEnabled() && totalThrottleTime > 0) {
log.debug("Batch '{}' for node '{}' took {}ms for {} bytes and was throttled for {}ms because limit is set to {} KB/s",
batchId, targetNodeId, (System.currentTimeMillis() - startTime), totalBytesRead, totalThrottleTime,
maxKBytesPerSec);
}
}
} catch (Throwable t) {
throw new RuntimeException(t);
Expand Down

0 comments on commit deea3ba

Please sign in to comment.