
Commit

address @zsxwing's comments
uncleGen committed Mar 11, 2017
1 parent a37d11d commit 50ef0e1
Showing 2 changed files with 69 additions and 57 deletions.
KafkaSourceProvider.scala
@@ -82,13 +82,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         .map { k => k.drop(6).toString -> parameters(k) }
         .toMap

-    val startingStreamOffsets =
-      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
-        case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
-        case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => LatestOffsetRangeLimit
-      }
+    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

     val kafkaOffsetReader = new KafkaOffsetReader(
       strategy(caseInsensitiveParams),
@@ -128,18 +123,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         .map { k => k.drop(6).toString -> parameters(k) }
         .toMap

-    val startingRelationOffsets =
-      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
-        case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => EarliestOffsetRangeLimit
+    val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
+        case earliest @ EarliestOffsetRangeLimit => earliest
+        case specific @ SpecificOffsetRangeLimit(_) => specific
+        case _ => EarliestOffsetRangeLimit
       }

-    val endingRelationOffsets =
-      caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim) match {
-        case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
-        case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        case None => LatestOffsetRangeLimit
+    val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
+        case latest @ LatestOffsetRangeLimit => latest
+        case specific @ SpecificOffsetRangeLimit(_) => specific
+        case _ => LatestOffsetRangeLimit
       }

     val kafkaOffsetReader = new KafkaOffsetReader(
@@ -388,34 +383,34 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

   private def validateBatchOptions(caseInsensitiveParams: Map[String, String]) = {
     // Batch specific options
-    caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
-      case Some(offset) if offset.toLowerCase == "earliest" => // good to go
-      case Some(offset) if offset.toLowerCase == "latest" =>
+    KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
+      case EarliestOffsetRangeLimit => // good to go
+      case LatestOffsetRangeLimit =>
         throw new IllegalArgumentException("starting offset can't be latest " +
           "for batch queries on Kafka")
-      case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        .partitionOffsets.foreach {
+      case specific: SpecificOffsetRangeLimit =>
+        specific.partitionOffsets.foreach {
           case (tp, off) if off == KafkaOffsetRangeLimit.LATEST =>
             throw new IllegalArgumentException(s"startingOffsets for $tp can't " +
               "be latest for batch queries on Kafka")
           case _ => // ignore
         }
-      case _ => // default to earliest
     }

-    caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim) match {
-      case Some(offset) if offset.toLowerCase == "earliest" =>
+    KafkaSourceProvider.getKafkaOffsetRangeLimit(
+      caseInsensitiveParams, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
+      case EarliestOffsetRangeLimit =>
         throw new IllegalArgumentException("ending offset can't be earliest " +
           "for batch queries on Kafka")
-      case Some(offset) if offset.toLowerCase == "latest" => // good to go
-      case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
-        .partitionOffsets.foreach {
+      case LatestOffsetRangeLimit => // good to go
+      case specific: SpecificOffsetRangeLimit =>
+        specific.partitionOffsets.foreach {
           case (tp, off) if off == KafkaOffsetRangeLimit.EARLIEST =>
             throw new IllegalArgumentException(s"ending offset for $tp can't be " +
               "earliest for batch queries on Kafka")
           case _ => // ignore
         }
-      case _ => // default to latest
     }

     validateGeneralOptions(caseInsensitiveParams)
@@ -450,10 +445,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

 private[kafka010] object KafkaSourceProvider {
   private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
-  private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
-  private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
+  private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
+  private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   val TOPIC_OPTION_KEY = "topic"

   private val deserClassName = classOf[ByteArrayDeserializer].getName
+
+  def getKafkaOffsetRangeLimit(
+      params: Map[String, String],
+      offsetOptionKey: String,
+      defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit = {
+    params.get(offsetOptionKey).map(_.trim) match {
+      case Some(offset) if offset.toLowerCase == "latest" => LatestOffsetRangeLimit
+      case Some(offset) if offset.toLowerCase == "earliest" => EarliestOffsetRangeLimit
+      case Some(json) => SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
+      case None => defaultOffsets
+    }
+  }
 }
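For orientation, here is a minimal usage sketch of the extracted helper, mirroring the unit tests added to the suite below. It assumes the calling code sits in the org.apache.spark.sql.kafka010 package (as the test suite does), since the option-key constants and offset-range-limit types are private[kafka010].

import org.apache.spark.sql.kafka010.KafkaSourceProvider._

// "latest" and "earliest" are matched case-insensitively after trimming; any other
// value is treated as JSON and parsed by JsonUtils.partitionOffsets; the supplied
// default is returned only when the option is absent.
val starting = getKafkaOffsetRangeLimit(
  Map(STARTING_OFFSETS_OPTION_KEY -> "earliest"),
  STARTING_OFFSETS_OPTION_KEY,
  LatestOffsetRangeLimit)
// starting == EarliestOffsetRangeLimit

val ending = getKafkaOffsetRangeLimit(
  Map.empty,
  ENDING_OFFSETS_OPTION_KEY,
  LatestOffsetRangeLimit)
// ending == LatestOffsetRangeLimit (the default, because the option is absent)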
KafkaSourceSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql.ForeachWriter
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
@@ -364,16 +365,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
         "failOnDataLoss" -> failOnDataLoss.toString)
     }

-    test(s"assign from specific offsets (topic with uppercase characters) " +
-      s"(failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic().toUpperCase
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4),
-        "failOnDataLoss" -> failOnDataLoss.toString)
-    }
-
     test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
       val topic = newTopic()
       testFromLatestOffsets(
@@ -397,12 +388,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
       testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
     }

-    test(s"subscribing topic (with uppercase characters) by name from specific offsets " +
-      s"(failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic().toUpperCase
-      testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
-    }
-
     test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
       val topicPrefix = newTopic()
       val topic = topicPrefix + "-suffix"
@@ -431,16 +416,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
         failOnDataLoss = failOnDataLoss,
         "subscribePattern" -> s"$topicPrefix-.*")
     }
-
-    test(s"subscribing topic (with uppercase characters) by pattern from specific offsets " +
-      s"(failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic().toUpperCase
-      val topic = topicPrefix + "-suffix"
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
   }

   test("subscribing topic by pattern with topic deletions") {
@@ -632,6 +607,36 @@ class KafkaSourceSuite extends KafkaSourceTest {
     assert(query.exception.isEmpty)
   }

+  for((optionKey, optionValue, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
+      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
+        SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
+    test(s"test offsets containing uppercase characters (${answer.getClass.getSimpleName})") {
+      val offset = getKafkaOffsetRangeLimit(
+        Map(optionKey -> optionValue),
+        optionKey,
+        answer
+      )
+
+      assert(offset == answer)
+    }
+  }
+
+  for((optionKey, answer) <- Seq(
+      (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
+      (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
+    test(s"test offsets with default (${answer.getClass.getSimpleName})") {
+      val offset = getKafkaOffsetRangeLimit(
+        Map.empty,
+        optionKey,
+        answer
+      )
+
+      assert(offset == answer)
+    }
+  }
+
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

   private def assignString(topic: String, partitions: Iterable[Int]): String = {
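For context on where these options enter from the user side, here is a short end-user sketch; the broker address and topic name are placeholders. In a batch DataFrame query, the startingOffsets/endingOffsets values are what getKafkaOffsetRangeLimit resolves, and validateBatchOptions rejects "latest" as a start and "earliest" as an end.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offset-options-example").getOrCreate()

// Batch read over an explicit offset range. Values other than "earliest"/"latest"
// are parsed as per-partition JSON, as exercised by the new tests above.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")        // placeholder broker
  .option("subscribe", "topic-A")                          // placeholder topic; case is preserved
  .option("startingOffsets", """{"topic-A":{"0":23}}""")   // partition 0 starts at offset 23
  .option("endingOffsets", "latest")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()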

0 comments on commit 50ef0e1
