[SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started #44095

Closed

juliuszsompolski wants to merge 4 commits into apache:master from juliuszsompolski:SPARK-46186

Conversation

juliuszsompolski (Contributor)

What changes were proposed in this pull request?

A race condition when interrupt (or releaseSession) is sent just after execute could cause:

```
[info]   org.apache.spark.SparkException: java.lang.IllegalStateException:
[info]         operationId: fa653330-587a-41c7-a4a9-7987f070dcba with status Started
[info]         is not within statuses List(Finished, Failed, Canceled) for event Closed
```

and

```
[info]   org.apache.spark.SparkException: java.lang.IllegalStateException:
[info]         operationId: fa653330-587a-41c7-a4a9-7987f070dcba with status Closed
[info]         is not within statuses List(Started, Analyzed, ReadyForExecution, Finished, Failed) for event Canceled
```

when the interrupt arrives before the thread in ExecuteThreadRunner is started.
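
For context, these errors come from a state-transition check of roughly this shape (a minimal sketch under assumed names, not the actual ExecuteEventsManager code):

```scala
// Minimal sketch of the kind of transition guard that raises the errors above;
// names here are illustrative, not the actual Spark Connect implementation.
object EventsSketch {
  object ExecuteStatus extends Enumeration {
    val Started, Analyzed, ReadyForExecution, Finished, Failed, Canceled, Closed = Value
  }
  import ExecuteStatus._

  def assertStatus(
      validStatuses: List[ExecuteStatus.Value],
      current: ExecuteStatus.Value,
      event: String,
      operationId: String): Unit = {
    if (!validStatuses.contains(current)) {
      throw new IllegalStateException(
        s"operationId: $operationId with status $current" +
          s" is not within statuses $validStatuses for event $event")
    }
  }

  // Closing is only legal from a terminal state, hence the first error while the
  // operation is still Started.
  def postClosed(current: ExecuteStatus.Value, operationId: String): Unit =
    assertStatus(List(Finished, Failed, Canceled), current, "Closed", operationId)
}
```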

In ExecuteHolder.close(), this leads to:

```
      runner.interrupt() <- just sets interrupted = true
      runner.join() <- thread didn't start yet, so join() returns immediately and doesn't wait for the thread to actually be interrupted
      ...
      eventsManager.postClosed() <- causes the first failure, because the thread wasn't running and didn't move to Canceled
```

Afterwards, assuming we allowed that transition, the thread would get started, notice the interrupted state, and immediately try to move to Canceled. It would then hit the second error, because Canceled is not allowed after Closed.

While we could consider allowing the first transition (Started -> Closed), we don't want any events to arrive after Closed, so that listeners can clean up their state once they see Closed.

The fix is to handle interrupts that arrive before the thread has started, and to prevent the thread from starting at all if it was already interrupted.
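
Roughly, the shape of such a guard (an illustrative sketch with assumed names, not the actual ExecuteThreadRunner code):

```scala
// Illustrative sketch of an interrupt-before-start guard; not the actual Spark code.
class RunnerSketch(execute: () => Unit) {
  private var interrupted: Boolean = false
  private var started: Boolean = false
  private val thread = new Thread(() => execute())

  /** Start the execution thread, unless an interrupt already arrived. */
  def start(): Unit = synchronized {
    if (!interrupted) {
      started = true
      thread.start()
    }
    // If interrupted before start(), the thread never runs, so close() does not
    // race against a thread that would later try to post Canceled after Closed.
  }

  /** Record the interrupt; returns whether there is a running thread to wait for. */
  def interrupt(): Boolean = synchronized {
    interrupted = true
    started
  }

  /** Join the execution thread only if it was actually started. */
  def join(): Unit = {
    val wasStarted = synchronized { started }
    if (wasStarted) thread.join()
  }
}
```

The key property is that once interrupt() has returned, the thread is either already running (and will post its own terminal event) or will never start, so no execution events can trail in after Closed.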

Why are the changes needed?

This was detected after the grpc upgrade from 1.56 to 1.59, which makes some tests in SparkConnectServiceE2ESuite and ReattachableExecuteSuite flaky.
With the grpc upgrade, execute is sent to the server eagerly, and in some tests we clean up and release the session without waiting for the execution to start. This triggered the flakiness.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

A test was added. The test depends on timing, so without the fix it may not fail reliably, only from time to time.

Was this patch authored or co-authored using generative AI tooling?

GitHub Copilot assisted with some boilerplate auto-completion.

Generated-by: Github Copilot

@juliuszsompolski (Contributor, Author)

cc @hvanhovell @grundprinzip

Comment on lines -131 to -136
```
/**
 * Wait for the execution thread to finish and join it.
 */
def join(): Unit = {
  runner.join()
}
```
@juliuszsompolski (Contributor, Author)
This wasn't used and should be internal; let's reduce the surface.

```
  .build()
val iter = stub.executePlan(
  buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
// wait for execute holder to exist, but the execute thread may not have started yet.
```
@juliuszsompolski (Contributor, Author)

If we send it too fast, before the execution is even created, the interrupt could be a no-op and the query would start running uninterrupted.
The interrupt response does report that it didn't actually interrupt any operation, but currently the clients don't check that... I think trying to fix that further gets into real edge cases.
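
A sketch of the wait-then-interrupt pattern described here; `executionExistsOnServer` and `buildInterruptRequest` are hypothetical stand-ins for whatever helpers the suite actually provides:

```scala
// Sketch only: the helper names below are assumed, not the suite's actual helpers.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

val iter = stub.executePlan(
  buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))

// Wait until the server has registered the ExecuteHolder for this operationId.
// The execute thread may still not have started, which is exactly the race under test.
eventually(timeout(30.seconds)) {
  assert(executionExistsOnServer(operationId)) // hypothetical helper
}

// Only now send the interrupt; sending it earlier could be a no-op that leaves
// the query running uninterrupted.
stub.interrupt(buildInterruptRequest(operationId)) // hypothetical request builder
```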

@juliuszsompolski (Contributor, Author)

gentle ping @hvanhovell @grundprinzip

@HyukjinKwon (Member)

Merged to master.

@juliuszsompolski (Contributor, Author)

I am aware of some flakiness in the added tests; I will follow up tomorrow to make them stable.

asl3 pushed a commit to asl3/spark that referenced this pull request Dec 5, 2023

[SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started

(commit message identical to the PR description above)

Closes apache#44095 from juliuszsompolski/SPARK-46186.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dongjoon-hyun pushed a commit that referenced this pull request Dec 6, 2023
…ableExecuteSuite`

### What changes were proposed in this pull request?

The test added in #44095 could be flaky because `MEDIUM_RESULTS_QUERY` could finish very quickly, before the interrupt was sent. Replace it with a query that sleeps for 30 seconds, so that we are sure the interrupt runs before it finishes.
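
For illustration, one way to build such a deliberately slow query (a sketch only; the follow-up may use a different query) is to call Thread.sleep through Spark SQL's java_method reflection function:

```scala
// Hypothetical slow query for this kind of test; java_method invokes a static Java
// method via reflection, so evaluating this blocks for ~30 seconds before returning.
val SLEEP_QUERY = "SELECT java_method('java.lang.Thread', 'sleep', 30000L)"
```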

### Why are the changes needed?

Remove test flakiness.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Reran ReattachableExecuteSuite 100+ times to check it isn't flaky.

### Was this patch authored or co-authored using generative AI tooling?

GitHub Copilot assisted with some boilerplate auto-completion.

Generated-by: Github Copilot

Closes #44189 from juliuszsompolski/SPARK-46186-followup.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023

[SPARK-46186][CONNECT] Fix illegal state transition when ExecuteThreadRunner interrupted before started

(commit message identical to the PR description above)

Closes apache#44095 from juliuszsompolski/SPARK-46186.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dbatomic pushed a commit to dbatomic/spark that referenced this pull request Dec 11, 2023

…ableExecuteSuite`

(commit message identical to the follow-up commit above)

Closes apache#44189 from juliuszsompolski/SPARK-46186-followup.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>