diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 83b4dd29ed..25c4bfab4b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -86,9 +86,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate; @@ -567,9 +569,45 @@ && hasPrimaryKey() // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to // be safe, we return all filters to the Flink planner. return Result.of(acceptedFilters, filters); + } else { + acceptedFilters = new ArrayList<>(); + if (lakeSource != null) { + acceptedFilters = pushdownLakeFilters(filters); + } + } + + return Result.of(acceptedFilters, filters); + } + + private List pushdownLakeFilters(List filters) { + List lakePredicates = new ArrayList<>(); + List convertedFilters = new ArrayList<>(); + for (ResolvedExpression filter : filters) { + Optional predicateOptional = + convertToFlussPredicate(tableOutputType, filter); + if (predicateOptional.isPresent()) { + lakePredicates.add(predicateOptional.get()); + convertedFilters.add(filter); + } } - return Result.of(Collections.emptyList(), filters); + List acceptedFilters = new ArrayList<>(); + + if (lakePredicates.isEmpty()) { + return acceptedFilters; + } + + LakeSource.FilterPushDownResult filterPushDownResult = + checkNotNull(lakeSource).withFilters(lakePredicates); + Set acceptedLakePredicates = + Collections.newSetFromMap(new IdentityHashMap()); + acceptedLakePredicates.addAll(filterPushDownResult.acceptedPredicates()); + for (int i = 0; i < lakePredicates.size(); i++) { + if (acceptedLakePredicates.contains(lakePredicates.get(i))) { + acceptedFilters.add(convertedFilters.get(i)); + } + } + return acceptedFilters; } @Override