Permalink
Browse files

Initial changes for RO bandwidth throttler

  • Loading branch information...
Chinmay Soman
Chinmay Soman committed Nov 30, 2011
1 parent ad05eaf commit bd927ff68693cbaeb8d8f64c6673aa2722355ecc
@@ -45,7 +45,7 @@
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
import voldemort.utils.ByteUtils;
-import voldemort.utils.EventThrottler;
+import voldemort.utils.DynamicEventThrottler;
import voldemort.utils.JmxUtils;
import voldemort.utils.Props;
import voldemort.utils.Time;
@@ -66,7 +66,8 @@
private final int bufferSize;
private static final AtomicInteger copyCount = new AtomicInteger(0);
private AsyncOperationStatus status;
- private EventThrottler throttler = null;
+ private DynamicEventThrottler throttler = null;
+ private long numJobs = 0;
public HdfsFetcher(Props props) {
this(props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
@@ -86,13 +87,17 @@ public HdfsFetcher() {
public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
this.maxBytesPerSecond = maxBytesPerSecond;
if(this.maxBytesPerSecond != null)
- this.throttler = new EventThrottler(this.maxBytesPerSecond);
+ this.throttler = new DynamicEventThrottler(this.maxBytesPerSecond);
this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
this.bufferSize = bufferSize;
this.status = null;
}
public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
+ synchronized(this) {
+ numJobs++;
+ throttler.updateRate(this.maxBytesPerSecond / numJobs);
+ }
Path path = new Path(sourceFileUrl);
Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
@@ -34,6 +34,10 @@ public EventThrottler(long ratesPerSecond) {
this(SystemTime.INSTANCE, ratesPerSecond, DEFAULT_CHECK_INTERVAL_MS);
}
+ public long getRate() {
+ return this.ratesPerSecond;
+ }
+
public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
this.time = time;
this.intervalMs = intervalMs;
@@ -43,22 +47,24 @@ public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
}
public synchronized void maybeThrottle(int eventsSeen) {
+ long rateLimit = getRate();
+ System.err.println("Rate = " + rateLimit);
eventsSeenInLastInterval += eventsSeen;
long now = time.getNanoseconds();
long ellapsedNs = now - startTime;
// if we have completed an interval AND we have seen some events, maybe
// we should take a little nap
if(ellapsedNs > intervalMs * Time.NS_PER_MS && eventsSeenInLastInterval > 0) {
long eventsPerSec = (eventsSeenInLastInterval * Time.NS_PER_SECOND) / ellapsedNs;
- if(eventsPerSec > ratesPerSecond) {
+ if(eventsPerSec > rateLimit) {
// solve for the amount of time to sleep to make us hit the
// correct i/o rate
- double maxEventsPerMs = ratesPerSecond / (double) Time.MS_PER_SECOND;
+ double maxEventsPerMs = rateLimit / (double) Time.MS_PER_SECOND;
long ellapsedMs = ellapsedNs / Time.NS_PER_MS;
long sleepTime = Math.round(eventsSeenInLastInterval / maxEventsPerMs - ellapsedMs);
if(logger.isDebugEnabled())
logger.debug("Natural rate is " + eventsPerSec
- + " events/sec max allowed rate is " + ratesPerSecond
+ + " events/sec max allowed rate is " + rateLimit
+ " events/sec, sleeping for " + sleepTime + " ms to compensate.");
if(sleepTime > 0) {
try {

0 comments on commit bd927ff

Please sign in to comment.