diff --git a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala index 5a9fd18c338c3..6ab698d8b61f1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala @@ -132,12 +132,14 @@ class KafkaRDD[ null.asInstanceOf[R] } else { val item = iter.next - if (item.offset > part.untilOffset) { + if (item.offset >= part.untilOffset) { finished = true + null.asInstanceOf[R] + } else { + requestOffset = item.nextOffset + messageHandler(new MessageAndMetadata( + part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder)) } - requestOffset = item.nextOffset - messageHandler(new MessageAndMetadata( - part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder)) } } } diff --git a/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala index 284c9d9dc996d..f8a603c9fa965 100644 --- a/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala @@ -55,13 +55,21 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { val rdd = getRdd(kc, Set(topic)) assert(rdd.isDefined) - assert(rdd.get.countByValue.size === sent.size) + assert(rdd.get.count === sent.values.sum) kc.setConsumerOffsets(kafkaParams("group.id"), rdd.get.untilOffsets) val rdd2 = getRdd(kc, Set(topic)) + val sent2 = Map("d" -> 1) + produceAndSendMessage(topic, sent2) assert(rdd2.isDefined) assert(rdd2.get.count === 0) + + val rdd3 = getRdd(kc, Set(topic)) + produceAndSendMessage(topic, Map("extra" -> 22)) + assert(rdd3.isDefined) + assert(rdd3.get.count === sent2.values.sum) + } private def getRdd(kc: KafkaCluster, topics: Set[String]) = { @@ -73,7 +81,7 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter { until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption } yield { new KafkaRDD[String, String, StringDecoder, StringDecoder, String]( - sc, kc.kafkaParams, from, until, mmd => mmd.message) + sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}") } } }