address comments
cloud-fan committed Aug 6, 2019 · 1 parent d2832ce · commit a7d0c55
Showing 4 changed files with 9 additions and 9 deletions.
@@ -39,12 +39,12 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
}

test("micro-batch mode - options should be handled as case-insensitive") {
- def verifyFieldsInMicroBatchStream(
+ def verifyFieldsInMicroBatchScan(
options: CaseInsensitiveStringMap,
expectedPollTimeoutMs: Long,
expectedMaxOffsetsPerTrigger: Option[Long]): Unit = {
- // KafkaMicroBatchStream reads Spark conf from SparkEnv for default value
- // hence we set mock SparkEnv here before creating KafkaMicroBatchStream
+ // KafkaMicroBatchScan reads Spark conf from SparkEnv for default value
+ // hence we set mock SparkEnv here before creating KafkaMicroBatchScan
val sparkEnv = mock(classOf[SparkEnv])
when(sparkEnv.conf).thenReturn(new SparkConf())
SparkEnv.set(sparkEnv)
@@ -61,11 +61,11 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
buildCaseInsensitiveStringMapForUpperAndLowerKey(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString,
KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER -> expectedValue.toString)
- .foreach(verifyFieldsInMicroBatchStream(_, expectedValue, Some(expectedValue)))
+ .foreach(verifyFieldsInMicroBatchScan(_, expectedValue, Some(expectedValue)))
}
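
[Editor's note] Both tests above hinge on CaseInsensitiveStringMap resolving option keys regardless of casing. A minimal standalone sketch of the behavior under test; the key string is illustrative, the suite itself uses the KafkaSourceProvider constants:

import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Lookups succeed whatever the casing of the stored or queried key.
val options = new CaseInsensitiveStringMap(
  Map("kafkaConsumer.pollTimeoutMs" -> "1000").asJava)
assert(options.get("KAFKACONSUMER.POLLTIMEOUTMS") == "1000")
assert(options.get("kafkaconsumer.polltimeoutms") == "1000")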

test("SPARK-28142 - continuous mode - options should be handled as case-insensitive") {
- def verifyFieldsInContinuousStream(
+ def verifyFieldsInContinuousScan(
options: CaseInsensitiveStringMap,
expectedPollTimeoutMs: Long): Unit = {
val builder = getKafkaDataSourceScanBuilder(options)
@@ -77,7 +77,7 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
val expectedValue = 1000
buildCaseInsensitiveStringMapForUpperAndLowerKey(
KafkaSourceProvider.CONSUMER_POLL_TIMEOUT -> expectedValue.toString)
- .foreach(verifyFieldsInContinuousStream(_, expectedValue))
+ .foreach(verifyFieldsInContinuousScan(_, expectedValue))
}

private def buildCaseInsensitiveStringMapForUpperAndLowerKey(
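
[Editor's note] The body of buildCaseInsensitiveStringMapForUpperAndLowerKey is collapsed in this diff. A plausible reconstruction, an assumption rather than the committed code: it returns the same options twice, once with upper-cased and once with lower-cased keys, so each test runs against both spellings.

import java.util.Locale
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical reconstruction of the collapsed helper body.
private def buildCaseInsensitiveStringMapForUpperAndLowerKey(
    options: (String, String)*): Seq[CaseInsensitiveStringMap] = {
  Seq(options.map { case (k, v) => (k.toUpperCase(Locale.ROOT), v) },
      options.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) })
    .map(opts => new CaseInsensitiveStringMap(opts.toMap.asJava))
}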
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.StructType
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] to see if any more data is available.
*
- * Note that, we extends `SparkDataStream` here, to make the v1 streaming source API be compatible
+ * Note that, we extends [[StreamingScan]] here, to make the v1 streaming source API be compatible
* with data source v2.
*/
trait Source extends StreamingScan {
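
[Editor's note] The practical consequence of the doc change: engine code written against the v2 interface can accept a v1 Source unchanged. An illustrative sketch, under the assumption (not shown in this diff) that StreamingScan declares the shared stop() lifecycle method:

// Assumes StreamingScan declares stop(); a v1 Source now satisfies it,
// so v2-oriented code paths can handle v1 and v2 streams uniformly.
def shutDownStreams(streams: Seq[StreamingScan]): Unit = {
  streams.foreach(_.stop())
}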
@@ -43,7 +43,7 @@ import org.apache.spark.util.RpcUtils

/**
* A [[ContinuousScan]] that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This ContinuousStream will *not* work in production applications due to
+ * and debugging. This [[ContinuousScan]] will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*
* The driver maintains a socket connection to the host-port, keeps the received messages in
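
[Editor's note] A usage sketch for the continuous socket source, not part of this diff. Assumes a SparkSession named spark and a line server such as `nc -lk 9999` on localhost:

import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))  // continuous processing mode
  .start()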
@@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String

/**
* A [[MicroBatchScan]] that reads text lines through a TCP socket, designed only for tutorials
- * and debugging. This MicroBatchReadSupport will *not* work in production applications due to
+ * and debugging. This [[MicroBatchScan]] will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*/
class TextSocketMicroBatchScan(
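
[Editor's note] The micro-batch counterpart differs only in the trigger; the same assumptions as the continuous sketch above apply:

import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))  // micro-batch mode
  .start()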
