
[SPARK-34183][SS] DataSource V2: Support required distribution and ordering in SS#31700

Closed
aokolnychyi wants to merge 1 commit into apache:master from aokolnychyi:ss-proto

Conversation

@aokolnychyi
Contributor

What changes were proposed in this pull request?

This PR shows one potential way to support required distribution and ordering in SS, which is marked as a blocker for 3.2.

Important! This change does not try to address other relevant TODO items like refreshing the cache or performing more checks using the capability API.

Why are the changes needed?

These changes are needed so that data sources can request a specific distribution and ordering not only for batch but also for micro-batch writes.
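
As a rough illustration of what a sink would request: the interface and method names below mirror Spark's DSv2 `RequiresDistributionAndOrdering` API, but the types are simplified stand-ins defined locally so the sketch is self-contained, not a drop-in implementation.

```java
// Simplified stand-ins for the DSv2 connector types (the real ones live in
// org.apache.spark.sql.connector); shown only to illustrate what a sink
// would ask the engine for.
import java.util.*;

public class DistributionSketch {
    interface Distribution {}
    record ClusteredDistribution(List<String> clusteringColumns) implements Distribution {}
    record SortOrder(String column, boolean ascending) {}

    // A write that mirrors RequiresDistributionAndOrdering: the engine must
    // cluster incoming rows by these columns and sort within each task.
    interface RequiresDistributionAndOrdering {
        Distribution requiredDistribution();
        List<SortOrder> requiredOrdering();
    }

    // Hypothetical partitioned-table write: cluster by a partition column
    // "date" and sort rows within each task by it.
    static class PartitionedTableWrite implements RequiresDistributionAndOrdering {
        public Distribution requiredDistribution() {
            return new ClusteredDistribution(List.of("date"));
        }
        public List<SortOrder> requiredOrdering() {
            return List.of(new SortOrder("date", true));
        }
    }

    public static void main(String[] args) {
        RequiresDistributionAndOrdering w = new PartitionedTableWrite();
        System.out.println(w.requiredDistribution());
        System.out.println(w.requiredOrdering());
    }
}
```

This PR is about honoring such a request not only in batch planning but also once per micro-batch.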

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR adds additional tests.

@aokolnychyi
Contributor Author

aokolnychyi commented Mar 2, 2021

@aokolnychyi
Contributor Author

This PR needs more refinement but I would like to get some early feedback from folks working on SS before investing more time. There are quite a few TODO items around the capability API, so I am not sure what the original plan was.

I guess this covers SPARK-27484.

@aokolnychyi
Contributor Author

I'd appreciate your feedback, @tdas @jose-torres @brkyvz.

Contributor Author

Seems safe?

Contributor Author

I wrote it this way to minimize changes.

@SparkQA

SparkQA commented Mar 2, 2021

Test build #135629 has started for PR 31700 at commit 6581d7b.

@SparkQA

SparkQA commented Mar 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40209/

@SparkQA

SparkQA commented Mar 2, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40209/

// For various commands (like DDL) and queries with side effects, we force query execution
// to happen right away to let these side effects take place eagerly.
val plan = queryExecution.analyzed match {
  case _: V2MicroBatchWriteCommand =>

Contributor Author

This bit is needed to prevent executing each batch early. We used WriteToDataSourceV2 before, which did not extend Command.
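
The guard described above can be sketched with hypothetical stand-in types (`V2MicroBatchWriteCommand` is modeled here as a plain marker class, not the real Spark plan node): commands normally execute eagerly for their side effects, but a per-batch streaming write must not run at analysis time.

```java
// Stand-in sketch (hypothetical types, not the Spark planner) of excluding
// micro-batch write commands from eager command execution.
public class EagerExecutionGuard {
    interface LogicalPlan {}
    record V2MicroBatchWriteCommand() implements LogicalPlan {}
    record OtherCommand() implements LogicalPlan {}

    // Returns true when the analyzed plan should be forced to execute
    // right away for its side effects.
    static boolean executeEagerly(LogicalPlan analyzed) {
        // Micro-batch writes are driven by the streaming engine once per
        // batch, so they are skipped here.
        if (analyzed instanceof V2MicroBatchWriteCommand) return false;
        return true;
    }

    public static void main(String[] args) {
        System.out.println(executeEagerly(new V2MicroBatchWriteCommand())); // false
        System.out.println(executeEagerly(new OtherCommand()));             // true
    }
}
```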

@aokolnychyi
Contributor Author

After this change, we no longer use WriteToDataSourceV2, which was deprecated in 2.4.

@SparkQA

SparkQA commented Mar 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40301/

@SparkQA

SparkQA commented Mar 3, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40301/

@SparkQA

SparkQA commented Mar 3, 2021

Test build #135719 has finished for PR 31700 at commit e7f167f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aokolnychyi
Contributor Author

All tests are green so this PR is ready for the first review round.

}
}

case class AppendMicroBatch(

Contributor

I don't think we have finalized the streaming write semantics yet. Ideally, they should be similar to batch (upsert semantics), but we are not at that point yet.

I think we should standardize the streaming write query plan later. For now, let's just use one query plan with OutputMode as the parameter.

@aokolnychyi Mar 4, 2021

Oh, I thought only update mode was under discussion. Are overwrite and append modes under discussion too?

I saw @HeartSaVioR's PR to rename SupportsStreamingUpdateAsAppend. Is there a discussion I can take a look at?

@HeartSaVioR Mar 5, 2021

The streaming write semantic is not the same as the batch one. The semantic is bound to the stateful operation; there should be only append, update (not the same as overwrite), and truncate and append (complete). For update we haven't constructed a proper way to define it.

The major concern is that the group keys in the stateful operation must be used as keys in update mode. That is currently not possible (there are some sketched ideas on this, though), so Spark has been dealing with update with the huge risk that we're doing the same as append, and the risk is delegated to the sink (or user). The sink or user has to deal with reflecting the appended output as "upsert". That's why I renamed SupportsStreamingUpdate to SupportsStreamingUpdateAsAppend to clarify the behavior.
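
The "update as append" contract can be illustrated with a small sketch (a hypothetical sink, not Spark code): Spark emits updated rows as plain appends, and the burden of treating them as upserts keyed by the group key falls on the sink.

```java
// Illustration of SupportsStreamingUpdateAsAppend from the sink's side:
// appended (key, count) rows are upserted by key, last write wins.
import java.util.*;

public class UpdateAsAppendSink {
    private final Map<String, Long> table = new LinkedHashMap<>();

    // The sink receives an appended row and treats it as an upsert.
    void append(String key, long count) {
        table.put(key, count); // last write per key wins
    }

    Map<String, Long> snapshot() { return table; }

    public static void main(String[] args) {
        UpdateAsAppendSink sink = new UpdateAsAppendSink();
        // Batch 1 appends two counts; batch 2 re-appends an updated count
        // for "a" rather than updating in place.
        sink.append("a", 1); sink.append("b", 1);
        sink.append("a", 2);
        System.out.println(sink.snapshot()); // {a=2, b=1}
    }
}
```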

Contributor Author

Thanks for the context, @HeartSaVioR!

I think we have two ways to proceed:

Option 1: Just adapt WriteToMicroBatchDataSource to use the Write abstraction and handle it in V2Writes.
Option 2: Define specific plans where we have clarity. For example, append and complete seem well-defined. We could define plans like AppendStreamingData and TruncateAndAppendStreamingData or anything like that and have something intermediate for update.

I am fine either way but option 1 seems easier for this PR. The rest can be covered by SPARK-27484.

To start with, we should all agree this feature is useful for micro-batch streaming.

@cloud-fan
Contributor

Micro-batch is an implementation detail and shouldn't be used to define the semantic.

If a streaming sink has a distribution requirement, I can understand it and the data should be properly partitioned before entering the streaming sink. However, if a streaming sink has an ordering requirement, I can't imagine how a data stream can satisfy an ordering requirement.

@aokolnychyi
Contributor Author

@cloud-fan, I am not sure about the continuous mode, but I think there is a valid use case for micro-batch streaming. The required distribution and ordering apply to individual writes so it does not mean the underlying sink is globally ordered.

For example, let's say we are writing to a partitioned file sink. If we just group incoming data by partition, a single output task may still have records for multiple partitions. A naive sink implementation may close the current file and open a new one each time it sees records for another partition, producing a large number of files. An alternative implementation can keep multiple files open, but that's not ideal either as it increases memory consumption. That's why ordering data within a task by partition seems like a good default for micro-batch streaming.
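
This can be made concrete with a hypothetical, simplified simulation (not actual sink code): a writer that closes the current file whenever the partition changes opens far fewer files when the task's input is sorted by partition.

```java
// Hypothetical simulation: count how many files a naive partitioned sink
// opens when it closes the current file on every partition change.
import java.util.*;
import java.util.stream.*;

public class PartitionedSinkSim {
    // Each record carries the partition it belongs to.
    record Rec(String partition, int value) {}

    // The naive writer opens a new file each time the incoming partition
    // differs from the one currently being written.
    static int filesOpened(List<Rec> records) {
        int opened = 0;
        String current = null;
        for (Rec r : records) {
            if (!r.partition().equals(current)) {
                current = r.partition();
                opened++; // close the previous file, open a new one
            }
        }
        return opened;
    }

    public static void main(String[] args) {
        // Interleaved partitions: worst case for the naive writer.
        List<Rec> unsorted = List.of(
            new Rec("a", 1), new Rec("b", 2), new Rec("a", 3), new Rec("b", 4));
        // Sorting within the task by partition avoids the churn.
        List<Rec> sorted = unsorted.stream()
            .sorted(Comparator.comparing(Rec::partition))
            .collect(Collectors.toList());
        System.out.println(filesOpened(unsorted)); // 4
        System.out.println(filesOpened(sorted));   // 2 (one per partition)
    }
}
```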

@aokolnychyi
Contributor Author

Micro-batch is an implementation detail and shouldn't be used to define the semantic.

That's a fair argument. I'd like to hide it as well. Do you have a good idea, @cloud-fan? I was inspired by the existing code where we had WriteToContinuousDataSource and WriteToMicroBatchDataSource. We probably need a way to distinguish continuous and micro-batch writes (does not have to be a separate plan, though).

@aokolnychyi
Contributor Author

If there is no clarity on how individual plans should look in SS, I can adapt the existing WriteToMicroBatchDataSource.

@cloud-fan
Contributor

cloud-fan commented Mar 5, 2021

Maybe we can define required ordering as "sort the data within each epoch". For batch, an epoch is an entire partition. For micro-batch, an epoch is a micro-batch. For continuous, we already have the epoch semantic. @HeartSaVioR, what do you think?
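
One reading of that per-epoch semantic, sketched with plain collections (an illustration, not Spark code): each epoch's records are sorted independently, so the overall stream across epochs is not globally sorted.

```java
// Sketch of "sort the data within each epoch": per-epoch sorting only.
import java.util.*;
import java.util.stream.*;

public class EpochOrdering {
    // Sorts each epoch's records independently and concatenates the epochs
    // in arrival order.
    static List<Integer> sortWithinEpochs(List<List<Integer>> epochs) {
        return epochs.stream()
            .map(e -> e.stream().sorted().collect(Collectors.toList()))
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two micro-batches (epochs) of unsorted values.
        List<List<Integer>> epochs = List.of(List.of(3, 1), List.of(2, 0));
        // Sorted within each epoch, but not globally sorted.
        System.out.println(sortWithinEpochs(epochs)); // [1, 3, 0, 2]
    }
}
```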

@HeartSaVioR
Contributor

HeartSaVioR commented Mar 5, 2021

Actually that's one of the few advantages of micro-batch compared to record-to-record processing, and we already leverage it in some public APIs (e.g. flatMapGroupsWithState "sorts" the inputs in a specific micro-batch so that values from the same group can be served to the user function sequentially, wrapped in an iterator. Imagine how it could be done without sorting.)

That said, I'm supportive of the concept of the ordering, but only for micro-batch. Dealing with sort in continuous mode is quite tricky: due to the nature of record-to-record processing, sort requires buffering inputs into state or somewhere in memory until the epoch has finished (we could keep the state or buffer sorted, though), and downstream operations can only continue their work after that, which contradicts the fact that the epoch is finished.

My 2 cents on continuous mode: we'd be better off admitting the architectural differences between batch-oriented and streaming-oriented processing and taking some safe approach to isolate the two. Naturally integrating the two sounds very hard to achieve, and it has even been a roadblock for improving functionality in micro-batch mode as well.

@cloud-fan
Contributor

OK, at least we should document this clearly in RequiresDistributionAndOrdering.

cloud-fan pushed a commit that referenced this pull request Apr 13, 2021
…s associated cache

### What changes were proposed in this pull request?

Populate table catalog and identifier from `DataStreamWriter` to `WriteToMicroBatchDataSource` so that we can invalidate cache for tables that are updated by a streaming write.

This is somewhat related to [SPARK-27484](https://issues.apache.org/jira/browse/SPARK-27484) and [SPARK-34183](https://issues.apache.org/jira/browse/SPARK-34183) (#31700), as ideally we may want to replace `WriteToMicroBatchDataSource` and `WriteToDataSourceV2` with logical write nodes and feed them to the analyzer. That will potentially change the code path involved in this PR.

### Why are the changes needed?

Currently `WriteToDataSourceV2` doesn't have cache invalidation logic, and therefore, when the target table for a micro batch streaming job is cached, the cache entry won't be removed when the table is updated.

### Does this PR introduce _any_ user-facing change?

Yes. Now when a DSv2 table that supports streaming write is updated by a streaming job, its cache will also be invalidated.

### How was this patch tested?

Added a new UT.

Closes #32039 from sunchao/streaming-cache.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 14, 2021
@github-actions github-actions bot closed this Jun 15, 2021
@aokolnychyi
Contributor Author

@cloud-fan @HyukjinKwon @HeartSaVioR @dongjoon-hyun @viirya @sunchao, SPARK-34183 is marked as a blocker for 3.2.0. I can update this PR but I'll need input on open questions.

I guess the primary discussion spot is here. I understand the streaming plans may not be ready. If so, I propose to just extend the existing micro-batch plans with the distribution and ordering capabilities.

@sunchao
Member

sunchao commented Jul 1, 2021

I guess the primary discussion spot is here. I understand the streaming plans may not be ready. If so, I propose to just extend the existing micro-batch plans with the distribution and ordering capabilities.

+1 on this: given that there are still open questions, it's better not to introduce new APIs at this point. The new approach looks pretty straightforward too.

@HeartSaVioR
Contributor

Sorry, I forgot this one. Given that we don't have streaming plans and there's no plan to address this, I'm +1 on making the existing micro-batch plans allow distribution and ordering capabilities. Even better if we can make it fail in continuous mode with a proper explanation.

@aokolnychyi
Contributor Author

Okay, I'll update the PR by the end of the week and then we can decide whether it is something we want to have in 3.2.0. I am fine not including this change but the feature we release in 3.2 won't be complete in that case.

HeartSaVioR pushed a commit that referenced this pull request Feb 8, 2022
…n micro-batch execution

### What changes were proposed in this pull request?

This PR adjusts existing logical plans for micro-batch writes to support required distribution and ordering. This change implements what was discussed in PR #31700. In particular, the consensus was to adapt existing streaming plans to support write requirements instead of introducing new logical plans for Structured Streaming. That's a separate item and must be addressed independently.

### Why are the changes needed?

These changes are needed so that data sources can request a specific distribution and ordering not only for batch but also for micro-batch writes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR extends existing tests to cover micro-batch cases.

Closes #35374 from aokolnychyi/spark-34183-v2.

Authored-by: Anton Okolnychyi <aokolnychyi@apple.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
