[SPARK-23202][SQL] Break down DataSourceV2Writer.commit into two phase #20386
Conversation
Test build #86595 has finished for PR 20386 at commit
Test build #86647 has finished for PR 20386 at commit
Test build #86690 has finished for PR 20386 at commit
Test build #86689 has finished for PR 20386 at commit
Test build #86698 has finished for PR 20386 at commit
Force-pushed from bdd9bd1 to e973187.
Test build #86775 has finished for PR 20386 at commit
Test build #86788 has finished for PR 20386 at commit
* writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
* exception happens during the writing, call {@link DataWriter#abort()}.
* 3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
* writer. If all the data are written successfully, call {@link DataWriter#commit()}.
If one data writer finishes successfully, the commit message will be sent back to the driver side and Spark will call #add.
* On a writer being successfully committed, call {@link #add(WriterCommitMessage)} to
* handle its commit message.
* If exception happens during the writing, call {@link DataWriter#abort()}.
* 3. If all writers are successfully committed, call {@link #commit()}. If
If all the data writers finish successfully, and #add is successfully called for all the commit messages, Spark will call #commit. If any of the data writers failed, or any of the #add calls failed, or the job failed for an unknown reason, Spark will call #abort.
* failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
* is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
* failed, and {@link #abort()} would be called. The state of the destination
* is undefined and @{@link #abort()} may not be able to deal with it.
add some more comments to say that, implementations should probably cache the commit messages and do the final step in #commit
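The pattern suggested in this comment (cache messages in `add`, do the destination-visible step in `commit`) could look roughly like the sketch below. The interface and class names are illustrative stand-ins, not Spark's actual `DataSourceWriter` API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Spark's WriterCommitMessage marker interface.
interface WriterCommitMessage {}

class FileCommitMessage implements WriterCommitMessage {
    final String stagedPath;
    FileCommitMessage(String stagedPath) { this.stagedPath = stagedPath; }
}

class CachingWriter {
    private final List<WriterCommitMessage> pending = new ArrayList<>();
    private boolean committed = false;

    // Phase 1: called once per successful data writer, as messages arrive.
    void add(WriterCommitMessage message) { pending.add(message); }

    // Phase 2: called once, after all messages have been added;
    // e.g. atomically publish every staged file recorded in `pending`.
    void commit() { committed = true; }

    void abort() { pending.clear(); }

    int pendingCount() { return pending.size(); }
    boolean isCommitted() { return committed; }
}
```

The key point of the comment is that nothing externally visible should happen until `commit()`, so a failed job can be aborted by simply discarding the cache.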
@@ -63,32 +65,30 @@
DataWriterFactory<Row> createWriterFactory();
/**
* Commits this writing job with a list of commit messages. The commit messages are collected from
* successful data writers and are produced by {@link DataWriter#commit()}.
* Handles a commit message produced by {@link DataWriter#commit()}.
nit: ..., which is collected from a successful data writer on the executor side.
@@ -148,7 +148,8 @@ private[continuous] class EpochCoordinator(
logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
// Sequencing is important here. We must commit to the writer before recording the commit
// in the query, or we will end up dropping the commit if we restart in the middle.
writer.commit(epoch, thisEpochCommits.toArray)
thisEpochCommits.foreach(writer.add(_))
is it possible to call `add` once the commit message arrives?
Test build #86801 has finished for PR 20386 at commit
retest this please.
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
logInfo(s"Start processing data source writer: $writer. " +
  s"The input RDD has ${messages.length} partitions.")
might be good to keep this log.
}
def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
def abort(epochId: Long): Unit = {}
we should clear the message array in abort too.
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
override def abort(): Unit = {
ditto
}
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
override def abort(epochId: Long): Unit = {
ditto
MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
MemoryWriterCommitMessage(2, Seq(Row(6), Row(7)))
)
messages.foreach(writer.add(_))
nit:
writer.add(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))))
writer.add(MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))))
..
@@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
Console.withOut(captured) {
val query = input.toDF().writeStream.format("console").start()
try {
input.addData(1, 2, 3)
input.addData(1, 1, 1)
why this change?
The order of collected messages is no longer the same as the input data.

To make the test case work, we should either change the input data to identical elements, or set spark.default.parallelism to 1.
It's fixable if we attach the partition id to the commit message of ConsoleSink, but is it worth it? cc @zsxwing @jose-torres
Generally I think a streaming sink doesn't need to keep the data order w.r.t. the partition id.
Makes sense, but can we set the parallelism to 1 instead? I worry that making all the elements the same is more likely to disguise a bug.
I like this change! It adds a missing feature which is required for migrating the file-based data source.

LGTM, waiting for feedback from others.
Test build #86809 has finished for PR 20386 at commit
Test build #86823 has finished for PR 20386 at commit
Test build #86826 has finished for PR 20386 at commit
Test build #86822 has finished for PR 20386 at commit
@cloud-fan, is the intent to get this into 2.3.0? If so, I'll make time to review it today.
*/
void commit(WriterCommitMessage[] messages);
void commit();
WDYT of using the same API as FileCommitProtocol, where the engine both calls add() for each message but also passes them in to commit() at the end? It seems like most writers will have to keep an array of the messages they received.
This is something we wanna improve at the API level. I think the implementation should be free to decide how to store the messages, in case each message is big and there are a lot of them. If this is not a problem at all, we can follow `FileCommitProtocol`.
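The `FileCommitProtocol`-style shape being weighed here, where the engine calls `add()` for each message as it arrives and still passes the full array to `commit()` at the end, could look like this sketch. Names are illustrative, not Spark's actual API.

```java
// Stand-in for Spark's WriterCommitMessage marker interface.
interface WriterCommitMessage {}

class PartitionMessage implements WriterCommitMessage {
    final int partitionId;
    PartitionMessage(int partitionId) { this.partitionId = partitionId; }
}

abstract class ProtocolStyleWriter {
    // Streaming hook, invoked per arriving message; an implementation
    // that only needs the final list can ignore it.
    abstract void add(WriterCommitMessage message);
    // Final commit still receives every collected message, so the
    // implementation is not forced to cache them itself.
    abstract void commit(WriterCommitMessage[] messages);
}

class CountingWriter extends ProtocolStyleWriter {
    int added = 0;
    int committedWith = -1;
    @Override void add(WriterCommitMessage message) { added++; }
    @Override void commit(WriterCommitMessage[] messages) { committedWith = messages.length; }
}
```

The trade-off the thread raises is memory: with this shape the engine must hold all messages on the driver, which the pure two-phase `add()`/`commit()` design avoids.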
Test build #86829 has finished for PR 20386 at commit
@rdblue The target is 2.3 release. Thanks for your time!
*
* If this method fails (by throwing an exception), this writing job is considered to to have been
* failed, and {@link #abort()} would be called. The state of the destination
* is undefined and @{@link #abort()} may not be able to deal with it.
Nit: javadoc typo.
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
* When this method is called, the number of commit messages added by
* {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
What does this mean? It isn't clear to me what "the number of input partitions" means, or why it isn't obvious that it is equal to the number of pending `WriterCommitMessage` instances passed to add.
how about `the number of data(RDD) partitions to write`?

> why it isn't obvious ...

Maybe we can just follow `FileCommitProtocol`, i.e. `commit` and `abort` still take an array of messages.
Passing the messages to commit and abort seems simpler and better to me, but that's for the batch side. And, we shouldn't move forward with this unless there's a use case.
As for the docs here, what is an implementer intended to understand as a result of this? "The number of data partitions to write" is also misleading: weren't these already written and committed by tasks?
* Handles a commit message which is collected from a successful data writer.
*
* Note that, implementations might need to cache all commit messages before calling
* {@link #commit()} or {@link #abort()}.
In what case would an implementation not cache and commit all at once? What is the point of a commit if not to make sure all of the data shows up at the same time?
* messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
* of input data partitions, as there may be only a few data writers that are committed
* before the abort happens, or some data writers were committed but their commit messages
* haven't reached the driver when the abort is triggered. So this is just a "best effort"
Commit messages in flight should be handled and aborted. Otherwise, this isn't a "best effort". Best effort means that Spark does everything that is feasible to ensure that commit messages are added before aborting, and that should include race conditions from RPC.
The case where "best effort" might miss a message is if the message is created, but a node fails before it is sent to the driver.
I think there is no difference between "the message is created, but a node fails before it is sent" and "the message is in flight". Implementations need to deal with the case when a writer finishes successfully but its message is not available in `abort` anyway.

"best effort" might not be a good word, do you have a better suggestion?
Best effort is not just how we describe the behavior, it is a requirement of the contract. Spark should not drop commit messages because it is convenient. Spark knows what tasks succeeded and failed and which ones were authorized to commit. That's enough information to provide the best-effort guarantee.
This is a bit of a weird case for API documentation, because the external users of the API will be implementing rather than consuming the interface. We shouldn't drop messages just because we don't want to be bothered, but it's easy to fix that if we make a mistake and there's no serious problem if we miss cases we really could have handled. It's a more serious issue if people misunderstand what Spark can provide, and implement sources which assume any commit message that's been generated will be passed to abort.
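The contract being hashed out in this thread, where `abort()` sees only the messages that reached the driver and so the implementation must fall back to job-level cleanup, can be sketched as below. The staging-directory mechanism and all names are illustrative, not what any particular sink does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CommitMessage {
    final String stagedPath;
    CommitMessage(String stagedPath) { this.stagedPath = stagedPath; }
}

class StagingWriter {
    // Everything any data writer ever staged (known to the job as a whole,
    // not necessarily reported to the driver yet).
    final Set<String> staged = new HashSet<>();
    private final List<CommitMessage> received = new ArrayList<>();

    void stage(String path) { staged.add(path); }   // done by data writers
    void add(CommitMessage m) { received.add(m); }  // message reached the driver

    void abort() {
        // Clean up the files we learned about from commit messages...
        for (CommitMessage m : received) staged.remove(m.stagedPath);
        // ...then sweep the whole staging area anyway, covering writers whose
        // messages were still in flight or lost to a node failure.
        staged.clear();
        received.clear();
    }
}
```

This illustrates the point in the comment above: a sink must not assume every generated commit message will be passed to abort, so cleanup cannot rely on the message list alone.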
*
* If this method fails (by throwing an exception), this writing job is considered to to have been
* failed, and {@link #abort()} would be called. The state of the destination
* is undefined and @{@link #abort()} may not be able to deal with it.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
I realize this isn't part of this commit, but why would an exactly-once guarantee require idempotent commits? Processing the same data twice with an idempotent guarantee is not the same thing as exactly-once.
The StreamWriter is responsible for setting up a distributed transaction to commit the data within batch both locally and to the remote system. But the StreamExecution keeps its own log of which batches have been fully completed. ("Fully completed" includes things like stateful aggregation commits and progress logging which can't reasonably participate in the StreamWriter's transaction.)
So there's a scenario where Spark fails between StreamWriter commit and StreamExecution commit, in which the StreamExecution must re-execute the batch to ensure everything is in the right state. The StreamWriter is responsible for ensuring this doesn't generate duplicate data in the remote system.
Note that the "true" exactly once strategy, where the StreamWriter aborts the retried batch because it was already committed before, is indeed idempotent wrt StreamWriter.commit(epochId). But there are weaker strategies which still provide equivalent semantics.
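The "true exactly-once" strategy described in this comment, where the writer keeps its own log of committed epochs and turns a replayed commit into a no-op, can be sketched as follows. This is illustrative only; Spark's StreamWriter interface does not prescribe this mechanism.

```java
import java.util.HashSet;
import java.util.Set;

class EpochIdempotentWriter {
    // The writer's own durable log of committed epochs (in-memory here
    // for illustration; a real sink would persist it transactionally).
    private final Set<Long> committedEpochs = new HashSet<>();
    int applied = 0;  // how many times data was actually made visible

    void commit(long epochId) {
        if (committedEpochs.contains(epochId)) {
            // Batch replayed after Spark failed between the StreamWriter
            // commit and the StreamExecution commit: skip, don't duplicate.
            return;
        }
        // Make this epoch's data visible in the remote system.
        applied++;
        committedEpochs.add(epochId);
    }
}
```

Because the second `commit` for the same epoch is a no-op, re-executing a batch never produces a duplicate copy of the data, which is the "exactly one committed copy per record" guarantee discussed below.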
Thanks for this explanation, I think I see what you're saying. But I think your statement that refers to "true" exactly-once gives away the fact that this does not provide exactly-once semantics.
Maybe this is a question for the dev list: why the weaker version? Shouldn't this API provide a check to see whether the data was already committed?
What are the exact guarantees you're looking for when calling a system "exactly-once"? I worry you're looking for something that isn't possible. In particular, I don't know of any additional guarantee that check would allow us to make.
For a commit interface, I expect the guarantee to be that data is committed exactly once. If commits are idempotent, data may be reprocessed, and commits may happen more than once, then that is not an exactly-once commit: that is an at-least-once commit.
I'm not trying to split hairs. My point is that if there's no difference in behavior between exactly-once and at-least-once because the commit must be idempotent, then you don't actually have a exactly-once guarantee.
It's true that there's no exactly-once behavior with respect to StreamWriter.commit(). "Exactly-once processing" refers to the promise that the remote sink will contain exactly one committed copy of each processed record.
If that's the case, then this interface should be clear about it instead of including wording about exactly-once. For this interface, there is no exactly-once guarantee.
* failed, and {@link #abort()} would be called. The state of the destination
* is undefined and @{@link #abort()} may not be able to deal with it.
*/
void add(WriterCommitMessage message);
This is the only method shared between the stream and batch writers. Why does the streaming interface extend this one?
It probably shouldn't anymore. But I'd suggest dealing with that in another PR, because removing the inheritance will require splitting off some streaming parts of the execution engine.
+1 for separating and using another PR. Thanks.
@gengliangwang, what is the use case supported by this? In other words, how is it going to be used?

In general, I'm more concerned with the batch side and I don't have a huge problem with this change. I do want to make sure it is in support of a valid use case. I'd also rather separate the batch and streaming committer APIs because they have so little in common.
By a quick look, it seems
@rdblue @cloud-fan @jose-torres thanks for the comments! After consideration, I decided to take the suggestion from @jose-torres: create a new API for the commit message callback, and keep the existing commit API unchanged.

New PR: #20454
I agree it sounds reasonable, but we shouldn't add methods to a new API blindly and without a use case. The point of a new API, at least in part, is to improve on the old one. If it is never used, then we are carrying support for something that is useless. On the other hand, if it is used we should know what it is needed for so we can design for the use case.
There is a lesson I learned from streaming data source v1: even though it's totally internal, there are people already using it who asked us not to remove the API. I think it's also true for the file-based data source: it's internal but people may still use it.

Although we don't find a concrete use case for it yet, one possible use case might be: the implementation needs a 2-phase commit at the driver side. Then it can use `add` to do the first phase and `commit` to do the second.
## What changes were proposed in this pull request?

The current DataSourceWriter API makes it hard to implement `onTaskCommit(taskCommit: TaskCommitMessage)` in `FileCommitProtocol`. In general, on receiving a commit message, the driver can start processing messages (e.g. persist messages into files) before all the messages are collected.

The proposal is to add a new API `add(WriterCommitMessage message)`: handles a commit message on receiving it from a successful data writer. This should make the whole API of DataSourceWriter compatible with `FileCommitProtocol`, and more flexible.

There was another radical attempt in apache#20386. This one should be more reasonable.

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes apache#20454 from gengliangwang/write_api.
Closing this PR now. The problem is resolved by #20454.
This PR is deprecated. See #20454.

## What changes were proposed in this pull request?

Currently, the API `DataSourceV2Writer#commit(WriterCommitMessage[])` commits a writing job with a list of commit messages. It makes sense in some scenarios, e.g. MicroBatchExecution. However, the API makes it hard to implement `onTaskCommit(taskCommit: TaskCommitMessage)` in `FileCommitProtocol`. In general, on receiving a commit message, the driver can start processing messages (e.g. persist messages into files) before all the messages are collected.

The proposal is to break down `DataSourceV2Writer.commit` into two phases:

1. `add(WriterCommitMessage message)`: handles a commit message produced by `DataWriter#commit()`.
2. `commit()`: commits the writing job.

This should make the API compatible with `FileCommitProtocol`, and more flexible.

## How was this patch tested?

Unit test
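The two-phase shape proposed above can be sketched end to end with a toy driver loop: `add()` once per arriving message, then a single `commit()`, with `abort()` on any failure. The interface mirrors the proposal; the driver loop, `RecordingWriter`, and the use of `null` to model a failed task are all illustrative.

```java
import java.util.List;

// Stand-in for Spark's WriterCommitMessage marker interface.
interface WriterCommitMessage {}

interface TwoPhaseWriter {
    void add(WriterCommitMessage message);  // phase 1: per task commit message
    void commit();                          // phase 2: once, job level
    void abort();
}

// Records the order of calls so the sequencing is visible.
class RecordingWriter implements TwoPhaseWriter {
    final StringBuilder events = new StringBuilder();
    public void add(WriterCommitMessage m) { events.append("add;"); }
    public void commit() { events.append("commit;"); }
    public void abort() { events.append("abort;"); }
}

class Driver {
    // `null` stands in for a task that failed and produced no message.
    static boolean runJob(TwoPhaseWriter writer, List<WriterCommitMessage> results) {
        try {
            for (WriterCommitMessage msg : results) {
                if (msg == null) throw new RuntimeException("data writer failed");
                writer.add(msg);
            }
            writer.commit();
            return true;
        } catch (RuntimeException e) {
            writer.abort();
            return false;
        }
    }
}
```

Note how `commit()` takes no arguments here: anything the sink needs at job-commit time must have been cached during the `add()` calls, which is exactly the caching obligation debated in the review above.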