Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-13503][API] Add contract in LookupableTableSource to specify the behavior when lookupKeys contains null. #9335

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.addons.hbase.util.HBaseTypeUtils;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless import

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.FunctionContext;
Expand All @@ -34,7 +35,9 @@
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless import

import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -72,7 +75,15 @@ public HBaseLookupFunction(
*/
public void eval(Object rowKey) throws IOException {
// fetch result
Result result = table.get(readHelper.createGet(rowKey));
byte[] row = readHelper.serialize(rowKey);
Get get;
try {
get = readHelper.createGet(row);
} catch (IllegalArgumentException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't use try catch to do this job in performance critical code. We can return if length of row is zero.

LOG.warn("Ignore illegal rowKey : {}!", rowKey);
return;
}
Result result = table.get(get);
if (!result.isEmpty()) {
// parse and collect
collect(readHelper.parseToRow(result, rowKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,27 @@ public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema) {
this.resultRow = new Row(fieldLength);
}

/**
* Serializes a rowkey object into byte array.
* @param rowKey rowkey object to serialize
*
* @return serialize bytes.
*/
public byte[] serialize(Object rowKey) {
byte[] key = HBaseTypeUtils.serializeFromObject(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return HBaseTypeUtils.serializeFromObject(...

rowKey,
rowKeyType,
charset);
return key;
}

/**
* Returns an instance of Get that retrieves the matches records from the HBase table.
*
* @return The appropriate instance of Get for this use case.
*/
public Get createGet(Object rowKey) {
byte[] rowkey = HBaseTypeUtils.serializeFromObject(
rowKey,
rowKeyType,
charset);
Get get = new Get(rowkey);
public Get createGet(byte[] row) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row -> rowkey

Get get = new Get(row);
for (int f = 0; f < families.length; f++) {
byte[] family = families[f];
for (byte[] qualifier : qualifiers[f]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,21 @@

/**
* A {@link TableFunction} to query fields from JDBC by keys.
* The query template like:
* If look up keys don't contain any null value, using the nonNullableQuery template like:
* <PRE>
* SELECT c, d, e, f from T where a = ? and b = ?
* </PRE>
*
* If lookup keys contain any null value, using the nullableQuery template like:
* <PRE>
* SELECT c, d, e, f from T where (a = ? or (a is null and ? is null)) and (b = ? or (b is null and ? is null))
* </PRE>
*
* Note: the following query template is better, but `Is NOT DISTINCT FROM` is not generally supported yet:
* <PRE>
* SELECT c, d, e, f from T where (a is not distinct from ?) and (b is not distinct from ?)
* </PRE>
*
* <p>Support cache the result to avoid frequent accessing to remote databases.
* 1.The cacheMaxSize is -1 means not use cache.
* 2.For real-time data, you need to set the TTL of cache.
Expand All @@ -61,7 +71,8 @@ public class JDBCLookupFunction extends TableFunction<Row> {
private static final Logger LOG = LoggerFactory.getLogger(JDBCLookupFunction.class);
private static final long serialVersionUID = 1L;

private final String query;
private final String nonNullableQuery;
private final String nullableQuery;
private final String drivername;
private final String dbURL;
private final String username;
Expand All @@ -76,7 +87,8 @@ public class JDBCLookupFunction extends TableFunction<Row> {
private final int maxRetryTimes;

private transient Connection dbConn;
private transient PreparedStatement statement;
private transient PreparedStatement fastStatement;
private transient PreparedStatement slowStatement;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a consensus on the new methods of SqlDialect. We can rename these fields to nullSafeStatemen and nullUnsafeStatement, we can add comment on the fields that we should use nullUnsafeStatement as much as possible because it is faster.

private transient Cache<Row, List<Row>> cache;

public JDBCLookupFunction(
Expand All @@ -101,7 +113,8 @@ public JDBCLookupFunction(
this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
this.keySqlTypes = Arrays.stream(keyTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
this.outputSqlTypes = Arrays.stream(fieldTypes).mapToInt(JDBCTypeUtil::typeInformationToSqlType).toArray();
this.query = options.getDialect().getSelectFromStatement(
this.nonNullableQuery = options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
this.nullableQuery = options.getDialect().getSelectNotDistinctFromStatement(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a consensus on the new methods of SqlDialect. we can call the field nullSafeQuery and nonNullableQuery -> nullUnsafeQuery.

options.getTableName(), fieldNames, keyNames);
}

Expand All @@ -113,7 +126,8 @@ public static Builder builder() {
public void open(FunctionContext context) throws Exception {
try {
establishConnection();
statement = dbConn.prepareStatement(query);
fastStatement = dbConn.prepareStatement(nonNullableQuery);
slowStatement = dbConn.prepareStatement(nullableQuery);
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
Expand All @@ -126,6 +140,20 @@ public void open(FunctionContext context) throws Exception {
}

public void eval(Object... keys) {
boolean containsNull = false;
for (Object key : keys) {
if (key == null) {
containsNull = true;
break;
}
}
PreparedStatement statement;
if (containsNull) {
statement = slowStatement;
} else {
statement = fastStatement;
}

Row keyRow = Row.of(keys);
if (cache != null) {
List<Row> cachedRows = cache.getIfPresent(keyRow);
Expand All @@ -141,7 +169,12 @@ public void eval(Object... keys) {
try {
statement.clearParameters();
for (int i = 0; i < keys.length; i++) {
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], i);
if (containsNull) {
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], 2 * i);
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], 2 * i + 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep this special logic for now. But please open a JIRA to improve this. We can introduced a custom NamedPreparedStatement to pass each field only once. see https://www.javaworld.com/article/2077706/named-parameters-for-preparedstatement.html

} else {
JDBCUtils.setField(statement, keySqlTypes[i], keys[i], i);
}
}
try (ResultSet resultSet = statement.executeQuery()) {
if (cache == null) {
Expand Down Expand Up @@ -198,13 +231,23 @@ public void close() throws IOException {
cache.cleanUp();
cache = null;
}
if (statement != null) {
if (fastStatement != null) {
try {
fastStatement.close();
} catch (SQLException e) {
LOG.info("JDBC fastStatement could not be closed: " + e.getMessage());
} finally {
fastStatement = null;
}
}

if (slowStatement != null) {
try {
statement.close();
slowStatement.close();
} catch (SQLException e) {
LOG.info("JDBC statement could not be closed: " + e.getMessage());
LOG.info("JDBC slowStatement could not be closed: " + e.getMessage());
} finally {
statement = null;
slowStatement = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ default String getDeleteStatement(String tableName, String[] conditionFields) {
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
}

/**
* Get select fields statement by `is not distinct from` condition fields. Default use SELECT.
*/
default String getSelectNotDistinctFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would re-design the method names a bit.

  • Add a new method getSelectStatement(String tableName, String[] selectFields) which doesn't contain WHERE clause.
  • Rename getSelectFromStatement to getFilterStatement(String tableName, String[] selectFields, String[] conditionFields) and forces conditionFields shouldn't be empty.
  • Rename getSelectNotDistinctFromStatement to getNullSafeFilterStatement(...), because the default implementation is not is not distinct from but is a null safe comparison different from getFilterStatement. We should also outline this in the method javadoc.

String selectExpressions = Arrays.stream(selectFields)
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String fieldExpressions = Arrays.stream(conditionFields)
.map(f -> "(" + quoteIdentifier(f) + "=? OR (" + quoteIdentifier(f) + " IS NULL AND ? IS NULL))")
.collect(Collectors.joining(" AND "));
return "SELECT " + selectExpressions + " FROM " +
quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}

/**
* Get select fields statement by condition fields. Default use SELECT.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,32 @@
public interface LookupableTableSource<T> extends TableSource<T> {

/**
* Gets the {@link TableFunction} which supports lookup one key at a time.
* Gets the {@link TableFunction} which supports lookup one key at a time. Calling `eval`
* method in the returned {@link TableFunction} means send a lookup request to the TableSource.
*
* <p><strong>IMPORTANT:</strong>
* Lookup keys in a request may contain null value. When it happens, it expects to lookup
* records with null value on the lookup key field.
* E.g., for a MySQL table with the following schema, send a lookup request with null value
* on `age` field means to find students whose age are unknown (CAUTION: It is equivalent to filter condition:
* `WHERE age IS NULL` instead of `WHERE age = null`).
* -----------------
* Table : Student
* -----------------
* id | LONG
* age | INT
* name | STRING
* -----------------
* For the external system which does not support null value (E.g, HBase does not support null value on rowKey),
* it could throw an exception or discard the request when receiving a request with null value on lookup key.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please never throw an exception. We should discard the request because HBase don't have null rowkeys.

*
* @param lookupKeys the chosen field names as lookup keys, it is in the defined order
*/
TableFunction<T> getLookupFunction(String[] lookupKeys);

/**
* Gets the {@link AsyncTableFunction} which supports async lookup one key at a time.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update javadoc of this method too.

* @param lookupKeys the chosen field names as lookup keys, it is in the defined order
*/
AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
Expand Down