
[SPARK-18580] finish merge
akonopko committed Mar 21, 2018
1 parent eaac27c commit d11e807
Showing 2 changed files with 99 additions and 0 deletions.
@@ -664,6 +664,54 @@ class DirectKafkaStreamSuite
    kafkaStream.stop()
  }

test("maxMessagesPerPartition with zero offset and rate equal to one") {
val topic = "backpressure"
val kafkaParams = getKafkaParams()
val batchIntervalMilliseconds = 60000
val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
// Using 1 core is useful to make the test more predictable.
.setMaster("local[1]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "100")

// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
val estimateRate = 1L
val fromOffsets = Map(
new TopicPartition(topic, 0) -> 0L,
new TopicPartition(topic, 1) -> 0L,
new TopicPartition(topic, 2) -> 0L,
new TopicPartition(topic, 3) -> 0L
)
val kafkaStream = withClue("Error creating direct stream") {
new DirectKafkaInputDStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
new DefaultPerPartitionConfig(sparkConf)
) {
currentOffsets = fromOffsets
override val rateController = Some(new ConstantRateController(id, null, estimateRate))
}
}

val offsets = Map[TopicPartition, Long](
new TopicPartition(topic, 0) -> 0,
new TopicPartition(topic, 1) -> 100L,
new TopicPartition(topic, 2) -> 200L,
new TopicPartition(topic, 3) -> 300L
)
val result = kafkaStream.maxMessagesPerPartition(offsets)
val expected = Map(
new TopicPartition(topic, 0) -> 1L,
new TopicPartition(topic, 1) -> 10L,
new TopicPartition(topic, 2) -> 20L,
new TopicPartition(topic, 3) -> 30L
)
assert(result.contains(expected), s"Number of messages per partition must be at least 1")
}
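
For reference, the expected values above follow from lag-proportional back-pressure: the controller's rate (1 msg/s) is split across partitions in proportion to their lag (0, 100, 200, 300 out of 600 total) and multiplied by the 60 s batch, with a floor of one message so the zero-lag partition is never starved. A minimal sketch of that arithmetic, with simplified names and omitting the maxRatePerPartition clamp (which never binds at rate 1); this is an illustration, not the Spark source itself:

object RateLimitSketch {
  // currentOffsets/latestOffsets: partition -> offset; rate: estimated total msgs/sec
  def maxMessages(
      currentOffsets: Map[Int, Long],
      latestOffsets: Map[Int, Long],
      rate: Double,
      batchMillis: Long): Map[Int, Long] = {
    val lag = latestOffsets.map { case (p, o) => p -> math.max(o - currentOffsets(p), 0L) }
    val totalLag = lag.values.sum.toDouble
    val secsPerBatch = batchMillis / 1000.0
    lag.map { case (p, l) =>
      // Each partition gets a share of the total rate proportional to its lag.
      val share = if (totalLag > 0) l / totalLag * rate else 0.0
      // SPARK-18580: clamp to at least one message per partition.
      p -> math.max((share * secsPerBatch).toLong, 1L)
    }
  }

  def main(args: Array[String]): Unit = {
    val current = Map(0 -> 0L, 1 -> 0L, 2 -> 0L, 3 -> 0L)
    val latest = Map(0 -> 0L, 1 -> 100L, 2 -> 200L, 3 -> 300L)
    // Prints Map(0 -> 1, 1 -> 10, 2 -> 20, 3 -> 30), matching `expected` above.
    println(maxMessages(current, latest, rate = 1.0, batchMillis = 60000L))
  }
}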

  /** Get the generated offset ranges from the DirectKafkaStream */
  private def getOffsetRanges[K, V](
      kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
@@ -509,6 +509,57 @@ class DirectKafkaStreamSuite
    kafkaStream.stop()
  }

test("maxMessagesPerPartition with zero offset and rate equal to one") {
val topic = "backpressure"
val kafkaParams = Map(
"metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
)

val batchIntervalMilliseconds = 60000
val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
// Using 1 core is useful to make the test more predictable.
.setMaster("local[1]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.kafka.maxRatePerPartition", "100")

// Setup the streaming context
ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
val estimatedRate = 1L
val kafkaStream = withClue("Error creating direct stream") {
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val fromOffsets = Map(
TopicAndPartition(topic, 0) -> 0L,
TopicAndPartition(topic, 1) -> 0L,
TopicAndPartition(topic, 2) -> 0L,
TopicAndPartition(topic, 3) -> 0L
)
new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffsets, messageHandler) {
override protected[streaming] val rateController =
Some(new DirectKafkaRateController(id, null) {
override def getLatestRate() = estimatedRate
})
}
}

val offsets = Map(
TopicAndPartition(topic, 0) -> 0L,
TopicAndPartition(topic, 1) -> 100L,
TopicAndPartition(topic, 2) -> 200L,
TopicAndPartition(topic, 3) -> 300L
)
val result = kafkaStream.maxMessagesPerPartition(offsets)
val expected = Map(
TopicAndPartition(topic, 0) -> 1L,
TopicAndPartition(topic, 1) -> 10L,
TopicAndPartition(topic, 2) -> 20L,
TopicAndPartition(topic, 3) -> 30L
)
assert(result.contains(expected), s"Number of messages per partition must be at least 1")
}
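
The kafka-0-8 variant exercises the same arithmetic through the older TopicAndPartition API. The fix both tests guard reduces, at its core, to a one-message floor when a per-partition rate is converted into a batch cap. A hedged sketch of just that final step, with illustrative names rather than the Spark source:

object MessageFloorSketch {
  // Convert an effective per-partition rate (msgs/sec) into a per-batch cap,
  // clamped so even a zero-rate (zero-lag) partition still advances by one message.
  def messagesForBatch(effectiveRate: Double, batchMillis: Long): Long =
    math.max((effectiveRate * batchMillis / 1000.0).toLong, 1L)

  def main(args: Array[String]): Unit = {
    assert(messagesForBatch(0.0, 60000L) == 1L)   // the floor: partition 0 above
    assert(messagesForBatch(0.5, 60000L) == 30L)  // 300/600 of rate 1 over 60 s
  }
}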

  /** Get the generated offset ranges from the DirectKafkaStream */
  private def getOffsetRanges[K, V](
      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
