[SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.
## What changes were proposed in this pull request?

Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeDataReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`

This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is, and it was not clear why a set of them is produced
for a read.
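
As a sketch of how the renamed interfaces fit together (the `Range*` names below are hypothetical and used only for illustration):

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical batch reader that serves the integers in [start, end).
class RangeReader(start: Int, end: Int, numSplits: Int) extends DataSourceReader {

  override def readSchema(): StructType = StructType(StructField("value", IntegerType) :: Nil)

  // One InputPartition per output RDD partition. Each partition is serialized to an
  // executor, which calls createPartitionReader() to obtain the reader that does the scan.
  override def planInputPartitions(): JList[InputPartition[Row]] = {
    val step = math.max(1, (end - start) / numSplits)
    val partitions: Seq[InputPartition[Row]] =
      (start until end by step).map { lo =>
        new RangeInputPartition(lo, math.min(lo + step, end))
      }
    partitions.asJava
  }
}

// Iterable-like side: a specific partition of the data, with an explicit create operation.
class RangeInputPartition(lo: Int, hi: Int) extends InputPartition[Row] {
  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeInputPartitionReader(lo, hi)
}

// Iterator-like side: does the actual reading and is closed when the task finishes.
class RangeInputPartitionReader(lo: Int, hi: Int) extends InputPartitionReader[Row] {
  private var current = lo - 1

  override def next(): Boolean = { current += 1; current < hi }
  override def get(): Row = Row(current)
  override def close(): Unit = ()
}
```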

## How was this patch tested?

Existing tests, which have been updated to use the new name.

Author: Ryan Blue <blue@apache.org>

Closes #21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
rdblue authored and gatorsmile committed May 10, 2018
1 parent 9341c95 commit 62d0139
Showing 39 changed files with 272 additions and 263 deletions.
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType

/**
@@ -86,7 +86,7 @@ class KafkaContinuousReader(
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}

override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
import scala.collection.JavaConverters._

val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -108,7 +108,7 @@
case (topicPartition, start) =>
KafkaContinuousDataReaderFactory(
topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
.asInstanceOf[DataReaderFactory[UnsafeRow]]
.asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
}

@@ -161,18 +161,18 @@ case class KafkaContinuousDataReaderFactory(
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {

override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
require(kafkaOffset.topicPartition == topicPartition,
s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
new KafkaContinuousDataReader(
new KafkaContinuousInputPartitionReader(
topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
}

override def createDataReader(): KafkaContinuousDataReader = {
new KafkaContinuousDataReader(
override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
new KafkaContinuousInputPartitionReader(
topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
}
}
@@ -187,12 +187,12 @@ case class KafkaContinuousDataReaderFactory(
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
class KafkaContinuousDataReader(
class KafkaContinuousInputPartitionReader(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val converter = new KafkaRecordToUnsafeRowConverter

@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.UninterruptibleThread
@@ -101,7 +101,7 @@ private[kafka010] class KafkaMicroBatchReader(
}
}

override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
new KafkaMicroBatchDataReaderFactory(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
}

override def getStartOffset: Offset = {
@@ -299,27 +299,28 @@ private[kafka010] class KafkaMicroBatchReader(
}
}

/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchDataReaderFactory(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {

override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray

override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
failOnDataLoss, reuseKafkaConsumer)
}

/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchDataReader(
/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchInputPartitionReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {

private val consumer = KafkaDataConsumer.acquire(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -149,7 +150,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

/**
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
* Creates a [[ContinuousInputPartitionReader]] to read
* Kafka data in a continuous streaming query.
*/
override def createContinuousReader(
@@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
)
val factories = reader.createUnsafeRowReaderFactories().asScala
val factories = reader.planUnsafeInputPartitions().asScala
.map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
assert(factories.size == numPartitionsGenerated)
@@ -34,7 +34,7 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
* streaming query.
*
* The execution engine will create a micro-batch reader at the start of a streaming query,
* alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
* alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
* then call stop() when the execution is complete. Note that a single query may have multiple
* executions due to restart or failure recovery.
*
@@ -21,15 +21,15 @@
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

/**
* A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
* implement this interface to provide creating {@link DataReader} with particular offset.
* A mix-in interface for {@link InputPartition}. Continuous input partitions can
* implement this interface to provide creating {@link InputPartitionReader} with particular offset.
*/
@InterfaceStability.Evolving
public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
public interface ContinuousInputPartition<T> extends InputPartition<T> {
/**
* Create a DataReader with particular offset as its startOffset.
*
* @param offset offset want to set as the DataReader's startOffset.
*/
DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
}
@@ -31,8 +31,8 @@
* {@link ReadSupport#createReader(DataSourceOptions)} or
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
* logic is delegated to {@link DataReaderFactory}s that are returned by
* {@link #createDataReaderFactories()}.
* logic is delegated to {@link InputPartition}s that are returned by
* {@link #planInputPartitions()}.
*
* There are mainly 3 kinds of query optimizations:
* 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@@ -65,8 +65,8 @@ public interface DataSourceReader {
StructType readSchema();

/**
* Returns a list of reader factories. Each factory is responsible for creating a data reader to
* output data for one RDD partition. That means the number of factories returned here is same as
* Returns a list of read tasks. Each task is responsible for creating a data reader to
* output data for one RDD partition. That means the number of tasks returned here is same as
* the number of RDD partitions this scan outputs.
*
* Note that, this may not be a full scan if the data source reader mixes in other optimization
@@ -76,5 +76,5 @@ public interface DataSourceReader {
* If this method fails (by throwing an exception), the action would fail and no Spark job was
* submitted.
*/
List<DataReaderFactory<Row>> createDataReaderFactories();
List<InputPartition<Row>> planInputPartitions();
}
@@ -22,20 +22,20 @@
import org.apache.spark.annotation.InterfaceStability;

/**
* A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
* An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
* responsible for creating the actual data reader. The relationship between
* {@link DataReaderFactory} and {@link DataReader}
* {@link InputPartition} and {@link InputPartitionReader}
* is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
*
* Note that, the reader factory will be serialized and sent to executors, then the data reader
* will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
* serializable and {@link DataReader} doesn't need to be.
* Note that input partitions will be serialized and sent to executors, then the partition reader
* will be created on executors and do the actual reading. So {@link InputPartition} must be
* serializable and {@link InputPartitionReader} doesn't need to be.
*/
@InterfaceStability.Evolving
public interface DataReaderFactory<T> extends Serializable {
public interface InputPartition<T> extends Serializable {

/**
* The preferred locations where the data reader returned by this reader factory can run faster,
* The preferred locations where the data reader returned by this partition can run faster,
* but Spark does not guarantee to run the data reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
@@ -57,5 +57,5 @@ default String[] preferredLocations() {
* If this method fails (by throwing an exception), the corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
*/
DataReader<T> createDataReader();
InputPartitionReader<T> createPartitionReader();
}
@@ -23,15 +23,15 @@
import org.apache.spark.annotation.InterfaceStability;

/**
* A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
* A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
* outputting data for a RDD partition.
*
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
* source readers, or {@link org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
* readers that mix in {@link SupportsScanUnsafeRow}.
*/
@InterfaceStability.Evolving
public interface DataReader<T> extends Closeable {
public interface InputPartitionReader<T> extends Closeable {

/**
* Proceed to next record, returns false if there is no more records.
@@ -24,7 +24,7 @@
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report data partitioning and try to avoid shuffle at Spark side.
*
* Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
* Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid
* adding a shuffle even if the reader does not implement this interface.
*/
@InterfaceStability.Evolving
@@ -30,22 +30,22 @@
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceReader {
@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
default List<InputPartition<Row>> planInputPartitions() {
throw new IllegalStateException(
"createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
"planInputPartitions not supported by default within SupportsScanColumnarBatch.");
}

/**
* Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
* Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
* in batches.
*/
List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
List<InputPartition<ColumnarBatch>> planBatchInputPartitions();

/**
* Returns true if the concrete data source reader can read data in batch according to the scan
* properties like required columns, pushes filters, etc. It's possible that the implementation
* can only support some certain columns with certain types. Users can overwrite this method and
* {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
* {@link #planInputPartitions()} to fallback to normal read path under some conditions.
*/
default boolean enableBatchRead() {
return true;
@@ -33,14 +33,14 @@
public interface SupportsScanUnsafeRow extends DataSourceReader {

@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
default List<InputPartition<Row>> planInputPartitions() {
throw new IllegalStateException(
"createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
"planInputPartitions not supported by default within SupportsScanUnsafeRow");
}

/**
* Similar to {@link DataSourceReader#createDataReaderFactories()},
* Similar to {@link DataSourceReader#planInputPartitions()},
* but returns data in unsafe row format.
*/
List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
}
@@ -18,12 +18,12 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

/**
* A concrete implementation of {@link Distribution}. Represents a distribution where records that
* share the same values for the {@link #clusteredColumns} will be produced by the same
* {@link DataReader}.
* {@link InputPartitionReader}.
*/
@InterfaceStability.Evolving
public class ClusteredDistribution implements Distribution {
@@ -18,13 +18,13 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

/**
* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
* be distributed among the data partitions(one {@link InputPartitionReader} outputs data for one partition).
* Note that this interface has nothing to do with the data ordering inside one
* partition(the output records of a single {@link DataReader}).
* partition(the output records of a single {@link InputPartitionReader}).
*
* The instance of this interface is created and provided by Spark, then consumed by
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
@@ -18,7 +18,7 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;

/**
@@ -31,7 +31,7 @@
public interface Partitioning {

/**
* Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
* Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
*/
int numPartitions();

@@ -18,13 +18,13 @@
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

/**
* A variation on {@link DataReader} for use with streaming in continuous processing mode.
* A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode.
*/
@InterfaceStability.Evolving
public interface ContinuousDataReader<T> extends DataReader<T> {
public interface ContinuousInputPartitionReader<T> extends InputPartitionReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
*
