diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala index 3c5d63876d4d3..4017fdbcaf95e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala @@ -34,7 +34,7 @@ abstract class PerPartitionConfig extends Serializable { * from each Kafka partition. */ def maxRatePerPartition(topicPartition: TopicPartition): Long - def minRatePerPartition(topicPartition: TopicPartition): Long + def minRatePerPartition(topicPartition: TopicPartition): Long = 1 } /** @@ -46,5 +46,5 @@ private class DefaultPerPartitionConfig(conf: SparkConf) val minRate = conf.getLong("spark.streaming.kafka.minRatePerPartition", 1) def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate - def minRatePerPartition(topicPartition: TopicPartition): Long = minRate + override def minRatePerPartition(topicPartition: TopicPartition): Long = minRate } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 7ed4ed3ea80ab..661b67a8ab68a 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -543,7 +543,6 @@ class DirectKafkaStreamSuite } else { 100 } - def minRatePerPartition(tp: TopicPartition) = 1 }) val kafkaStream = getDirectKafkaStream(topic, rateController, ppc)