-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-14672][sql-client] Make Executor stateful in sql client #10270
Conversation
FYI @twalthr @KurtYoung , please help the reviewing if have time. Thanks. |
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit e2f6d55 (Wed Dec 04 15:13:32 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
CI report:
Bot commandsThe @flinkbot bot supports the following commands:
|
28c43fa
to
adbb8a7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @openinx for working on this, the design looks good to me, I left some comments about code style.
private Executor executor; | ||
|
||
public ExecutionContext( | ||
Environment environment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent 2 tabs here.
You might need to change your idea setting: Editor -> Code Stype -> Java -> Continuation Indent 8
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've set the checksytle in my ide, seems the Selection-format makes the indent happen again. Let me check.
Configuration flinkConfig, | ||
Options commandLineOptions, | ||
List<CustomCommandLine> availableCommandLines) throws FlinkException { | ||
this(environment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: If you are already wrapping lines for parameter, also wrap the first parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
Configuration flinkConfig, | ||
CustomCommandLine commandLine, | ||
ClusterClientServiceLoader clusterClientServiceLoader | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: align with other code format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
String sessionId = sessionContext.getSessionId(); | ||
ExecutionContext previousContext = this.contextMap.putIfAbsent(sessionId, createExecutionContext(sessionContext)); | ||
if (previousContext != null) { | ||
throw new SqlExecutionException("Found another session with the same session identifier: " + sessionContext.getSessionId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sessionContext.getSessionId()
-> sessionId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
if (previousContext != null) { | ||
throw new SqlExecutionException("Found another session with the same session identifier: " + sessionContext.getSessionId()); | ||
} | ||
return sessionContext.getSessionId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sessionContext.getSessionId()
-> sessionId
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
final ExecutionContext<?> context = getOrCreateExecutionContext(session); | ||
public void resetSessionProperties(String sessionId) throws SqlExecutionException { | ||
// Renew the ExecutionContext by using the default environment. | ||
this.contextMap.put(sessionId, createExecutionContext(defaultEnvironment)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reset session property should keep the content which read from session environment file? Now it looks we also git rid of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I noticed here. I planed to not saving the session environment file in ExecutionContext before, so here seems no method to reset back to the session env. Let me re-think about the design , maybe we still need the session env..
Environment env = getExecutionContext(sessionId).getEnvironment(); | ||
Environment newEnv = Environment.enrich(env, ImmutableMap.of(key, value), ImmutableMap.of()); | ||
// Renew the ExecutionContext by merging the default environment and new environment. | ||
this.contextMap.put(sessionId, createExecutionContext(defaultEnvironment, newEnv)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can directly call createExecutionContext(newEnv)
here, defaultEnvironment
should be already covered by the env
you got from current execution context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, thanks for pointing it out.
ImmutableMap.of(), | ||
ImmutableMap.of(name, ViewEntry.create(name, query))); | ||
// Renew the ExecutionContext. | ||
this.contextMap.put(sessionId, createExecutionContext(defaultEnvironment, newEnv)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will also change this.
public TypedResult<List<Tuple2<Boolean, Row>>> retrieveResultChanges( | ||
String sessionId, | ||
String resultId | ||
) throws SqlExecutionException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
I think @aljoscha also works on this field. It is better to align works before merge one or the other. |
7be55c9
to
520c528
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, @twalthr do you want take a look?
@@ -63,7 +63,7 @@ public ResultStore(Configuration flinkConfig) { | |||
|
|||
if (env.getExecution().inStreamingMode()) { | |||
// determine gateway address (and port if possible) | |||
final InetAddress gatewayAddress = getGatewayAddress(env.getDeployment()); | |||
InetAddress gatewayAddress = getGatewayAddress(env.getDeployment()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why deleting final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's unintentional change, will revert it.
Thanks for pinging me @KurtYoung. I would like to take a look. |
…h session: The SessionContext is only used for frontend user to open session, all the backend informations are maintained in ExecutionContext.
…resetSessionProperties semantics issues
…v#createTemporaryView
@twalthr do you have any comments on this? I will merge this if you don't mind. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick question: shall we introduce a type SessionID
as wrapper of String
instead of just using String
? We can gain type safety & expressive code then.
We could revisit this when necessary, currently I don't see any strong reason to make it a |
bq. We could revisit this when necessary, currently I don't see any strong reason to make it a Class instead of a String |
merging this now |
Simplify the executor backend state for each session: The SessionContext is only used for frontend user to open session, all the backend informations are maintained in ExecutionContext. This closes apache#10270
What is the purpose of the change
Now we only have the embedded mode SQL to interact with Flink cluster, will support the multi-statement and DDL such create table, which means we need to store some state for the same session. On the other hande, it will support the gateway mode in future, means it will maitain multi-sessions at the server side.
So it's necessary to be stateful for the Executor (both LocalExecutor and GatewayExecutor).
The pull request provide two extra API in Executor interface to manage session life-cycle:
For each session, we will openSesion firstly, then execute some DDL or DML, and closeSession finally. When executing the DDL or DML, it will share the same session context and execution context for the same session identifier.
Brief change log
Verifying this change
The LocalExecutorITCase addressed all the cases.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation