Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8858] [sql-client] Add support for INSERT INTO in SQL Client #6332

Closed
wants to merge 1 commit into from

Conversation

twalthr
Copy link
Contributor

@twalthr twalthr commented Jul 13, 2018

What is the purpose of the change

This PR adds support for the SQL INSERT INTO statement in SQL Client. This PR depends on #6323 for finalizing the unified table sinks. The PR adds support for submitting INSERT INTO statements in the CLI shell as well as using the -u command line option. The command line option is the basis for end-to-end testing of the SQL Client (FLINK-8970).

Brief change log

  • Refactor the XXXResult classes
  • Add INSERT INTO support to CLI and local executor
  • Add -u command line parameter

Verifying this change

  • DependencyTest has been adapted
  • New test o.a.f.table.client.gateway.local.LocalExecutorITCase#testStreamQueryExecutionSink
  • New test o.a.f.table.client.gateway.local.ExecutionContextTest#testSourceSinks

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost LGTM. One complain that results refactor could be extracted to separate commit, but this PR was more or less straightforward so no big harm :)

@@ -85,6 +86,9 @@ public CliClient(SessionContext context, Executor executor) {
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
// make space from previous output and test the writer
terminal.writer().println();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is that needed? Shouldn't we print new line at then of the output instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the output on the terminal nicer. We don't know what has been printed before. This starts a terminal session. The output looks now like:

No default environment specified.
Searching for '/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/twalthr/flink/flink/flink-dist/target/flink-1.6-SNAPSHOT-bin/flink-1.6-SNAPSHOT/conf/sql-client-defaults.yaml
No session environment specified.

[INFO] Executing the following statement:
INSERT INTO MyTableName SELECT * FROM MyTableName
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId / Job ID: fab21f0632da36f9236c343c2850c71d
For the current job status visit: http://localhost:8081

Shutting down executor...done.

}
// execute single update statement
else {
final boolean success = cli.submitUpdate(options.getUpdateStatement());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not wait for the query to complete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this would block the process for unbounded queries and require a (fault-tolerant) monitoring in the SQL Client which is not intended. We just block until the statement has been submitted to the cluster.

terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_UNKNOWN_SQL));
continue;
}
final Optional<SqlCommandCall> parsedStatement = SqlCommandParser.parse(statement);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deduplicate parsing call and if check - there is already a bug here, either missing flush or unnecessary flush

break;
case SOURCE:
callSource(cmdCall);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing default?

* @param statement SQL update statement
* @return flag to indicate if the submission was successful or not
*/
public boolean submitUpdate(String statement) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have tests for that? Some ITCase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire CliClient is only tested manually so far.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, that's bad :( Is it hard to start local flink mini cluster and execute cli process from JUnit test? (Maybe as a follow up after feature freeze)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some unit tests.


try {
final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

programTarget.writeTo(terminal.writer())

? It would be easier to add more fields in the future and more difficult to forget about printing them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I had it similar like your proposal, but this would mix a data model class and visualization. ProgramTargetDescriptor should not be responsible how it is represented in the CLI.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, what about

terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString())
```?

private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();

// apply update statement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of those comments are a little bit unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On one side yes but on the other side it allows to read the comments from top to bottom and know what the method is doing without having to look at the actual code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there is no additional value of

// apply update statement

over

applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);

which is completely self explanatory. This is just form of some small code duplication. Same applies to most of those comments.

* @param tableEnv table environment
* @param query SQL SELECT query
* @return result table object
*/
private Table createTable(TableEnvironment tableEnv, String query) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename query -> selectQuery and drop @param section from java doc?

* @param queryConfig query configuration
* @param statement SQL update statement (e.g. INSERT INTO)
*/
private void applyUpdate(TableEnvironment tableEnv, QueryConfig queryConfig, String statement) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename statement -> updateStatement and drop @param section from java doc?

@@ -73,7 +86,7 @@ public JobExecutionResult fetchExecutionResult() {
* Deploys a job. Depending on the deployment creates a new job cluster. It saves the cluster id in
* the result and blocks until job completion.
*/
private <T> JobExecutionResult deployJob(ExecutionContext<T> context, JobGraph jobGraph, DynamicResult<T> result) {
private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

split this method? Extract if (context.getClusterId() == null) and else branches to separate methods?

@pnowojski
Copy link
Contributor

pnowojski commented Jul 13, 2018

One more thing. Do we need this -u flag? Shouldn't it be enough to support something like:
flink-cli < query01.sql or echo "INSERT INTO bar SELECT * FROM foo" | flink-cli

@twalthr
Copy link
Contributor Author

twalthr commented Jul 14, 2018

Thanks for the review @pnowojski. I agree that we should support flink-cli < query01.sql or echo "INSERT INTO bar SELECT * FROM foo" | flink-cli. However, I would move this to a separate issue because I'm not sure how well we support multilines and EOF right now. With the -u flag the user also gets the correct error code after the submission, with flink-cli < query01.sql the CLI would either stay in interactive mode or always return success. I would keep the -u flag for now for testing purposes. We could remove it from the CLI help and leave it as an internal parameter for now. What do you think?

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-u seems too specific for me. If it supported any query (-i --input) it would have more sense. However hiding it from CLI help and postponing this issue is fine for me.

Besides that LGTM. Just two minor comments.

private <C> ProgramTargetDescriptor executeUpdateInternal(ExecutionContext<C> context, String statement) {
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();

// apply update statement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But there is no additional value of

// apply update statement

over

applyUpdate(envInst.getTableEnvironment(), envInst.getQueryConfig(), statement);

which is completely self explanatory. This is just form of some small code duplication. Same applies to most of those comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants