-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8861] [table] Add support for batch queries in SQL Client #5660
Conversation
Hi @xccui, I will review this as soon as possible. Right now the priority is the Flink 1.5 release with FLIP-6 support. |
I see, @twalthr. Sorry for my impatience. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @xccui. I had a quick look at the changes and added some comments. I will do a proper review once the PR is rebased on the FLIP-6 changes. Could you update it?
@@ -159,6 +160,9 @@ protected void evaluate(ResultTableOperation operation, String binding) { | |||
case PREV: | |||
gotoPreviousPage(); | |||
break; | |||
case FIRST: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need a FIRST
. GOTO
allows for the same functionality. LAST
is a special feature to stay at the last page in streaming mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the function's point of view, that's true. I added it just for symmetry 😃.
clusterClient.shutdown(); | ||
} catch (Exception e) { | ||
// ignore | ||
List<Row> resultTable = SerializedListAccumulator.deserializeList(accResult, batchResult.getSerializer()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move the deserialization logic into the result class? The executor should not need to handle internals of a result.
if (result == null) { | ||
throw new SqlExecutionException("Could not find a result with result identifier '" + resultId + "'."); | ||
} | ||
if (!resultStore.isStatic(resultId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need the distinction between dynamic and state result here? The executor should actually not matter. It should just kill whatever Flink job is running.
Sure @twalthr, I'll rebase the PR soon. |
2fa49a3
to
03fc9ec
Compare
Hi @twalthr, the PR has been reworked. Take a look when you are convenient. Thanks! |
Thank you @xccui. I will test your code and merge this... |
Thanks for the improvements, @twalthr. |
What is the purpose of the change
This PR added support for batch queries in SQL Client.
Brief change log
BatchResult
, which also implementedDynamicResult
, for the batch queries.JobExecutionResult
.Dataset.collect()
.retrieveTableResult()
method inLocalExecutorITCase
.""
inExecution.java
.Verifying this change
This change can be verified by the added test case
testBatchQueryExecution()
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation