Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32845][SS][TESTS] Add sinkParameter to check sink options robustly in DataStreamReaderWriterSuite #29730

Closed
wants to merge 1 commit into from

Conversation

dongjoon-hyun
Copy link
Member

@dongjoon-hyun dongjoon-hyun commented Sep 11, 2020

What changes were proposed in this pull request?

This PR aims to add sinkParameter to check sink options robustly and independently in DataStreamReaderWriterSuite

Why are the changes needed?

LastOptions.parameters is designed to catch three cases: sourceSchema, createSource, createSink. However, StreamQuery.stop invokes queryExecutionThread.join, runStream, createSource immediately and reset the stored options by createSink.

To catch createSink options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split createSink option separately, we don't need this workaround and can eliminate this flakiness.

val query = df.writeStream.
   ...
   .start()
assert(LastOptions.paramters(..))
query.stop()

Does this PR introduce any user-facing change?

No. This is a test-only change.

How was this patch tested?

Pass the newly updated test case.

@@ -43,11 +43,13 @@ object LastOptions {
var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Sep 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I keep this name for the other cases in order to reduce the patch size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about add a comment?

@dongjoon-hyun
Copy link
Member Author

@cloud-fan , could you review this?

@dongjoon-hyun
Copy link
Member Author

Also, cc @viirya and @HeartSaVioR

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks okay. I'm just curious why StreamQuery.stop invokes createSource again?

@dongjoon-hyun
Copy link
Member Author

dongjoon-hyun commented Sep 11, 2020

Thank you, @viirya . It's because stop calls queryExecutionThread.join immediately and notifies the thread, the the thread runs runStream -> override lazy val logicalPlan: LogicalPlan -> dataSourceV1.createSource(metadataPath). I'll update the PR description.

@dongjoon-hyun
Copy link
Member Author

Merged to master/3.0/2.4.

dongjoon-hyun added a commit that referenced this pull request Sep 11, 2020
…stly in DataStreamReaderWriterSuite

This PR aims to add `sinkParameter`  to check sink options robustly and independently in DataStreamReaderWriterSuite

`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`.

To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness.

```scala
val query = df.writeStream.
   ...
   .start()
assert(LastOptions.paramters(..))
query.stop()
```

No. This is a test-only change.

Pass the newly updated test case.

Closes #29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b4be6a6)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
dongjoon-hyun added a commit that referenced this pull request Sep 11, 2020
…stly in DataStreamReaderWriterSuite

This PR aims to add `sinkParameter`  to check sink options robustly and independently in DataStreamReaderWriterSuite

`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`.

To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness.

```scala
val query = df.writeStream.
   ...
   .start()
assert(LastOptions.paramters(..))
query.stop()
```

No. This is a test-only change.

Pass the newly updated test case.

Closes #29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b4be6a6)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@dongjoon-hyun dongjoon-hyun deleted the SPARK-32845 branch September 11, 2020 19:03
@SparkQA
Copy link

SparkQA commented Sep 11, 2020

Test build #128573 has finished for PR 29730 at commit 9f2a036.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

@dongjoon-hyun thanks for fixing it!

@dongjoon-hyun
Copy link
Member Author

Thanks! :)

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020
…stly in DataStreamReaderWriterSuite

This PR aims to add `sinkParameter`  to check sink options robustly and independently in DataStreamReaderWriterSuite

`LastOptions.parameters` is designed to catch three cases: `sourceSchema`, `createSource`, `createSink`. However, `StreamQuery.stop` invokes `queryExecutionThread.join`, `runStream`, `createSource` immediately and reset the stored options by `createSink`.

To catch `createSink` options, currently, the test suite is trying a workaround pattern. However, we observed a flakiness in this pattern sometimes. If we split `createSink` option separately, we don't need this workaround and can eliminate this flakiness.

```scala
val query = df.writeStream.
   ...
   .start()
assert(LastOptions.paramters(..))
query.stop()
```

No. This is a test-only change.

Pass the newly updated test case.

Closes apache#29730 from dongjoon-hyun/SPARK-32845.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b4be6a6)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants