Skip to content

Conversation

@godfreyhe
Copy link
Contributor

What is the purpose of the change

TableEnvironment.executeSql() method for INSERT statement returns TableResult once the job is submitted. Users must use tableResult.getJobClient.get().getJobExecutionResult(Thread.currentThread().getContextClassLoader).get() to wait the job finish. This API looks very ugly. This pr aims to introduce TableResult#await method to make the api fluent

Brief change log

  • Introduce await in TableResult
  • remove TableEnvUtil and update the related tests
  • remove the usage of tableResult.getJobClient.get().getJobExecutionResult().get() in planner tests

Verifying this change

This change is already covered by existing tests*.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit a66d075 (Tue Jun 16 17:45:39 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 16, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@godfreyhe godfreyhe force-pushed the FLINK-18337 branch 4 times, most recently from 51d8c4c to 5faf259 Compare June 17, 2020 06:25
@godfreyhe
Copy link
Contributor Author

cc @twalthr

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this change @godfreyhe, it makes the API much nicer and concise. I left some comments.

"FROM hTable AS h");

List<Row> results = collectBatchResult(table);
List<Row> results = Lists.newArrayList(table.execute().collect());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid dependencies to other modules for simple util functions. Introduce an overloaded method org.apache.flink.util.CollectionUtil#iteratorToList in a hotfix commit

" h.family3.col2, " +
" h.family3.col3 " +
"FROM hTable AS h");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all locations that were updated in the previous commit the declared throws Exception is not necessary anymore. My IDE shows a lot of warnings Exception 'java.lang.Exception' is never thrown in the method which we can fix in this PR.

"insert into db1.sink_table select 6,'a','b','2020-05-03','12'")
.await();
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw AssertionError in tests with Assert.fail().

"""
Wait if necessary for at most the given time (milliseconds) for the data to be ready.
For select operation, this method will wait unit the first row can be accessed in local.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a select operation, this method will wait until the first row can be accessed locally.
For an insert operation, this method will wait for the job to finish, because the result contains only one row.
For other operations, this method will return immediately, because the result is already available locally.

else:
return None

def wait(self, timeout_ms=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call it await to be consistent. how are the exceptions handled in Python?

Copy link
Contributor Author

@godfreyhe godfreyhe Jun 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await is a keyword in python.
just like as method in Table, python Table uses alias.

now flink-python does not define Exceptions, java exception will be thrown directly. example: python Catalog class

* For insert operation, this method will wait for the job to finish, because the result contains only one row.
* For other operations, this method will return immediately, because the result is ready in local.
*
* @throws ExecutionException if this future completed exceptionally
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this future completed exceptionally it is not really a future

/**
* Iterator for insert operation result.
*/
private static final class InsertResultIterator implements CloseableIterator<Row> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TableEnvironmentImpl is already quite large. Move it to a separate class in default scope.

}

ExecutorService executor = Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("TableResult-await-thread").build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avoid guava

@godfreyhe
Copy link
Contributor Author

@twalthr Thanks for the suggestion, I have updated the pr

Copy link
Contributor

@twalthr twalthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @godfreyhe. I added my last batch of comments. Good to merge afterwards. Can you rebase the PR?

import java.util.Optional;

/**
* A CloseableIterator for insert operation result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use {@link CloseableIterator} in such cases to make refactorings easier in the future

class InsertResultIterator implements CloseableIterator<Row> {
private final JobClient jobClient;
private final Row affectedRowCountsRow;
private Optional<Boolean> hasNext = Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public boolean hasNext() {
if (!hasNext.isPresent()) {
try {
jobClient.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the classloader configurable for this class and pass the context classloader higher in the stack. The TableEnvironment will have it soon (see FLINK-15635).

|GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)
""".stripMargin
execInsertSqlAndWaitResult(tableEnv, query)
tableEnv.executeSql(query).await()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wrong indention

@godfreyhe
Copy link
Contributor Author

@twalthr I have updated the pr based on latest comments

@twalthr twalthr closed this in 6f2a041 Sep 7, 2020
@godfreyhe godfreyhe deleted the FLINK-18337 branch September 7, 2020 09:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants