Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding BatchSizeManager to Castle #3

Merged
merged 6 commits into from
May 5, 2017
Merged

Conversation

shubhrojroy
Copy link
Contributor

Adding BatchSizeManager to Castle to control bytes read during fetch.

@@ -75,4 +80,7 @@ object CommitterConfig {
val DefaultParallelismFactorByTopic = Map.empty[String, Int]
val DefaultCorruptMessagePolicy = CorruptMessagePolicy.skip
val DefaultUseKafkaMetadataManager = true
val DefaultUseBatchSizeManager = false
val DefaultSamplingSlots = 20
val DefaultSamplingInterval = 60000 // milliseconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be of type Duration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

/**
* Queue with a fixed upper bound on capacity
*/
class BoundedQueue[T](capacity: Int) extends mutable.Queue[T] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class needs its own unit tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


object BatchSizeManager {
val DiscountFactor = 0.9
val FullBufferMaxCount = 3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DiscountFactor & FullBufferMaxCount should be passed in via config, so you can remove this companion object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// Update number of times we have seen consecutive full buffer reads
consecutiveFullBuffers += 1
}
else if (consecutiveFullBuffers > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for else if check here, just else { consecutiveFullBuffers = 0 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

}

if (consecutiveFullBuffers > FullBufferMaxCount) {
// We are mostly in catchup mode
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// We are most likely in catchup mode

@@ -118,7 +123,11 @@ trait CommittingBatch extends CommitterActorBase
// We commit the consumer offset here without waiting on the result, we are assuming it will succeed most of the time
commitConsumerOffset(batch.nextOffset, metadata)

becomeFetchingData(OffsetAndMetadata(batch.nextOffset, metadata))
// If BatchSizeManager is enabled then goto Idling state with a specific delay
if(committerConfig.useBatchSizeManager)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to Option[BatchSizeManager] this becomes a match case statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -153,6 +162,10 @@ trait CommittingBatch extends CommitterActorBase
// We fetch the latest offset in the topic here so we can compute the offset lag
sendRequestToRouter(FetchOffset(LatestOffset, topicAndPartition))

// Track bytes read from Kafka if BatchSizeManager is enabled
if(committerConfig.useBatchSizeManager)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to Option[BatchSizeManager] you can make this into a batchSizeManager.foreach(_.track(batch.sizeInBytes, System.currentTimeMillis())) call instead of an if statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

neat

batchSizeManager.track(bytesRead, ts2)

// Validate
val expectedResult = 100 * 0.9 / (20 / (ts2 - ts1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with 3 samples of 10 bytes read, means you have read 300 bytes in 300ms, your rate here is 1 byte / ms, if your buffer size is 1200, then expected should be (1200 / 1) * 0.9

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I get what you mean here.

// Setup
val samplingSlots = 3
val samplingInterval = 100 // milliseconds
val bufferSize = 100 //bytes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's very confusing in a test to make buffer size equal to sampling interval, can you change it to any other number that looks different like 1200

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// Execute
val ts1 = System.currentTimeMillis()
batchSizeManager.track(bytesRead, ts1)
Thread.sleep(samplingInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests will be super flaky, you cannot Thread.sleep() here. mock the call to System.currentTimeMillis(), or... my personal favorite is just create your own method for getting time in your BatchSizeManager and just mock that method with the value you want it to return

consecutiveFullBuffers = 0
}

if (samples.isEmpty || timestamp - samples.last.timestamp > samplingInterval) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to fill the samples as quickly as possible before we start dropping them due to the sampling interval that is desired, so this should be: if (!(samples.isFull) || (timestamp - samples.last.timestamp) >= samplingInterval)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@shubhrojroy
Copy link
Contributor Author

Made changes to address comments

Copy link
Contributor

@denisgrenader denisgrenader left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more minor comments, but then I think we are ready to merge once dev testing looks good.

if (samples.isEmpty)
samples.enqueue(ReadSample(bytesRead, timestamp))
else
samples.enqueue(ReadSample(samples.last.bytesRead + bytesRead, timestamp))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can replace this if statement with:
val sample = ReadSample(samples.lastOption.getOrElse(zeroByteSample).bytesRead + bytesRead, timestamp)
samples.enqueue(sample)

in the constructor declare: val zeroByteSample = ReadSample(0, 0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

val discount = Math.pow(committerConfig.discountFactor, consecutiveFullBuffers + 1)
val computedDelay = Math.min(committerConfig.maxWaitTime.getMillis, (delay * discount).toLong)

log.info(s"$committerActorId: Idling for ${computedDelay / 1000} seconds after calculating a rate of ${"%.3f".format(rate)} KiB/sec with discountFactor of ${"%.3f".format(discount)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rate is a bit wonky since the original is actually bytes/millisecond which is base 10 not base 2. Let's show it as MiB:
log.info(s"$committerActorId delaying next fetch by ${computedDelay / 1000} seconds after calculating a rate of ${"%.3f".format(rate / 1048.576)} MiB/sec with discountFactor of ${"%.3f".format(discount)}")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if(delay.getMillis > 0)
becomeIdling(OffsetAndMetadata(batch.nextOffset, metadata), delay)
else
becomeFetchingData(OffsetAndMetadata(batch.nextOffset, metadata))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a log line here
log.info("$committerActorId fetching immediately because BatchSizeManager returned a delay of 0")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -75,4 +85,10 @@ object CommitterConfig {
val DefaultParallelismFactorByTopic = Map.empty[String, Int]
val DefaultCorruptMessagePolicy = CorruptMessagePolicy.skip
val DefaultUseKafkaMetadataManager = true
val DefaultUseBatchSizeManager = false
val DefaultSamplingSlots = 20
val DefaultSamplingInterval = new Duration(60000) // 1 second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment here should be // 60 seconds

batchSizeManagerOption match {
case Some(batchSizeManager) =>
val delay = batchSizeManager.getDelay(committerActorId)
if(delay.getMillis > 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does joda time have a typed 0 constant? Can this be changed to something like
if (delay > Duration.ZERO) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes there is but it does not implement comparable I think so '>' doesn't work.

@shubhrojroy
Copy link
Contributor Author

Added BatchSizeManager component to Castle core to control size of batches read from Kafka for a specific committer. Add parameters to control this to Committer config as well.

@shubhrojroy shubhrojroy merged commit a999b43 into master May 5, 2017
@shubhrojroy shubhrojroy deleted the BatchSizeManager branch May 5, 2017 20:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants