From 59cef98868586a4f039b04e74c32c94eaff965c0 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 28 Feb 2018 15:29:57 +0800
Subject: [PATCH 1/3] [SPARK-23533][SS] Add support for changing ContinuousDataReader's startOffset

---
 .../sql/kafka010/KafkaContinuousReader.scala       | 10 ++++-
 .../reader/ContinuousDataReaderFactory.java        | 38 +++++++++++++++++++
 .../continuous/ContinuousRateStreamSource.scala    | 14 ++++++-
 3 files changed, 60 insertions(+), 2 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index ecd1170321f3f..646406ec36bf7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -164,7 +164,15 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
+    assert(kafkaOffset.topicPartition == topicPartition)
+    new KafkaContinuousDataReader(
+      topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+  }
+
   override def createDataReader(): KafkaContinuousDataReader = {
     new KafkaContinuousDataReader(
       topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
new file mode 100644
index 0000000000000..1dacef42e9e3a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
+
+/**
+ * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
+ * implement this interface to support creating {@link DataReader} with a particular offset.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+  /**
+   * Create a DataReader with a particular offset as its startOffset.
+   *
+   * @param offset the offset to set as the DataReader's startOffset.
+   */
+  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
+    throw new IllegalStateException(
+      "createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index b63d8d3e20650..2cde825d3ef04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -106,7 +106,19 @@ case class RateStreamContinuousDataReaderFactory(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends DataReaderFactory[Row] {
+  extends ContinuousDataReaderFactory[Row] {
+
+  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
+    val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
+    assert(rateStreamOffset.partition == partitionIndex)
+    new RateStreamContinuousDataReader(
+      rateStreamOffset.currentValue,
+      rateStreamOffset.currentTimeMs,
+      partitionIndex,
+      increment,
+      rowsPerSecond)
+  }
+
   override def createDataReader(): DataReader[Row] = new RateStreamContinuousDataReader(
     startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)

From 4bf17a738de1b705ee673b8e889394ccbe972f47 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 1 Mar 2018 10:51:07 +0800
Subject: [PATCH 2/3] fix typo

---
 .../sql/sources/v2/reader/ContinuousDataReaderFactory.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
index 1dacef42e9e3a..e79002a2199b9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
@@ -33,6 +33,6 @@ public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
    */
   default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
     throw new IllegalStateException(
-      "createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
+      "createDataReaderWithOffset not supported by default within ContinuousDataReaderFactory");
   }
 }

From 992e2c1de84b9e82875f47ecc21aad2a299038a7 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 15 Mar 2018 11:17:34 +0800
Subject: [PATCH 3/3] address comments

---
 .../apache/spark/sql/kafka010/KafkaContinuousReader.scala   | 3 ++-
 .../sql/sources/v2/reader/ContinuousDataReaderFactory.java  | 5 +----
 .../streaming/continuous/ContinuousRateStreamSource.scala   | 3 ++-
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 646406ec36bf7..6e56b0a72d671 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -168,7 +168,8 @@ case class KafkaContinuousDataReaderFactory(

   override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
-    assert(kafkaOffset.topicPartition == topicPartition)
+    require(kafkaOffset.topicPartition == topicPartition,
+      s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
     new KafkaContinuousDataReader(
       topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
index e79002a2199b9..a61697649c43e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java
@@ -31,8 +31,5 @@ public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
    *
    * @param offset the offset to set as the DataReader's startOffset.
    */
-  default DataReader<T> createDataReaderWithOffset(PartitionOffset offset) {
-    throw new IllegalStateException(
-      "createDataReaderWithOffset not supported by default within ContinuousDataReaderFactory");
-  }
+  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
index 2cde825d3ef04..20d90069163a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
@@ -110,7 +110,8 @@ case class RateStreamContinuousDataReaderFactory(

   override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
     val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
-    assert(rateStreamOffset.partition == partitionIndex)
+    require(rateStreamOffset.partition == partitionIndex,
+      s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}")
     new RateStreamContinuousDataReader(
       rateStreamOffset.currentValue,
       rateStreamOffset.currentTimeMs,
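
For reviewers: below is a minimal sketch of how a third-party continuous source
could adopt the new mix-in, mirroring the Kafka and rate-stream changes above.
CounterPartitionOffset, CounterContinuousDataReaderFactory and
CounterContinuousDataReader are hypothetical names introduced only for
illustration; the sketch assumes the existing DataSourceV2 streaming API
(DataReader, ContinuousDataReader, PartitionOffset) as of this patch series.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{ContinuousDataReaderFactory, DataReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

// Hypothetical per-partition offset for a simple counter source.
case class CounterPartitionOffset(partition: Int, currentValue: Long) extends PartitionOffset

case class CounterContinuousDataReaderFactory(
    partitionIndex: Int,
    startValue: Long)
  extends ContinuousDataReaderFactory[Row] {

  // Resume from a checkpointed offset instead of the planned startValue.
  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
    val counterOffset = offset.asInstanceOf[CounterPartitionOffset]
    require(counterOffset.partition == partitionIndex,
      s"Expected partition: $partitionIndex, but got: ${counterOffset.partition}")
    new CounterContinuousDataReader(partitionIndex, counterOffset.currentValue)
  }

  // First launch: start from the offset the factory was planned with.
  override def createDataReader(): DataReader[Row] =
    new CounterContinuousDataReader(partitionIndex, startValue)
}

class CounterContinuousDataReader(partition: Int, startValue: Long)
  extends ContinuousDataReader[Row] {
  private var current = startValue

  override def next(): Boolean = { current += 1; true }  // unbounded stream
  override def get(): Row = Row(partition, current)
  override def getOffset(): PartitionOffset = CounterPartitionOffset(partition, current)
  override def close(): Unit = {}
}

Whichever path creates the reader, it must emit the same sequence from the
given starting point, so a reader recreated via createDataReaderWithOffset
picks up exactly where the checkpointed offset says the previous one stopped.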