Skip to content

Commit

Permalink
Revert "[SPARK-22908] Add kafka source and sink for continuous processing."

Browse files Browse the repository at this point in the history

This reverts commit 6f7aaed.
  • Loading branch information
sameeragarwal committed Jan 12, 2018
1 parent 5427739 commit 55dbfbc
Show file tree
Hide file tree
Showing 21 changed files with 383 additions and 1,531 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,10 @@ private[kafka010] class KafkaOffsetReader(
* Resolves the specific offsets based on Kafka seek positions.
* This method resolves offset value -1 to the latest and -2 to the
* earliest Kafka seek position.
*
* @param partitionOffsets the specific offsets to resolve
* @param reportDataLoss callback to either report or log data loss depending on setting
*/
def fetchSpecificOffsets(
partitionOffsets: Map[TopicPartition, Long],
reportDataLoss: String => Unit): KafkaSourceOffset = {
val fetched = runUninterruptibly {
partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
Expand All @@ -149,19 +145,6 @@ private[kafka010] class KafkaOffsetReader(
}
}

partitionOffsets.foreach {
case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
off != KafkaOffsetRangeLimit.EARLIEST =>
if (fetched(tp) != off) {
reportDataLoss(
s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
}
case _ =>
// no real way to check that beginning or end is reasonable
}
KafkaSourceOffset(fetched)
}

/**
* Fetch the earliest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
Expand Down
Loading

0 comments on commit 55dbfbc

Please sign in to comment.