[SPARK-24073][SQL]: Rename DataReaderFactory to InputPartition. #21145
Conversation
Test build #89799 has finished for PR 21145 at commit
Branch force-pushed from c364c05 to cdf2b4d.
Test build #89806 has finished for PR 21145 at commit
Test build #89808 has finished for PR 21145 at commit
Branch force-pushed from cea3b86 to 609ec14.
Test build #89848 has finished for PR 21145 at commit
@@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }

-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
@rdblue, the renaming in this PR seems to be partial. Could you replace KafkaMicroBatchDataReaderFactory with KafkaMicroBatchReadTask together in this PR? I guess there will be more occurrences like this.
This fixes the API, not implementations, and it already touches 30+ files.
I'd rather not fix the downstream classes for two reasons. First, to avoid this becoming really large. Second, we need to be able to evolve these APIs without requiring changes to all implementations. I think we should avoid requiring changes to make everything consistent, or else there's tension between making necessary changes to the API and trying to move existing code to that API.
Yes. This kind of change always becomes unnecessarily big. Since this PR turns the master branch into an inconsistent state, could you file a JIRA issue for the remaining tasks this PR avoids? Then someone else can help make Apache Spark more consistent later, in the Apache Spark 2.4 (or 3.0) timeframe.

> I think we should avoid requiring changes to make everything consistent
Sure, good idea.
IMO, it's better to keep it the current way. The current naming makes it consistent on the read/write side, and I think the renaming would add to the already confusing interfaces.
@arunmahadevan, the problem is that the current naming is misleading. This is not a factory (it only produces one specific reader) and it does not have the same lifecycle as the write-side factory. Using parallel naming for the two implies an equivalence that doesn't exist.
It sounds like both
I think
@cloud-fan and @henryr, do you have an opinion about naming here?
I don't mind. I certainly agree that this is not a factory.
Neither name is perfect.
@gengliangwang, we can follow up with a rename for the streaming classes that already use this API. But there is no need to do that right now and make this commit larger. I think I've already made it clear that I think
I don't see the problem with the name ReadTask. In RDDs, we call the serializable representation of a partition for distribution to executors just Partition, and I've always found this pretty intuitive. Certainly it wouldn't be better to call it ComputeIteratorFactory instead.
@@ -22,20 +22,20 @@
 import org.apache.spark.annotation.InterfaceStability;

 /**
- * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
+ * A read task returned by {@link DataSourceReader#createReadTasks()} and is
I still think ReadTask is confusing. I was asked by multiple people what a ReadTask is, especially since a Task is already clearly defined as a unit of execution in Spark: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L31-L51 Let us avoid using Task here.
Ok - how about ReadTaskDescriptor? (In that case I think it would be ok to leave the method name as createReadTasks().)
Hadoop uses the term "input split" for this. Would it be more clear if Spark adopted the same language?
InputSplit seems a pretty good name for batch, since one split will be processed by one Spark task. What do the streaming folks think about it?
Now I'm rethinking the suggestion: InputSplit is a well-known Hadoop class that we probably shouldn't duplicate. What about using InputPartition instead? That makes it clear that the partitioning is on the input data, and it uses the more common term in Spark.
Is everyone okay with this? @jose-torres @gengliangwang @cloud-fan @henryr @arunmahadevan @gatorsmile
InputPartition sounds fine, but is it ok to have a method like "createDataReader" inside it? Will it create confusion when an "inputPartition" is a member of other classes like DataSourceRDDPartition?

It appears that the DataReaderFactory is kind of a wrapper for the Reader, so that the Reader itself need not be serializable. I am also ok with leaving it as is (though technically it may not be a factory).
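The "wrapper so the Reader need not be serializable" point can be sketched as follows. This is an illustrative sketch with hypothetical names, not Spark's actual classes: only the partition handle must be Serializable, while the reader it creates executor-side may hold non-serializable state (sockets, file handles, Kafka consumers) freely.

```scala
// Stand-in for a reader holding non-serializable state (e.g. an open stream).
class LineReader(lines: Seq[String]) extends AutoCloseable {
  private val it = lines.iterator // not Serializable, and doesn't need to be
  def hasNext: Boolean = it.hasNext
  def read(): String = it.next()
  override def close(): Unit = ()
}

// Serializable handle shipped from the driver to executors; it creates the
// reader only after arriving on the executor.
case class LinesInputPartition(lines: Seq[String]) extends Serializable {
  def createReader(): LineReader = new LineReader(lines)
}

val part = LinesInputPartition(Seq("a", "b"))
val reader = part.createReader()
val values = Iterator.continually(reader).takeWhile(_.hasNext).map(_.read()).toList
reader.close()
println(values) // List(a, b)
```

The design choice being debated is whether this split of responsibilities deserves a name other than "factory", since the handle produces exactly one reader for exactly one slice of data.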
Okay, one more idea that captures the relationship between DataReader and ReadTask: what about using DataReadable? That's similar to the Iterable and Iterator relationship that the two classes have.
So we want to expose two things in the naming:
- it represents an input RDD partition
- it creates a DataReader

I think the first really needs to be pointed out explicitly, while the second is less of a problem: it is not that confusing that a DataReader is created from a partition. So +1 on InputPartition.
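The Iterable/Iterator-style relationship the thread converges on can be sketched like this. Signatures are simplified and hypothetical, not the exact org.apache.spark.sql.sources.v2.reader API: InputPartition plays the Iterable role, InputPartitionReader the Iterator role, with an explicit create/close lifecycle.

```scala
// Iterator-like role: pulls records one at a time, closed when done.
trait InputPartitionReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}

// Iterable-like role: a serializable description of one input partition that
// can create its reader on an executor.
trait InputPartition[T] extends Serializable {
  def createPartitionReader(): InputPartitionReader[T]
}

// Toy partition over an integer range [start, end).
case class RangeInputPartition(start: Int, end: Int) extends InputPartition[Int] {
  def createPartitionReader(): InputPartitionReader[Int] =
    new InputPartitionReader[Int] {
      private var current = start - 1
      def next(): Boolean = { current += 1; current < end }
      def get(): Int = current
      def close(): Unit = ()
    }
}

val reader = RangeInputPartition(0, 3).createPartitionReader()
val values = Iterator.continually(reader).takeWhile(_.next()).map(_.get()).toList
reader.close()
println(values) // List(0, 1, 2)
```

This is why "factory" was felt to be misleading: each partition produces exactly one reader for its own slice of the data, rather than stamping out interchangeable instances.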
@gatorsmile, the Spark UI has used the term "task" for years to refer to the same thing. I don't think it is unreasonable to use the same term.
Branch force-pushed from 609ec14 to 250c1de.
Test build #90334 has finished for PR 21145 at commit
Branch force-pushed from 250c1de to 560ad6a.
Test build #90378 has finished for PR 21145 at commit
Branch force-pushed from 560ad6a to 3eff34b.
@cloud-fan, I've updated this PR to use
Test build #90380 has finished for PR 21145 at commit
Branch force-pushed from 3eff34b to ec53d12.
Test build #90381 has finished for PR 21145 at commit
Renames:
- DataReaderFactory -> InputPartition
- DataReader -> InputPartitionReader
- createDataReaderFactories -> planInputPartitions
- createUnsafeDataReaderFactories -> planUnsafeInputPartitions
- createBatchDataReaderFactories -> planBatchInputPartitions

This fixes the changes in SPARK-23219, which renamed ReadTask to DataReaderFactory. The intent of that change was to make the read and write API match (the write side uses DataWriterFactory), but the underlying problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition is a specific partition of the data to be read, in contrast to DataWriterFactory, where the same factory instance is used in all write tasks. InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is, and it isn't clear why a set of them is produced for a read.
Branch force-pushed from ec53d12 to 1423979.
Test build #90382 has finished for PR 21145 at commit
@@ -76,5 +76,5 @@
  * If this method fails (by throwing an exception), the action would fail and no Spark job was
  * submitted.
  */
-List<DataReaderFactory<Row>> createDataReaderFactories();
+List<InputPartition<Row>> planInputPartitions();
In the Hadoop world there is InputFormat.getSplits; shall we follow that and use getInputPartitions here?
I think plan is a more accurate verb. To some Java people, get implies that the call is very cheap because it is associated with getters, which typically just return a field's value. Since that's not the case here and callers shouldn't consider this method cheap, I think it makes sense to use a different name that reflects what is actually happening: split planning.
Any other suggestions about naming? We are going to rename
Overall, +1 for the change.
LGTM to
LGTM. I can own cleaning up the names of the streaming classes, probably wrapping that into the broader task of getting a design doc for the streaming reader API.
Thanks @jose-torres! I appreciate not blocking this commit on those changes, since it would have been difficult to keep this up to date with the other paths changing while we discussed what to call these classes.
Thanks! Merged to master.
What changes were proposed in this pull request?
In apache#21145, DataReaderFactory is renamed to InputPartition. This PR is to revise wording in the comments to make it more clear.

How was this patch tested?
None

Author: Gengliang Wang <gengliang.wang@databricks.com>
Closes apache#21326 from gengliangwang/revise_reader_comments.
Author: Ryan Blue <blue@apache.org>
Closes apache#21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
(cherry picked from commit 62d0139)
What changes were proposed in this pull request?
Renames:
- DataReaderFactory to InputPartition
- DataReader to InputPartitionReader
- createDataReaderFactories to planInputPartitions
- createUnsafeDataReaderFactories to planUnsafeInputPartitions
- createBatchDataReaderFactories to planBatchInputPartitions
This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write API match (write side uses DataWriterFactory), but the underlying
problem is that the two classes are not equivalent.
ReadTask/DataReader function as Iterable/Iterator. One InputPartition is
a specific partition of the data to be read, in contrast to
DataWriterFactory where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is and it isn't clear why a set of them is produced for
a read.
How was this patch tested?
Existing tests, which have been updated to use the new name.
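The renamed read path described above can be sketched end to end. This is a hedged, simplified sketch with hypothetical names (Reader, Partition, SourceReader, SeqPartition, IntSource), not the actual Spark interfaces: the driver calls planInputPartitions() once, each serializable partition is shipped to a task, and each task creates and drains its own reader.

```scala
trait Reader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}

trait Partition[T] extends Serializable {
  def createReader(): Reader[T]
}

trait SourceReader[T] {
  // formerly createDataReaderFactories(); "plan" signals potentially
  // expensive work such as listing files or splitting offset ranges
  def planInputPartitions(): Seq[Partition[T]]
}

// One partition over an in-memory slice of ints.
case class SeqPartition(data: Seq[Int]) extends Partition[Int] {
  def createReader(): Reader[Int] = new Reader[Int] {
    private val it = data.iterator
    private var current = 0
    def next(): Boolean = { val more = it.hasNext; if (more) current = it.next(); more }
    def get(): Int = current
    def close(): Unit = ()
  }
}

// A toy source that plans its input into roughly equal partitions.
class IntSource(data: Seq[Int], parts: Int) extends SourceReader[Int] {
  def planInputPartitions(): Seq[Partition[Int]] =
    data.grouped(math.max(1, data.size / parts)).map(SeqPartition(_)).toSeq
}

// Simulate one Spark task per planned partition.
val results = new IntSource(1 to 6, 3).planInputPartitions().map { p =>
  val r = p.createReader()
  try Iterator.continually(r).takeWhile(_.next()).map(_.get()).toList
  finally r.close()
}
println(results.flatten) // elements 1..6 recovered across 3 partitions
```

This mirrors the commit's rationale: the partition, not a shared factory, owns the reader's lifecycle, and "plan" rather than "get" or "create" names the potentially expensive driver-side step.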