[FLINK-13503][API] Add contract in LookupableTableSource to specify the behavior when lookupKeys contains null. #9335

Closed
wants to merge 4 commits into from

beyond1920
Contributor

What is the purpose of the change

Add contract in LookupableTableSource to specify the behavior when lookupKeys contains null.
Also update the existing connectors to comply with this contract.

Brief change log

  • Add contract in LookupableTableSource to specify the behavior when lookupKeys contains null.
  • Update HBase lookupFunction
  • Update JDBC lookupFunction

Verifying this change

Covered by existing integration tests (ITs).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@beyond1920
Contributor Author

cc @JingsongLi @wuchong @lincoln-lil

@flinkbot
Collaborator

flinkbot commented Aug 2, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit e134e61 (Tue Aug 27 09:12:47 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Aug 2, 2019

CI report:

@lincoln-lil
Contributor

If I understand this issue correctly, shouldn't JDBCLookupFunction just convert a null key into an xx IS NULL clause, instead of using an IS NOT DISTINCT FROM clause or some other variant?

@beyond1920
Contributor Author

beyond1920 commented Aug 2, 2019

@lincoln-lil This issue aims to specify the behavior when lookupKeys contains a null value. It is unrelated to the IS NOT DISTINCT FROM clause.

@@ -101,7 +105,7 @@ public JDBCLookupFunction(
this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
this.keySqlTypes = Arrays.stream(keyTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
- this.query = options.getDialect().getSelectFromStatement(
+ this.query = options.getDialect().getSelectNotDistinctFromStatement(
Contributor

Will this query perform worse than the old one?

Contributor Author

Good question. I will add two prepared statements with two query templates: if the arguments do not contain any null value, the original query template is used; otherwise the new one is used. OK?
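A minimal sketch of the two-template idea described above, assuming a hand-written null-safe predicate of the form `(col = ? OR (col IS NULL AND ? IS NULL))`. The class `LookupQueryTemplates` and all of its members are hypothetical names chosen for illustration; the SQL actually generated by the dialect in this PR is not shown in the conversation and may differ. Whether a bare `? IS NULL` is accepted also depends on the database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of Flink's JDBC connector.
final class LookupQueryTemplates {
    // Plain equality predicates: cheaper, but never matches rows whose key is NULL.
    final String nullUnsafeQuery;
    // Null-safe predicates: every key appears twice in the parameter list.
    final String nullSafeQuery;

    LookupQueryTemplates(String table, String[] selectFields, String[] keyNames) {
        String prefix = "SELECT " + String.join(", ", selectFields) + " FROM " + table + " WHERE ";
        StringJoiner plain = new StringJoiner(" AND ");
        StringJoiner nullSafe = new StringJoiner(" AND ");
        for (String key : keyNames) {
            plain.add(key + " = ?");
            nullSafe.add("(" + key + " = ? OR (" + key + " IS NULL AND ? IS NULL))");
        }
        this.nullUnsafeQuery = prefix + plain;
        this.nullSafeQuery = prefix + nullSafe;
    }

    static boolean containsNull(Object[] keys) {
        for (Object key : keys) {
            if (key == null) {
                return true;
            }
        }
        return false;
    }

    // Picks the cheap template unless at least one lookup key is null.
    String queryFor(Object[] keys) {
        return containsNull(keys) ? nullSafeQuery : nullUnsafeQuery;
    }

    // Parameters in binding order; the null-safe template binds each key twice.
    List<Object> parametersFor(Object[] keys) {
        boolean nullSafe = containsNull(keys);
        List<Object> params = new ArrayList<>();
        for (Object key : keys) {
            params.add(key);
            if (nullSafe) {
                params.add(key);
            }
        }
        return params;
    }
}
```

In a real lookup function the two strings would each be prepared once, and only the choice of statement and the parameter binding would happen per record.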

* @param lookupKeys the chosen field names as lookup keys, it is in the defined order
*
* <p>IMPORTANT:
* If the returned {@link TableFunction} receives a lookup request with null value in lookup keys, expect
Contributor

A more detailed comment here would be easier to understand, for example:

  1. lookupKeys are nullable
  2. each concrete storage engine must handle null-value lookups carefully; e.g., in MySQL this can be an IS NULL query
    ...

Contributor Author

OK, I'll try to explain more clearly in the comment

Contributor

@lincoln-lil lincoln-lil left a comment

LGTM! Only minor comments here.

… the behavior when lookupKeys contains null.
…or to avoid illegalArgument exception when create Get object.
… when there is null value in input data of eval.
Member

@wuchong wuchong left a comment

Thanks @beyond1920, I left some comments.

* name | STRING
* -----------------
* For the external system which does not support null value (E.g, HBase does not support null value on rowKey),
* it could throw an exception or discard the request when receiving a request with null value on lookup key.
Member

Please never throw an exception. We should discard the request because HBase doesn't have null rowkeys.

* @param lookupKeys the chosen field names as lookup keys, it is in the defined order
*/
TableFunction<T> getLookupFunction(String[] lookupKeys);

/**
* Gets the {@link AsyncTableFunction} which supports async lookup one key at a time.
*
Member

Please update javadoc of this method too.

Get get;
try {
get = readHelper.createGet(row);
} catch (IllegalArgumentException e) {
Member

We shouldn't use try/catch for this in performance-critical code. We can return early if the length of the row is zero.
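The early-return suggestion can be sketched as follows. This is a stand-alone illustration that uses an in-memory map in place of an HBase table; `NullSkippingLookup`, `serialize`, `eval` and `collected` are hypothetical names, and the assumption that a null key serializes to an empty byte array is made only for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a lookup function; a HashMap plays the role of the table.
final class NullSkippingLookup {
    private static final byte[] EMPTY = new byte[0];

    private final Map<String, String> store = new HashMap<>();
    // Rows emitted by eval(); a real TableFunction would call collect() instead.
    final List<String> collected = new ArrayList<>();

    void put(String rowKey, String value) {
        store.put(rowKey, value);
    }

    // Assumption for this sketch: a null key serializes to an empty byte array.
    static byte[] serialize(Object rowKey) {
        return rowKey == null ? EMPTY : rowKey.toString().getBytes(StandardCharsets.UTF_8);
    }

    void eval(Object rowKey) {
        byte[] key = serialize(rowKey);
        if (key.length == 0) {
            // Discard the request with a cheap length check instead of try/catch.
            return;
        }
        String value = store.get(new String(key, StandardCharsets.UTF_8));
        if (value != null) {
            collected.add(value);
        }
    }
}
```

The length check runs on every record at negligible cost, whereas relying on an IllegalArgumentException pays for exception construction on each null key.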

* @return serialize bytes.
*/
public byte[] serialize(Object rowKey) {
byte[] key = HBaseTypeUtils.serializeFromObject(
Member

return HBaseTypeUtils.serializeFromObject(...

rowKeyType,
charset);
Get get = new Get(rowkey);
public Get createGet(byte[] row) {
Member

row -> rowkey

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
Member

useless import

@@ -20,6 +20,7 @@

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
Member

useless import

JDBCUtils.setField(statement, keySqlTypes[i], keys[i], i);
if (containsNull) {
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], 2 * i);
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], 2 * i + 1);
Member

We can keep this special logic for now, but please open a JIRA to improve it. We could introduce a custom NamedPreparedStatement so that each field is passed only once. See https://www.javaworld.com/article/2077706/named-parameters-for-preparedstatement.html
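One way the named-parameter idea could look is sketched below: a small parser rewrites `:name` placeholders to `?` and records every positional index that each name occupies, so a caller can supply a value once and have it bound at all of its positions. `NamedQuery` and its members are hypothetical names, not an existing Flink or JDBC class, and the parser deliberately ignores quoted string literals and `::` casts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; does not handle ':' inside string literals or '::' casts.
final class NamedQuery {
    // The query with every :name replaced by a positional '?'.
    final String parsedSql;
    // Parameter name -> 1-based JDBC positions where it must be bound.
    private final Map<String, List<Integer>> positions = new HashMap<>();

    NamedQuery(String sql) {
        StringBuilder out = new StringBuilder();
        int nextPosition = 1;
        int i = 0;
        while (i < sql.length()) {
            char c = sql.charAt(i);
            boolean startsName = c == ':'
                    && i + 1 < sql.length()
                    && Character.isJavaIdentifierStart(sql.charAt(i + 1));
            if (startsName) {
                int end = i + 1;
                while (end < sql.length() && Character.isJavaIdentifierPart(sql.charAt(end))) {
                    end++;
                }
                String name = sql.substring(i + 1, end);
                positions.computeIfAbsent(name, k -> new ArrayList<>()).add(nextPosition++);
                out.append('?');
                i = end;
            } else {
                out.append(c);
                i++;
            }
        }
        this.parsedSql = out.toString();
    }

    List<Integer> positionsOf(String name) {
        return positions.getOrDefault(name, Collections.emptyList());
    }
}
```

A wrapper around `java.sql.PreparedStatement` could then prepare `parsedSql` and, for each named value, loop over `positionsOf(name)` and call `setObject(position, value)`, which removes the manual `2 * i` / `2 * i + 1` bookkeeping.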

@@ -101,7 +113,8 @@ public JDBCLookupFunction(
this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
this.keySqlTypes = Arrays.stream(keyTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
- this.query = options.getDialect().getSelectFromStatement(
+ this.nonNullableQuery = options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
+ this.nullableQuery = options.getDialect().getSelectNotDistinctFromStatement(
Member

If we have consensus on the new SqlDialect methods, we can rename the fields: nullableQuery -> nullSafeQuery and nonNullableQuery -> nullUnsafeQuery.

@@ -76,7 +87,8 @@
private final int maxRetryTimes;

private transient Connection dbConn;
- private transient PreparedStatement statement;
+ private transient PreparedStatement fastStatement;
+ private transient PreparedStatement slowStatement;
Member

If we have consensus on the new SqlDialect methods, we can rename these fields to nullSafeStatement and nullUnsafeStatement, and add a comment on the fields saying that nullUnsafeStatement should be used whenever possible because it is faster.

@beyond1920 beyond1920 closed this Mar 2, 2021
5 participants