Skip to content

Commit

Permalink
[SPARK-9786][Streaming][Kafka] fix backpressure so it works with defa…
Browse files Browse the repository at this point in the history
…ult maxRatePerPartition setting of 0
  • Loading branch information
koeninger committed Aug 25, 2015
1 parent a0c0aae commit 2664626
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[

val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
.getOrElse(maxRateLimitPerPartition)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition)

if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Expand Down

0 comments on commit 2664626

Please sign in to comment.