Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flink 16024][Connector][JDBC] Support FilterPushdown #20140

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class JdbcFilterPushdownPreparedStatementVisitor
extends ExpressionDefaultVisitor<Optional<ParameterizedPredicate>> {

private Function<String, String> quoteIdentifierFunction;
private final Function<String, String> quoteIdentifierFunction;

public JdbcFilterPushdownPreparedStatementVisitor(
Function<String, String> quoteIdentifierFunction) {
Expand Down Expand Up @@ -79,6 +79,12 @@ public Optional<ParameterizedPredicate> visit(CallExpression call) {
if (BuiltInFunctionDefinitions.AND.equals(call.getFunctionDefinition())) {
return renderBinaryOperator("AND", call.getResolvedChildren());
}
if (BuiltInFunctionDefinitions.IS_NULL.equals(call.getFunctionDefinition())) {
return renderUnaryOperator("IS NULL", call.getResolvedChildren().get(0), true);
}
if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(call.getFunctionDefinition())) {
return renderUnaryOperator("IS NOT NULL", call.getResolvedChildren().get(0), true);
}

return Optional.empty();
}
Expand All @@ -93,39 +99,74 @@ private Optional<ParameterizedPredicate> renderBinaryOperator(
left -> rightOperandString.map(right -> left.combine(operator, right)));
}

private Optional<ParameterizedPredicate> renderUnaryOperator(
String operator, ResolvedExpression operand, Boolean operandOnLeft) {
qingwei91 marked this conversation as resolved.
Show resolved Hide resolved
if (operand instanceof FieldReferenceExpression) {
Optional<ParameterizedPredicate> fieldPartialPredicate =
this.visit((FieldReferenceExpression) operand);
if (operandOnLeft) {
return fieldPartialPredicate.map(
fieldPred ->
new ParameterizedPredicate(
String.format(
"(%s %s)", fieldPred.getPredicate(), operator)));
} else {
return fieldPartialPredicate.map(
fieldPred ->
new ParameterizedPredicate(
String.format(
"(%s %s)", operator, fieldPred.getPredicate())));
}
} else {
return Optional.empty();
}
}

@Override
public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {
LogicalType tpe = litExp.getOutputDataType().getLogicalType();
Serializable[] params = new Serializable[1];

ParameterizedPredicate predicate = new ParameterizedPredicate("?");
switch (tpe.getTypeRoot()) {
libenchao marked this conversation as resolved.
Show resolved Hide resolved
case CHAR:
params[0] = litExp.getValueAs(String.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case VARCHAR:
params[0] = litExp.getValueAs(String.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case BIGINT:
params[0] = litExp.getValueAs(Long.class).orElse(null);
case BOOLEAN:
params[0] = litExp.getValueAs(Boolean.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case INTEGER:
params[0] = litExp.getValueAs(Integer.class).orElse(null);
case DECIMAL:
params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case DOUBLE:
params[0] = litExp.getValueAs(Double.class).orElse(null);
case TINYINT:
params[0] = litExp.getValueAs(Byte.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case BOOLEAN:
params[0] = litExp.getValueAs(Boolean.class).orElse(null);
case SMALLINT:
params[0] = litExp.getValueAs(Short.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case INTEGER:
params[0] = litExp.getValueAs(Integer.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case BIGINT:
params[0] = litExp.getValueAs(Long.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case FLOAT:
params[0] = litExp.getValueAs(Float.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case DECIMAL:
params[0] = litExp.getValueAs(BigDecimal.class).orElse(null);
case DOUBLE:
params[0] = litExp.getValueAs(Double.class).orElse(null);
predicate.setParameters(params);
return Optional.of(predicate);
case DATE:
Expand All @@ -148,7 +189,7 @@ public Optional<ParameterizedPredicate> visit(ValueLiteralExpression litExp) {

@Override
public Optional<ParameterizedPredicate> visit(FieldReferenceExpression fieldReference) {
String predicateStr = (this.quoteIdentifierFunction.apply(fieldReference.toString()));
String predicateStr = this.quoteIdentifierFunction.apply(fieldReference.toString());
ParameterizedPredicate predicate = new ParameterizedPredicate(predicateStr);
return Optional.of(predicate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,21 @@ public void testExpressionWithNull() {
orExpr, schema, "((id = ?) OR (description = ?))", expectedParams2);
}

@Test
public void testExpressionIsNull() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
String andExpr = "id IS NULL AND real_col <= 0.6";

Serializable[] expectedParams1 = {new BigDecimal("0.6")};
assertGeneratedSQLString(
andExpr, schema, "((id IS NULL) AND (real_col <= ?))", expectedParams1);

Serializable[] expectedParams2 = {6L};
String orExpr = "id = 6 OR description IS NOT NULL";
assertGeneratedSQLString(
orExpr, schema, "((id = ?) OR (description IS NOT NULL))", expectedParams2);
}

@Test
public void testComplexExpressionPrimitiveType() {
ResolvedSchema schema = tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE).getResolvedSchema();
Expand Down