Merged

15 commits
be575db
[FLINK-37267][table] Add support for UNNEST WITH ORDINALITY
gustavodemorais Feb 6, 2025
32d5725
[FLINK-37267][table] Add stream, batch and plan tests for withordinality
gustavodemorais Feb 6, 2025
9972d3c
[FLINK-37267][docs] Revamp outdated unnest documentation
gustavodemorais Feb 6, 2025
57137d2
[FLINK-37267][table] Support with ordinality for array/multiset of rows
gustavodemorais Feb 7, 2025
ef1df4c
[FLINK-37267][table] Add test for map of rows with ordinality
gustavodemorais Feb 8, 2025
ad7826b
[FLINK-37267][table] Simplify logic for output type with ordinality
gustavodemorais Feb 9, 2025
e179c09
[FLINK-37267][docs] Complement docs with multiset and map example + typo
gustavodemorais Feb 14, 2025
cda44cf
[FLINK-37267][table] Pass withOrdinality to base class constructors
gustavodemorais Feb 15, 2025
7f522cf
[FLINK-37267][table] Refactor getFields with new RowType for naming
gustavodemorais Feb 15, 2025
e54bf99
[FLINK-37267][table] Refactor UnnestITCase to use parameterized calls
gustavodemorais Feb 15, 2025
8d964da
[FLINK-37267][table] Use .asJava instead of HashMap
gustavodemorais Feb 15, 2025
143e6ad
[FLINK-37267][table] Update unnest plans with column names
gustavodemorais Feb 17, 2025
5c72d20
[FLINK-37267][table] Use JoinedRowData instead of field getters
gustavodemorais Feb 19, 2025
767ac26
[FLINK-37267][table] Remove parallelism from assertUnnest
gustavodemorais Feb 19, 2025
77277e0
[FLINK-37267][table] Column names to EXPR$0 and ORDINALITY
gustavodemorais Feb 19, 2025
90 changes: 86 additions & 4 deletions docs/content/docs/dev/table/sql/queries/joins.md
@@ -326,14 +326,96 @@ FROM Orders AS o

In the example above, the Orders table is enriched with data from the Customers table, which resides in a MySQL database. The `FOR SYSTEM_TIME AS OF` clause with the subsequent processing-time attribute ensures that each row of the `Orders` table is joined with those `Customers` rows that match the join predicate at the point in time when the `Orders` row is processed by the join operator. It also prevents the join result from being updated when a joined `Customer` row is updated in the future. A lookup join also requires an equality join predicate, in the example above `o.customer_id = c.id`.
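For context, a minimal sketch of the lookup join shape this paragraph describes; the `proc_time` processing-time attribute and the selected columns are assumptions, not part of this change.

```sql
-- Sketch only: enrich each order with customer data looked up
-- at the processing time of the order row.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id
```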

Array Expansion
Array, Multiset and Map Expansion
--------------

Returns a new row for each element in the given array. Unnesting `WITH ORDINALITY` is not yet supported.
Unnest returns a new row for each element in the given array, multiset, or map. Both `CROSS JOIN` and `LEFT JOIN` are supported.
```sql
-- Returns a new row for each element in a constant array
SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat'])

id      product_name
======= ============
order_1 shirt
order_1 pants
order_1 hat

-- Returns a new row for each element in the array
-- assuming an Orders table with an array column `product_names`
SELECT order_id, product_name
FROM Orders
CROSS JOIN UNNEST(product_names) AS t(product_name)
```
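Since the paragraph above also mentions `LEFT JOIN`, here is a sketch of that variant under the same assumed `Orders` table; a lateral left join against an unnested collection takes a constant `ON TRUE` condition.

```sql
-- Sketch only: a LEFT JOIN keeps orders whose product_names array is
-- empty or NULL, emitting a single row with a NULL product_name for them.
SELECT order_id, product_name
FROM Orders
LEFT JOIN UNNEST(product_names) AS t(product_name) ON TRUE
```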

Unnesting `WITH ORDINALITY` is also supported. Currently, `WITH ORDINALITY` is supported only with `CROSS JOIN`, not with `LEFT JOIN`.


```sql
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
-- Returns a new row for each element in a constant array and its position in the array
SELECT *
FROM (VALUES ('order_1'), ('order_2'))
CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat'])
WITH ORDINALITY AS t(product_name, index)

id      product_name index
======= ============ =====
order_1 shirt        1
order_1 pants        2
order_1 hat          3
order_2 shirt        1
order_2 pants        2
order_2 hat          3

-- Returns a new row for each element and its position in the array
-- assuming an Orders table with an array column `product_names`
SELECT order_id, product_name, product_index
Contributor (review comment): Could we make an example for multisets? It's not clear to me how WITH ORDINALITY behaves with them. The tests seem to drop the index?

@gustavodemorais (author) Feb 14, 2025: Sure, adding one example for multisets and one for a map. Multisets are stored as maps, so we go through the keys and emit them x amount of times, x being how often they occurred (their multiplicity).

@gustavodemorais (author) Feb 14, 2025: Let me know if the example helps: e179c09
FROM Orders
CROSS JOIN UNNEST(product_names)
WITH ORDINALITY AS t(product_name, product_index)
```

An unnest with ordinality returns each element together with its position in the data structure, 1-indexed.
For arrays, the order of the elements is guaranteed. Since maps and multisets are unordered, the order of their elements is not guaranteed.

```sql
-- Returns a new row for each key/value pair in the map.
SELECT *
FROM
(VALUES('order_1'))
CROSS JOIN UNNEST(MAP['shirt', 2, 'pants', 1, 'hat', 1]) WITH ORDINALITY

id      product_name amount index
======= ============ ====== =====
order_1 shirt        2      1
order_1 pants        1      2
order_1 hat          1      3

-- Returns a new row for each instance of an element in a multiset
-- If an element has been seen twice (multiplicity is 2), it will be returned twice
WITH ProductMultiset AS
(SELECT COLLECT(product_name) AS product_multiset
FROM (
VALUES ('shirt'), ('pants'), ('hat'), ('shirt'), ('hat')
) AS t(product_name)) -- produces { 'shirt': 2, 'pants': 1, 'hat': 2 }
SELECT id, product_name, ordinality
FROM
(VALUES ('order_1'), ('order_2')) AS t(id),
ProductMultiset
CROSS JOIN UNNEST(product_multiset) WITH
ORDINALITY AS u(product_name, ordinality);

id      product_name ordinality
======= ============ ==========
order_1 shirt        1
order_1 shirt        2
order_1 pants        3
order_1 hat          4
order_1 hat          5
order_2 shirt        1
order_2 shirt        2
order_2 pants        3
order_2 hat          4
order_2 hat          5
```

Table Function
@@ -477,6 +477,17 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.internal()
.build();

public static final BuiltInFunctionDefinition INTERNAL_UNNEST_ROWS_WITH_ORDINALITY =
BuiltInFunctionDefinition.newBuilder()
.name("$UNNEST_ROWS_WITH_ORDINALITY$1")
.kind(TABLE)
.inputTypeStrategy(sequence(ANY))
.outputTypeStrategy(SpecificTypeStrategies.UNUSED)
.runtimeClass(
"org.apache.flink.table.runtime.functions.table.UnnestRowsWithOrdinalityFunction")
.internal()
.build();

public static final BuiltInFunctionDefinition INTERNAL_HASHCODE =
BuiltInFunctionDefinition.newBuilder()
.name("$HASHCODE$1")
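For orientation, a sketch of the kind of query this new internal definition backs once the planner rules below bridge `WITH ORDINALITY` to it; the table and column names are illustrative assumptions.

```sql
-- Assumed schema: Orders(order_id STRING, items ARRAY<ROW<name STRING, qty INT>>).
-- For an array of rows, UNNEST expands the row fields, and WITH ORDINALITY
-- appends the 1-based position as an extra column.
SELECT o.order_id, t.name, t.qty, t.pos
FROM Orders AS o
CROSS JOIN UNNEST(o.items) WITH ORDINALITY AS t(name, qty, pos)
```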
@@ -22,7 +22,7 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunction;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
@@ -68,26 +68,18 @@ public boolean matches(RelOptRuleCall call) {
LogicalFilter logicalFilter = (LogicalFilter) right;
RelNode relNode = getRel(logicalFilter.getInput());
if (relNode instanceof Uncollect) {
return !((Uncollect) relNode).withOrdinality;
return true;
} else if (relNode instanceof LogicalProject) {
LogicalProject logicalProject = (LogicalProject) relNode;
relNode = getRel(logicalProject.getInput());
if (relNode instanceof Uncollect) {
return !((Uncollect) relNode).withOrdinality;
}
return false;
return relNode instanceof Uncollect;
}
} else if (right instanceof LogicalProject) {
LogicalProject logicalProject = (LogicalProject) right;
RelNode relNode = getRel(logicalProject.getInput());
if (relNode instanceof Uncollect) {
Uncollect uncollect = (Uncollect) relNode;
return !uncollect.withOrdinality;
}
return false;
} else if (right instanceof Uncollect) {
Uncollect uncollect = (Uncollect) right;
return !uncollect.withOrdinality;
return relNode instanceof Uncollect;
} else {
return right instanceof Uncollect;
}
return false;
}
@@ -131,16 +123,22 @@ private RelNode convert(RelNode relNode, LogicalCorrelate correlate) {
((Map.Entry) uncollect.getInput().getRowType().getFieldList().get(0))
.getValue();
LogicalType logicalType = FlinkTypeFactory.toLogicalType(relDataType);

BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
cluster, BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
cluster,
uncollect.withOrdinality
? BuiltInFunctionDefinitions
.INTERNAL_UNNEST_ROWS_WITH_ORDINALITY
: BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
RexNode rexCall =
cluster.getRexBuilder()
.makeCall(
typeFactory.createFieldTypeFromLogicalType(
toRowType(
UnnestRowsFunction.getUnnestedType(
logicalType))),
UnnestRowsFunctionBase.getUnnestedType(
logicalType,
uncollect.withOrdinality))),
sqlFunction,
((LogicalProject) getRel(uncollect.getInput())).getProjects());
return new LogicalTableFunctionScan(
@@ -22,7 +22,7 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunction;
import org.apache.flink.table.runtime.functions.table.UnnestRowsFunctionBase;
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.calcite.plan.RelOptCluster;
@@ -37,7 +37,6 @@
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.immutables.value.Value;

import java.util.Collections;
@@ -91,14 +90,20 @@ private RelNode convertUncollect(Uncollect uc) {
RelDataType relDataType = uc.getInput().getRowType().getFieldList().get(0).getValue();
LogicalType logicalType = FlinkTypeFactory.toLogicalType(relDataType);

SqlFunction sqlFunction =
BridgingSqlFunction.of(cluster, BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);
BridgingSqlFunction sqlFunction =
BridgingSqlFunction.of(
cluster,
uc.withOrdinality
? BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS_WITH_ORDINALITY
: BuiltInFunctionDefinitions.INTERNAL_UNNEST_ROWS);

RexNode rexCall =
cluster.getRexBuilder()
.makeCall(
typeFactory.createFieldTypeFromLogicalType(
toRowType(UnnestRowsFunction.getUnnestedType(logicalType))),
toRowType(
UnnestRowsFunctionBase.getUnnestedType(
logicalType, uc.withOrdinality))),
sqlFunction,
((LogicalProject) getRel(uc.getInput())).getProjects());
return new LogicalTableFunctionScan(