
[SPARK-24882][SQL] improve data source v2 API #22009

Closed
wants to merge 28 commits

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Aug 6, 2018

What changes were proposed in this pull request?

Improve the data source v2 API according to the design doc.

Summary of the changes:

  1. Rename the ReadSupport -> DataSourceReader -> InputPartition -> InputPartitionReader hierarchy to BatchReadSupportProvider -> BatchReadSupport -> InputPartition/PartitionReaderFactory -> PartitionReader. Similar renaming also happens in the streaming and write APIs.
  2. Create ScanConfig to store query-specific information such as the operator pushdown result, streaming offsets, etc. This makes the batch and streaming ReadSupport (previously named DataSourceReader) immutable. All other methods take ScanConfig as input, which implies that applying operator pushdown and getting streaming offsets happen before everything else (getting input partitions, reporting statistics, etc.).
  3. Separate InputPartition into InputPartition and PartitionReaderFactory. This is a natural separation: data splitting and reading are orthogonal, and we should not mix them in one interface. This also makes the naming consistent between the read and write APIs: PartitionReaderFactory vs DataWriterFactory.
  4. Separate the batch and streaming interfaces. Sometimes it's painful to force the streaming interface to extend the batch interface, as we may need to override some batch methods to return false, or even leak streaming concepts into the batch API (e.g. DataWriterFactory#createWriter(partitionId, taskId, epochId)). A rough sketch of how these pieces fit together follows this list.
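To make the new shape concrete, here is a self-contained sketch of how the renamed pieces are meant to fit together: an immutable ReadSupport produces a ScanConfig, and the ScanConfig drives both input-partition planning and reader creation. The traits below are simplified stand-ins for the real (Java) interfaces touched by this PR; names and signatures are illustrative only, not the exact API.

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Simplified stand-ins for the interfaces described above.
trait ScanConfig { def readSchema: StructType }
trait ScanConfigBuilder { def build(): ScanConfig }
trait InputPartition extends Serializable
trait PartitionReader[T] extends AutoCloseable {
  def next(): Boolean
  def get(): T
}
trait PartitionReaderFactory extends Serializable {
  def createReader(partition: InputPartition): PartitionReader[Seq[Any]]
}
// The ReadSupport itself stays immutable; everything query-specific lives in the ScanConfig.
trait BatchReadSupport {
  def newScanConfigBuilder(): ScanConfigBuilder
  def planInputPartitions(config: ScanConfig): Array[InputPartition]
  def createReaderFactory(config: ScanConfig): PartitionReaderFactory
}

// A toy source that serves the integers 0..9, split into two partitions.
case class RangePartition(start: Int, end: Int) extends InputPartition

class SimpleBatchReadSupport extends BatchReadSupport {
  private val schema = StructType(Seq(StructField("i", IntegerType)))

  override def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder {
    override def build(): ScanConfig = new ScanConfig { val readSchema: StructType = schema }
  }

  override def planInputPartitions(config: ScanConfig): Array[InputPartition] =
    Array(RangePartition(0, 5), RangePartition(5, 10))

  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory =
    new PartitionReaderFactory {
      override def createReader(partition: InputPartition): PartitionReader[Seq[Any]] =
        partition match {
          case RangePartition(start, end) =>
            new PartitionReader[Seq[Any]] {
              private var current = start - 1
              override def next(): Boolean = { current += 1; current < end }
              override def get(): Seq[Any] = Seq(current)
              override def close(): Unit = ()
            }
        }
    }
}
```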

Some follow-ups we should do after this PR (tracked by https://issues.apache.org/jira/browse/SPARK-25186):

  1. Revisit the life cycle of ReadSupport instances. Currently I keep it the same as the previous DataSourceReader, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch sources. We could also consider letting ReadSupport.newScanConfigBuilder take DataSourceOptions as a parameter, if we decide to change the life cycle.
  2. Add WriteConfig. This is similar to ScanConfig and makes the write API more flexible. But it's only needed when we add replaceWhere support, and it requires changes to the streaming execution engine for this new concept, which I think is better done in another PR.
  3. Refine the documentation. This PR adds/changes a lot of documentation, and it's very likely that some people have better ideas.
  4. Figure out the life cycle of CustomMetrics. It looks to me like it should be bound to a ScanConfig, but we need to change ProgressReporter to get the ScanConfig. Better done in another PR.
  5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the SupportsPushdownXYZ traits. We could design a better API using the builder pattern, but this is a complicated design and deserves its own JIRA ticket and design doc.
  6. Improve the continuous streaming engine to only create a new ScanConfig when re-configuring.
  7. Remove SupportsPushdownCatalystFilter. This is actually not a must-have for the file source; we can change the Hive partition pruning to use the public Filter.

How was this patch tested?

Existing tests.


@SparkQA

SparkQA commented Aug 6, 2018

Test build #94294 has finished for PR 22009 at commit 770a43d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 6, 2018

Test build #94297 has finished for PR 22009 at commit 291304a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • *
  • *

override def initialOffset: OffsetV2 = LongOffset(-1)

override def latestOffset(start: OffsetV2): OffsetV2 = {
if (currentOffset.offset == -1) null else currentOffset
Contributor Author

I feel it's more reasonable to forbid data sources from returning null offsets, and to use latestOffset != startOffset on the streaming execution side to indicate whether a new batch is available. But I'll leave that to a follow-up PR.

cc @jose-torres

Contributor

Yes, I agree. The V1 API allowed null because it didn't require sources to implement a "this is the beginning of the stream, read everything" offset, but that was a mistake which we correctly remedied by adding the initialOffset field to the interface.

A follow-up PR makes sense, because there's some stream execution logic that can be greatly simplified when all sources have a real initial offset.
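To illustrate the convention being proposed (no null offsets; the engine compares offsets to decide whether a new batch is available), here is a small self-contained sketch. The types and names are simplified stand-ins, not the actual streaming interfaces.

```scala
// Sources always have a real initial offset and never return null; the engine runs a
// batch only when the latest offset has moved past the start offset.
final case class LongOffset(value: Long)

trait MicroBatchSource {
  def initialOffset: LongOffset                    // "beginning of the stream", never null
  def latestOffset(start: LongOffset): LongOffset  // never null under the proposed rule
}

class InMemorySource extends MicroBatchSource {
  @volatile var committed: Long = -1L
  override def initialOffset: LongOffset = LongOffset(-1L)
  override def latestOffset(start: LongOffset): LongOffset = LongOffset(committed)
}

object OffsetConventionDemo {
  def main(args: Array[String]): Unit = {
    val source = new InMemorySource
    val start = source.initialOffset
    // No data yet: latestOffset == start, so no batch is constructed.
    println(source.latestOffset(start) != start) // false
    source.committed = 5L
    // Data arrived: latestOffset moved past start, so the engine would run a batch.
    println(source.latestOffset(start) != start) // true
  }
}
```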

@SparkQA

SparkQA commented Aug 6, 2018

Test build #94301 has finished for PR 22009 at commit 6bf8e9d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • *
  • *
  • case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)

@SparkQA

SparkQA commented Aug 6, 2018

Test build #94306 has finished for PR 22009 at commit 779c0a0.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94330 has finished for PR 22009 at commit 2f6d1d2.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Contributor

rdblue commented Aug 7, 2018

Does this replace the other PR? I haven't looked at that one yet. If this is ready to review and follows the doc, I can review it.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94336 has finished for PR 22009 at commit cab6d28.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

@rdblue can you point me to the other PR? This is the only PR I've sent out for the data source v2 API improvements. I'd appreciate your taking the time to review it, thanks!

@rdblue
Contributor

rdblue commented Aug 7, 2018

There must not be one. I thought you'd already started a PR, my mistake.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94337 has finished for PR 22009 at commit 2fc3b05.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

import org.apache.spark.sql.vectorized.ColumnarBatch;


public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
Contributor Author

This is renamed to JavaColumnarDataSourceV2, to avoid confusion about batch vs streaming.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94338 has finished for PR 22009 at commit 29b4f33.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -132,35 +134,15 @@ class MemoryV2CustomMetrics(sink: MemorySinkV2) extends CustomMetrics {
override def json(): String = Serialization.write(Map("numRows" -> sink.numRows))
}

class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode, schema: StructType)
Contributor Author

This is actually a batch writer, not a micro-batch one, and it's only used in tests. For the writer API, micro-batch and continuous share the same interface, so we only need one streaming write implementation.

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94340 has finished for PR 22009 at commit c224999.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


/**
* Applies column pruning w.r.t. the given requiredSchema.
*
* Implementations should try their best to prune unnecessary columns or nested fields, but it's
* also OK to do the pruning partially; e.g., a data source may not be able to prune nested
* fields and may only prune top-level columns.
*
* Note that, data source readers should update {@link DataSourceReader#readSchema()} after
* applying column pruning.
*/
void pruneColumns(StructType requiredSchema);
Member

Since we have a new method prunedSchema, should we rename this to pruneSchema? The parameter is also a schema.
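For context on the contract above, a simplified sketch of partial column pruning on the implementation side — pruning only top-level columns and reflecting the result in the read schema. The class here is hypothetical, not the actual interface; only the method names pruneColumns and readSchema mirror the snippet being reviewed.

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical reader-side state showing the documented contract: prune what you can
// (here, only top-level columns) and report the pruned schema back via readSchema().
class ExampleColumnPruning(fullSchema: StructType) {
  private var currentSchema: StructType = fullSchema

  // Corresponds to pruneColumns(requiredSchema): keep only the requested top-level columns.
  def pruneColumns(requiredSchema: StructType): Unit = {
    val requested = requiredSchema.fieldNames.toSet
    currentSchema = StructType(fullSchema.fields.filter(f => requested.contains(f.name)))
  }

  // Corresponds to readSchema(): must reflect whatever pruning was applied.
  def readSchema(): StructType = currentSchema
}
```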

* streaming data sources.
*/
case class OffsetsOnlyScanConfigBuilder(start: Offset, end: Option[Offset] = None)
extends ScanConfigBuilder with ScanConfig {
Member

It looks hacky to do this: extend ScanConfigBuilder with ScanConfig and return the class itself from build().

Contributor Author

Otherwise we need to create two very similar classes. I'm fine with either approach.
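For illustration, the two options discussed above side by side, using simplified stand-in traits (not the real interfaces):

```scala
trait Offset
trait ScanConfig
trait ScanConfigBuilder { def build(): ScanConfig }

// Option A (what the PR does here): one class is both the builder and the config,
// and build() returns the instance itself.
case class OffsetsOnlyScanConfig(start: Offset, end: Option[Offset] = None)
  extends ScanConfigBuilder with ScanConfig {
  override def build(): ScanConfig = this
}

// Option B: split builder and config into two near-duplicate classes.
case class OffsetsScanConfig(start: Offset, end: Option[Offset]) extends ScanConfig
case class OffsetsScanConfigBuilder(start: Offset, end: Option[Offset] = None)
  extends ScanConfigBuilder {
  override def build(): ScanConfig = OffsetsScanConfig(start, end)
}
```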

* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
StructType fullSchema();


@cloud-fan
Contributor Author

retest this please

@@ -270,11 +269,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

override def createStreamWriter(
override def createStreamingWritSupport(
Contributor

typo?

Contributor Author

good catch! fixed

@SparkQA

SparkQA commented Aug 7, 2018

Test build #94377 has finished for PR 22009 at commit c224999.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

Contributor

@jose-torres jose-torres left a comment

Looks good apart from one minor doc comment (and the rebase conflict)

* {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
* input partitions and reader factory to scan data for its duration. At the end {@link #stop()}
* will be called when the streaming execution is completed. Note that a single query may have
* multiple executions due to restart or failure recovery.
Contributor

I would also add this documentation to the relevant methods. So getContinuousReadSupport and getMicroBatchReadSupport would say something like "Spark will call this method at the beginning of each streaming query to get a ReadSupport", and newScanConfigBuilder would say something like "Spark will get a ScanConfig once for each data scanning job".

writer.commit(batchId, messages)
}
// A special `MicroBatchReadSupport` that can get latestOffset with a start offset.
trait RateControlMicroBatchReadSupport extends MicroBatchReadSupport {
Contributor Author

this is under org.apache.spark.sql.execution, so it's an internal API.

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95029 has finished for PR 22009 at commit ca80080.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.


/**
* Creates a {@link DataSourceReader} to scan the data from this data source.
* Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
Contributor Author

Although Spark is OK with using the same ReadSupport instance across queries, this interface (BatchReadSupportProvider) is not going to support it. It's instantiated by reflection every time DataFrameReader.load is called, so the implementation cannot reuse the same ReadSupport instance.

Contributor

It is fine if the life cycle of the provider prevents this (though implementations could use a static cache). Spark should just not make assumptions about the ReadSupport instance being specific to a query.
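As an illustration of the static-cache idea (not part of the proposed API; all names here are hypothetical):

```scala
import java.util.concurrent.ConcurrentHashMap

trait BatchReadSupport // stand-in for the real interface

class CachedBatchReadSupport(val path: String) extends BatchReadSupport

// Even though the provider is re-created by reflection on every DataFrameReader.load call,
// an implementation could still reuse ReadSupport instances via a JVM-wide cache.
object ReadSupportCache {
  private val cache = new ConcurrentHashMap[String, BatchReadSupport]()
  def getOrCreate(path: String): BatchReadSupport =
    cache.computeIfAbsent(path, p => new CachedBatchReadSupport(p))
}

class ExampleProvider {
  // A freshly instantiated provider can still hand back a shared ReadSupport.
  def createBatchReadSupport(path: String): BatchReadSupport =
    ReadSupportCache.getOrCreate(path)
}
```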

@cloud-fan
Contributor Author

cloud-fan commented Aug 21, 2018

Some highlights: this PR reshapes the data source v2 API to make it better, but NOT to make it perfect and stable, which is impossible to do in one PR.

Whether this PR is merged before 2.4 or not doesn't matter much. The data source v2 API is still unstable, and backward compatibility will not be a concern during data source v2 development in the next release. But I do hope we can merge it sooner rather than later, as it's quite big and will conflict with all other data source v2 related PRs.

Some follow-ups we should do after this PR:

  1. Revisit the life cycle of ReadSupport instances. Currently I keep it the same as the previous DataSourceReader, i.e. the life cycle is bound to the batch/stream query. This fits streaming very well but may not be perfect for batch sources. We could also consider letting ReadSupport.newScanConfigBuilder take DataSourceOptions as a parameter, if we decide to change the life cycle.
  2. Add WriteConfig. This is similar to ScanConfig and makes the write API more flexible. But it's only needed when we add replaceWhere support, and it requires changes to the streaming execution engine for this new concept, which I think is better done in another PR.
  3. Refine the documentation. This PR adds/changes a lot of documentation, and it's very likely that some people have better ideas.
  4. Figure out the life cycle of CustomMetrics. It looks to me like it should be bound to a ScanConfig, but we need to change ProgressReporter to get the ScanConfig. Better done in another PR.
  5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. using the SupportsPushdownXYZ traits. We could design a better API using the builder pattern, but this is a complicated design and deserves its own JIRA ticket and design doc.
  6. Improve the continuous streaming engine to only create a new ScanConfig when re-configuring.
  7. Remove SupportsPushdownCatalystFilter. This is actually not a must-have for the file source; we can change the Hive partition pruning to use the public Filter.

I've created an umbrella JIRA to track them: https://issues.apache.org/jira/browse/SPARK-25186

@gatorsmile
Member

@cloud-fan Post these follow-up tasks in the PR description?

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95032 has finished for PR 22009 at commit 85c2476.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 21, 2018

Test build #95033 has finished for PR 22009 at commit f938614.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
case r: StreamingDataSourceV2Relation
if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
Contributor

I think this logic is subtly incorrect (and is what's causing the flakiness in the continuous test). It needs to get the actual scan config being used from DataSourceV2ScanExec in the physical plan; r.scanConfigBuilder.build() will always produce the most up-to-date knownPartitions value, whereas what we want is the value that lastExecution was run with. In the prior iteration of the API this wasn't an issue because KafkaContinuousReader would precompute knownPartitions.

Contributor Author

good catch!
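A self-contained toy illustration of the subtlety described above — rebuilding the config re-reads mutable state, whereas the check needs the value the last execution actually ran with. All classes here are stand-ins, not the Kafka source internals.

```scala
final case class KafkaScanConfig(knownPartitions: Set[Int])

// The builder reads live, mutable partition information every time build() is called.
class ScanConfigBuilder(currentPartitions: () => Set[Int]) {
  def build(): KafkaScanConfig = KafkaScanConfig(currentPartitions())
}

object ScanConfigFlakinessDemo {
  def main(args: Array[String]): Unit = {
    var livePartitions = Set(0, 1)
    val builder = new ScanConfigBuilder(() => livePartitions)

    // The config the last execution actually ran with:
    val executedConfig = builder.build()

    // Partitions change after that execution was planned:
    livePartitions = Set(0, 1, 2)

    // Rebuilding always reflects the most up-to-date partitions...
    assert(builder.build().knownPartitions == Set(0, 1, 2))
    // ...while the check needs the value captured from the executed physical plan.
    assert(executedConfig.knownPartitions == Set(0, 1))
  }
}
```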

@rdblue
Contributor

rdblue commented Aug 21, 2018

@cloud-fan, I think that the scan config builder needs to accept the options and that SaveMode needs to be removed before we merge this PR. I'm fine with following up with the WriteConfig addition, but then I think this should not contain any changes to the write path. It's a bad idea to put half the changes in the read PR and the other half in another one.

@cloud-fan
Contributor Author

cloud-fan commented Aug 22, 2018

the scan config builder needs to accept the options

This is the first item in the follow-up list. Currently the life cycle of a ReadSupport instance is bound to a batch/streaming query, so the user-specified options should be passed in when returning the ReadSupport instance.

SaveMode needs to be removed

We can't remove existing features. Even if we have a better way to write data, what we should do is deprecate SaveMode, not remove it. I don't think the other reviewers agree with removing SaveMode.

It's a bad idea to put half the changes in the read PR

Yeah, I totally agree with that, but we need to define "half". I don't think the write-side change here is just a half: it's complete with respect to all the data source features we have today. We can't say a change is half-done because it doesn't support a non-existent feature (the replaceWhere stuff). We can change the API later if it becomes necessary to support new features.

Again, this PR does not mark the data source v2 API as stable. We can reject it if it does something wrong, but we can't just say "this doesn't have feature XYZ so we can't merge it". I tried my best to minimize the changes by keeping some designs/semantics the same as before (the pushdown API is not changed, the life cycle is not changed, etc.). If you have better designs/ideas, please send a new PR instead of squashing them into this PR. In general we should improve data source v2 incrementally.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95071 has finished for PR 22009 at commit 8833b67.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 22, 2018

Test build #95076 has finished for PR 22009 at commit 51cda76.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@gatorsmile gatorsmile left a comment

In this PR, @rdblue, @jose-torres, and @cloud-fan had many fruitful discussions about the design of data source API V2. Please continue the discussion in the JIRAs under https://issues.apache.org/jira/browse/SPARK-25186. Instead of mixing all the discussion topics in the same PR, how about discussing the topics in separate sub-JIRAs? It will be more productive.

The current API implementation looks good to me. I am merging this one now, but the APIs are not stable yet. We can revisit the decisions made in this PR after the merge. Please continue to improve the data source APIs. Thanks, everyone!

Merged to master.

@gatorsmile
Member

It sounds like the sync between Apache and GitHub is down. Although the PR has been merged, it has not been closed.

@asfgit asfgit closed this in e754887 Aug 22, 2018
asfgit pushed a commit that referenced this pull request Sep 12, 2018
## What changes were proposed in this pull request?

As discussed on the dev list, we don't want to include #22009 in Spark 2.4, as it requires data source v2 users to change their implementations extensively, and they would need to change them again in the next release.

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #22388 from cloud-fan/revert.
@@ -169,15 +174,16 @@ object DataSourceV2Relation {
options: Map[String, String],
tableIdent: Option[TableIdentifier] = None,
userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val reader = source.createReader(options, userSpecifiedSchema)
val readSupport = source.createReadSupport(options, userSpecifiedSchema)
Member

This doesn't look directly related to this PR, but I think this is a good place to ask: why do we create a ReadSupport in the write path?

val relation = DataSourceV2Relation.create(source, options)

Retrieving the physical schema of the underlying storage is potentially expensive. Even worse, it looks odd that the write path requires the read side's schema. Which schema should we expect here in the write path?

Member

This looks like a regression compared to 2.3. Data Source V2 is under heavy development, so I understand, but this is quite crucial. From a cursory look, this was introduced in 5fef6e3.

I would suggest partially reverting this commit in branch-2.4.

Contributor Author

This is kind of a behavior change: now, when we append data to a data source, the data source must be readable in order to provide a schema, which will be used to validate the input data schema.

I don't have a strong feeling about it. Data source v2 is marked as evolving, so necessary behavior changes are fine. cc @rdblue

Member

Another point is which schema you would expect the data source to return in that case. For instance, spark.range(1).write.format("source").save(). It's odd that the source should provide a schema; it's logically weird. How does the source provide the schema?

Contributor

In the long term, I don't think that sources should use the reader to get a schema. This is a temporary hack until we have catalog support, which is really where schemas should come from.

The way this works in our version (which is substantially ahead of upstream Spark, unfortunately) is that a Table is loaded by a Catalog. The schema reported by that table is used to validate writes. That way, the table can report its schema and Spark knows that the data written must be compatible with that schema, but the source isn't required to be readable.
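A hedged sketch of that direction — validating a write against the schema reported by a catalog-loaded table, without requiring the source to be readable. `Table`, `Catalog`, and the validation rule here are simplified, hypothetical stand-ins, not an actual Spark API.

```scala
import org.apache.spark.sql.types.StructType

trait Table { def schema: StructType }
trait Catalog { def loadTable(name: String): Table }

object WriteValidation {
  // Spark-side check: the data being written must be compatible with the table's
  // declared schema; here "compatible" is simplified to identical field names.
  def validateWrite(catalog: Catalog, tableName: String, dataSchema: StructType): Unit = {
    val tableSchema = catalog.loadTable(tableName).schema
    require(
      dataSchema.fieldNames.sameElements(tableSchema.fieldNames),
      s"Cannot write to $tableName: data schema $dataSchema does not match table schema $tableSchema")
  }
}
```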

asfgit pushed a commit that referenced this pull request Oct 15, 2018
…n Data Source V2

## What changes were proposed in this pull request?

This PR proposes to partially revert 5fef6e3 so that it does not create a ReadSupport and read the schema at write time in branch-2.4, since that is too breaking a change.

5fef6e3 happened to create a ReadSupport in the write path, which ended up reading the schema from the ReadSupport at write time.

For instance, this breaks the `spark.range(1).write.format("source").save("non-existent-path")` case, since there's no way to read the schema from "non-existent-path".

See also #22009 (comment)
See also #22688
See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes #22697 from HyukjinKwon/append-revert-2.4.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
bavardage pushed a commit to palantir/spark that referenced this pull request Oct 25, 2018
…urce V2 write path

## What changes were proposed in this pull request?

This PR proposes to avoid creating a ReadSupport and reading the schema at write time in other save modes.

apache@5fef6e3 happened to create a ReadSupport in the write path, which ended up reading the schema from the ReadSupport at write time.

This breaks the `spark.range(1).write.format("source").save("non-existent-path")` case since there's no way to read the schema from "non-existent-path".

See also apache#22009 (comment)
See also apache#22697
See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes apache#22688 from HyukjinKwon/append-revert-2.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019