Skip to content
Permalink
Browse files

[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset

## What changes were proposed in this pull request?

In the document of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20486 from cloud-fan/rename.

(cherry picked from commit fe73cb4)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
  • Loading branch information...
cloud-fan authored and gatorsmile committed Feb 3, 2018
1 parent dcd0af4 commit b614c083a4875c874180a93b08ea5031fa90cfec
@@ -71,7 +71,7 @@ class KafkaContinuousReader(
override def readSchema: StructType = KafkaOffsetReader.kafkaSchema

private var offset: Offset = _
override def setOffset(start: ju.Optional[Offset]): Unit = {
override def setStartOffset(start: ju.Optional[Offset]): Unit = {
offset = start.orElse {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
@@ -51,12 +51,12 @@
* start from the first record after the provided offset, or from an implementation-defined
* inferred starting point if no offset is provided.
*/
void setOffset(Optional<Offset> start);
void setStartOffset(Optional<Offset> start);

/**
* Return the specified or inferred start offset for this reader.
*
* @throws IllegalStateException if setOffset has not been called
* @throws IllegalStateException if setStartOffset has not been called
*/
Offset getStartOffset();

@@ -181,7 +181,7 @@ class ContinuousExecution(

val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
new StreamingDataSourceV2Relation(newOutput, reader)
}

@@ -61,7 +61,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)

private var offset: Offset = _

override def setOffset(offset: java.util.Optional[Offset]): Unit = {
override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
}

@@ -160,7 +160,7 @@ class RateSourceV2Suite extends StreamTest {
test("continuous data") {
val reader = new RateStreamContinuousReader(
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
reader.setOffset(Optional.empty())
reader.setStartOffset(Optional.empty())
val tasks = reader.createDataReaderFactories()
assert(tasks.size == 2)

@@ -43,7 +43,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
def readSchema(): StructType = StructType(Seq())
def stop(): Unit = {}
def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
def setOffset(start: Optional[Offset]): Unit = {}
def setStartOffset(start: Optional[Offset]): Unit = {}

def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
throw new IllegalStateException("fake source - cannot actually read")

0 comments on commit b614c08

Please sign in to comment.
You can’t perform that action at this time.