Skip to content

Commit

Permalink
keep OffsetRange creation method for backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin committed Aug 2, 2018
1 parent 70ecd38 commit 29c5406
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,8 @@ private[spark] class DirectKafkaInputDStream[K, V](
}
} else {
offsets.map { case (tp, untilOffset) =>
val size = untilOffset - currentOffsets(tp)
val offsetFrom = currentOffsets(tp)
OffsetRange(tp.topic(), tp.partition, offsetFrom, untilOffset, size)
OffsetRange(tp.topic(), tp.partition, offsetFrom, untilOffset)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,25 +132,43 @@ object OffsetRange {
recordNumber: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber)

def create(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
recordNumber: Long): OffsetRange =
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset, untilOffset - fromOffset)

def create(topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
recordNumber: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset,
recordNumber)

def create(topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset,
recordNumber)
untilOffset - fromOffset)

def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long,
recordNumber: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset, recordNumber)

def apply(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
recordNumber: Long): OffsetRange =
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
recordNumber: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset,
recordNumber)

def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset, untilOffset - fromOffset)

def apply(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long): OffsetRange =
new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset,
recordNumber)
untilOffset - fromOffset)

/** this is to avoid ClassNotFoundException during checkpoint restore */
private[kafka010]
Expand Down

0 comments on commit 29c5406

Please sign in to comment.