Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12710][table] Prepare expressions for a new resolved expression #8790

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
package org.apache.flink.table.api;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

/**
* Partially defined over window with (optional) partitioning and order.
*/
Expand Down Expand Up @@ -83,7 +83,7 @@ public OverWindow as(Expression alias) {
alias,
partitionBy,
orderBy,
new CallExpression(BuiltInFunctionDefinitions.UNBOUNDED_RANGE, Collections.emptyList()),
unresolvedCall(BuiltInFunctionDefinitions.UNBOUNDED_RANGE),
Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,42 @@
@Internal
public abstract class ApiExpressionDefaultVisitor<T> extends ApiExpressionVisitor<T> {
@Override
public T visitCall(CallExpression call) {
return defaultMethod(call);
public T visit(UnresolvedCallExpression unresolvedCall) {
return defaultMethod(unresolvedCall);
}

@Override
public T visitValueLiteral(ValueLiteralExpression valueLiteralExpression) {
public T visit(ValueLiteralExpression valueLiteralExpression) {
return defaultMethod(valueLiteralExpression);
}

@Override
public T visitFieldReference(FieldReferenceExpression fieldReference) {
public T visit(FieldReferenceExpression fieldReference) {
return defaultMethod(fieldReference);
}

@Override
public T visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference) {
public T visit(UnresolvedReferenceExpression unresolvedReference) {
return defaultMethod(unresolvedReference);
}

@Override
public T visitLocalReference(LocalReferenceExpression localReference) {
public T visit(LocalReferenceExpression localReference) {
return defaultMethod(localReference);
}

@Override
public T visitTypeLiteral(TypeLiteralExpression typeLiteral) {
public T visit(TypeLiteralExpression typeLiteral) {
return defaultMethod(typeLiteral);
}

@Override
public T visitTableReference(TableReferenceExpression tableReference) {
public T visit(TableReferenceExpression tableReference) {
return defaultMethod(tableReference);
}

@Override
public T visitLookupCall(LookupCallExpression lookupCall) {
public T visit(LookupCallExpression lookupCall) {
return defaultMethod(lookupCall);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ private ApiExpressionUtils() {
// private
}

public static CallExpression call(FunctionDefinition functionDefinition, Expression... args) {
return new CallExpression(functionDefinition, Arrays.asList(args));
}

public static ValueLiteralExpression valueLiteral(Object value) {
return new ValueLiteralExpression(value);
}
Expand All @@ -65,6 +61,10 @@ public static UnresolvedReferenceExpression unresolvedRef(String name) {
return new UnresolvedReferenceExpression(name);
}

public static UnresolvedCallExpression unresolvedCall(FunctionDefinition functionDefinition, Expression... args) {
return new UnresolvedCallExpression(functionDefinition, Arrays.asList(args));
}

public static TableReferenceExpression tableRef(String name, Table table) {
return new TableReferenceExpression(name, table.getQueryOperation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@
@Internal
public abstract class ApiExpressionVisitor<R> implements ExpressionVisitor<R> {

public abstract R visitTableReference(TableReferenceExpression tableReference);
public abstract R visit(TableReferenceExpression tableReference);

public abstract R visitLocalReference(LocalReferenceExpression localReference);
public abstract R visit(LocalReferenceExpression localReference);

public abstract R visitLookupCall(LookupCallExpression lookupCall);
public abstract R visit(LookupCallExpression lookupCall);

public abstract R visitUnresolvedReference(UnresolvedReferenceExpression unresolvedReference);
public abstract R visit(UnresolvedReferenceExpression unresolvedReference);

public final R visit(Expression other) {
if (other instanceof TableReferenceExpression) {
return visitTableReference((TableReferenceExpression) other);
return visit((TableReferenceExpression) other);
} else if (other instanceof LocalReferenceExpression) {
return visitLocalReference((LocalReferenceExpression) other);
return visit((LocalReferenceExpression) other);
} else if (other instanceof LookupCallExpression) {
return visitLookupCall((LookupCallExpression) other);
return visit((LookupCallExpression) other);
} else if (other instanceof UnresolvedReferenceExpression) {
return visitUnresolvedReference((UnresolvedReferenceExpression) other);
return visit((UnresolvedReferenceExpression) other);
}
return visitNonApiExpression(other);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public DataType getOutputDataType() {
return dataType;
}

@Override
public String asSummaryString() {
return name;
}

@Override
public List<Expression> getChildren() {
return Collections.emptyList();
Expand Down Expand Up @@ -82,6 +87,6 @@ public int hashCode() {

@Override
public String toString() {
return name;
return asSummaryString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,19 @@ public LookupCallExpression(String unresolvedFunction, List<Expression> args) {
this.args = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(args)));
}

public String getUnresolvedName() {
return unresolvedName;
}

@Override
public List<Expression> getChildren() {
return this.args;
public String asSummaryString() {
final List<String> argList = args.stream().map(Object::toString).collect(Collectors.toList());
return unresolvedName + "(" + String.join(", ", argList) + ")";
}

public String getUnresolvedName() {
return unresolvedName;
@Override
public List<Expression> getChildren() {
return this.args;
}

@Override
Expand Down Expand Up @@ -80,7 +86,6 @@ public int hashCode() {

@Override
public String toString() {
final List<String> argList = args.stream().map(Object::toString).collect(Collectors.toList());
return unresolvedName + "(" + String.join(", ", argList) + ")";
return asSummaryString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.apache.flink.table.functions.FunctionDefinition;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

/**
* Resolves calls with function names to calls with actual function definitions.
Expand All @@ -38,24 +39,24 @@ public LookupCallResolver(FunctionLookup functionLookup) {
this.functionLookup = functionLookup;
}

public Expression visitLookupCall(LookupCallExpression lookupCall) {
public Expression visit(LookupCallExpression lookupCall) {
final FunctionLookup.Result result = functionLookup.lookupFunction(lookupCall.getUnresolvedName())
.orElseThrow(() -> new ValidationException("Undefined function: " + lookupCall.getUnresolvedName()));

return createResolvedCall(result.getFunctionDefinition(), lookupCall.getChildren());
}

public Expression visitCall(CallExpression call) {
return createResolvedCall(call.getFunctionDefinition(), call.getChildren());
public Expression visit(UnresolvedCallExpression unresolvedCall) {
return createResolvedCall(unresolvedCall.getFunctionDefinition(), unresolvedCall.getChildren());
}

private Expression createResolvedCall(FunctionDefinition functionDefinition, List<Expression> unresolvedChildren) {
List<Expression> resolvedChildren = unresolvedChildren
final Expression[] resolvedChildren = unresolvedChildren
.stream()
.map(child -> child.accept(this))
.collect(Collectors.toList());
.toArray(Expression[]::new);

return new CallExpression(functionDefinition, resolvedChildren);
return unresolvedCall(functionDefinition, resolvedChildren);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public QueryOperation getQueryOperation() {
return queryOperation;
}

@Override
public String asSummaryString() {
return name;
}

@Override
public List<Expression> getChildren() {
return Collections.emptyList();
Expand Down Expand Up @@ -80,6 +85,6 @@ public int hashCode() {

@Override
public String toString() {
return name;
return asSummaryString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public String getName() {
return name;
}

@Override
public String asSummaryString() {
return name;
}

@Override
public List<Expression> getChildren() {
return Collections.emptyList();
Expand Down Expand Up @@ -74,6 +79,6 @@ public int hashCode() {

@Override
public String toString() {
return name;
return asSummaryString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.LocalReferenceExpression;
import org.apache.flink.table.expressions.LookupCallExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.UnresolvedCallExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

Expand All @@ -35,7 +35,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.expressions.ApiExpressionUtils.call;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedRef;
import static org.apache.flink.table.expressions.ApiExpressionUtils.valueLiteral;
import static org.apache.flink.table.expressions.ExpressionUtils.extractValue;
Expand Down Expand Up @@ -140,7 +140,7 @@ public static Optional<String> extractName(Expression expression) {
private static List<Expression> nameExpressions(Map<Expression, String> expressions) {
return expressions.entrySet()
.stream()
.map(entry -> call(AS, entry.getKey(), valueLiteral(entry.getValue())))
.map(entry -> unresolvedCall(AS, entry.getKey(), valueLiteral(entry.getValue())))
.collect(Collectors.toList());
}

Expand All @@ -151,19 +151,19 @@ private static class AggregationAndPropertiesSplitter extends ApiExpressionDefau
private final Map<Expression, String> properties = new LinkedHashMap<>();

@Override
public Void visitLookupCall(LookupCallExpression unresolvedCall) {
public Void visit(LookupCallExpression unresolvedCall) {
throw new IllegalStateException("All calls should be resolved by now. Got: " + unresolvedCall);
}

@Override
public Void visitCall(CallExpression call) {
FunctionDefinition functionDefinition = call.getFunctionDefinition();
if (isFunctionOfKind(call, AGGREGATE)) {
aggregates.computeIfAbsent(call, expr -> "EXPR$" + uniqueId++);
public Void visit(UnresolvedCallExpression unresolvedCall) {
FunctionDefinition functionDefinition = unresolvedCall.getFunctionDefinition();
if (isFunctionOfKind(unresolvedCall, AGGREGATE)) {
aggregates.computeIfAbsent(unresolvedCall, expr -> "EXPR$" + uniqueId++);
} else if (WINDOW_PROPERTIES.contains(functionDefinition)) {
properties.computeIfAbsent(call, expr -> "EXPR$" + uniqueId++);
properties.computeIfAbsent(unresolvedCall, expr -> "EXPR$" + uniqueId++);
} else {
call.getChildren().forEach(c -> c.accept(this));
unresolvedCall.getChildren().forEach(c -> c.accept(this));
}
return null;
}
Expand All @@ -187,23 +187,23 @@ private AggregationAndPropertiesReplacer(
}

@Override
public Expression visitLookupCall(LookupCallExpression unresolvedCall) {
public Expression visit(LookupCallExpression unresolvedCall) {
throw new IllegalStateException("All calls should be resolved by now. Got: " + unresolvedCall);
}

@Override
public Expression visitCall(CallExpression call) {
if (aggregates.get(call) != null) {
return unresolvedRef(aggregates.get(call));
} else if (properties.get(call) != null) {
return unresolvedRef(properties.get(call));
public Expression visit(UnresolvedCallExpression unresolvedCall) {
if (aggregates.get(unresolvedCall) != null) {
return unresolvedRef(aggregates.get(unresolvedCall));
} else if (properties.get(unresolvedCall) != null) {
return unresolvedRef(properties.get(unresolvedCall));
}

List<Expression> args = call.getChildren()
final Expression[] args = unresolvedCall.getChildren()
.stream()
.map(c -> c.accept(this))
.collect(Collectors.toList());
return new CallExpression(call.getFunctionDefinition(), args);
.toArray(Expression[]::new);
return unresolvedCall(unresolvedCall.getFunctionDefinition(), args);
}

@Override
Expand All @@ -214,26 +214,26 @@ protected Expression defaultMethod(Expression expression) {

private static class ExtractNameVisitor extends ApiExpressionDefaultVisitor<Optional<String>> {
@Override
public Optional<String> visitCall(CallExpression call) {
if (call.getFunctionDefinition().equals(AS)) {
return extractValue(call.getChildren().get(1), String.class);
public Optional<String> visit(UnresolvedCallExpression unresolvedCall) {
if (unresolvedCall.getFunctionDefinition() == AS) {
return extractValue(unresolvedCall.getChildren().get(1), String.class);
} else {
return Optional.empty();
}
}

@Override
public Optional<String> visitLocalReference(LocalReferenceExpression localReference) {
public Optional<String> visit(LocalReferenceExpression localReference) {
return Optional.of(localReference.getName());
}

@Override
public Optional<String> visitTableReference(TableReferenceExpression tableReference) {
public Optional<String> visit(TableReferenceExpression tableReference) {
return Optional.of(tableReference.getName());
}

@Override
public Optional<String> visitFieldReference(FieldReferenceExpression fieldReference) {
public Optional<String> visit(FieldReferenceExpression fieldReference) {
return Optional.of(fieldReference.getName());
}

Expand Down
Loading