Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,25 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```

#### Partitioning by a key

```sql
-- Input table 'my_aggregation' with columns (name, id, cnt)
-- Default output schema: [op, name, id, cnt]
-- Output schema with PARTITION BY: [id, op, name, cnt]

SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id
)
```
When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns. The order becomes:

```
[partition_keys, op_column, non_partition_input_columns]
```

Prefer row semantics when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate all rows for the same key in the same parallel operator instance.

#### Table API

```java
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.DataType;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */
@Internal
public final class ChangelogTypeStrategyUtils {

    /**
     * Returns the input column indices that pass through to the function's output, excluding the
     * partition key columns (the PTF framework prepends them when the input has set semantics).
     *
     * @param tableSemantics semantics of the function's table argument
     * @return ascending indices of all non-partition-key input columns
     */
    public static int[] computeOutputIndices(final TableSemantics tableSemantics) {
        // -1 is the internal sentinel for "no additional column to exclude".
        return computeOutputIndices(tableSemantics, -1);
    }

    /**
     * Returns the input column indices that pass through to the function's output, excluding the
     * partition key columns and the operation column matching {@code opColumnName}.
     *
     * <p>If {@code opColumnName} does not match any input column, {@code indexOf} returns -1 and
     * no extra column is excluded; only the partition keys are dropped.
     */
    public static int[] computeOutputIndices(
            final TableSemantics tableSemantics, final String opColumnName) {
        final int opIndex = DataType.getFieldNames(tableSemantics.dataType()).indexOf(opColumnName);
        return computeOutputIndices(tableSemantics, opIndex);
    }

    private static int[] computeOutputIndices(
            final TableSemantics tableSemantics, final int extraExcludedIndex) {
        final Set<Integer> excluded = collectPartitionKeyIndices(tableSemantics);
        if (extraExcludedIndex >= 0) {
            excluded.add(extraExcludedIndex);
        }
        final int inputFieldCount = DataType.getFieldCount(tableSemantics.dataType());
        // Preserve the original column order; only filter out the excluded indices.
        return IntStream.range(0, inputFieldCount).filter(i -> !excluded.contains(i)).toArray();
    }

    /** Collects the partition key indices into a mutable set (the caller adds to it). */
    private static Set<Integer> collectPartitionKeyIndices(final TableSemantics tableSemantics) {
        // Collect straight into a HashSet: Collectors.toSet() makes no mutability guarantee,
        // and wrapping its result in another HashSet would allocate a redundant copy.
        return Arrays.stream(tableSemantics.partitionByColumns())
                .boxed()
                .collect(Collectors.toCollection(HashSet::new));
    }

    private ChangelogTypeStrategyUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.types.ColumnList;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,10 +97,12 @@ public List<Signature> getExpectedSignatures(final FunctionDefinition definition

final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields = DataType.getFields(semantics.dataType());
final int[] outputIndices =
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);

final List<Field> outputFields = new ArrayList<>();
outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
outputFields.addAll(inputFields);
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);

return Optional.of(DataTypes.ROW(outputFields).notNull());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ protected void applyDefaultEnvironmentOptions(TableConfig config) {
@Override
public List<TableTestProgram> programs() {
return List.of(
ToChangelogTestPrograms.INSERT_ONLY_INPUT,
ToChangelogTestPrograms.UPDATING_INPUT,
ToChangelogTestPrograms.UPSERT_INPUT,
ToChangelogTestPrograms.INSERT,
ToChangelogTestPrograms.RETRACT,
ToChangelogTestPrograms.UPSERT,
ToChangelogTestPrograms.RETRACT_PARTITION_BY,
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
ToChangelogTestPrograms.TABLE_API_DEFAULT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ToChangelogTestPrograms {
// SQL tests
// --------------------------------------------------------------------------------------------

public static final TableTestProgram INSERT_ONLY_INPUT =
public static final TableTestProgram INSERT =
TableTestProgram.of("to-changelog-insert-only", "insert-only input produces op=INSERT")
.setupTableSource(
SourceTestStep.newBuilder("t")
Expand All @@ -60,7 +60,7 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
.build();

public static final TableTestProgram UPDATING_INPUT =
public static final TableTestProgram RETRACT =
TableTestProgram.of(
"to-changelog-updating-input",
"retract input produces all op codes including UPDATE_BEFORE")
Expand Down Expand Up @@ -121,7 +121,38 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
.build();

public static final TableTestProgram UPSERT_INPUT =
/** Partitions by a non-leading column ({@code id}, the middle column of three). */
public static final TableTestProgram RETRACT_PARTITION_BY =
TableTestProgram.of(
"to-changelog-retract-partition-by-middle-column",
"PARTITION BY a non-leading column drops it from the function output "
+ "without disturbing the order of the remaining columns")
// Source schema (name, id, score); retract changelog with an update of Alice's score.
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
"name STRING PRIMARY KEY NOT ENFORCED",
"id STRING",
"score BIGINT")
.addMode(ChangelogMode.all())
.producedValues(
Row.ofKind(RowKind.INSERT, "Alice", "EU", 10L),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", "EU", 10L),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", "EU", 30L))
.build())
// Sink schema: partition key first, then op, then the remaining input columns
// in their original order ([id, op, name, score]).
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema(
"id STRING", "op STRING", "name STRING", "score BIGINT")
// All rows arrive as insertions; the original RowKind surfaces in 'op'.
.consumedValues(
"+I[EU, INSERT, Alice, 10]",
"+I[EU, UPDATE_BEFORE, Alice, 10]",
"+I[EU, UPDATE_AFTER, Alice, 30]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY id)")
.build();

public static final TableTestProgram UPSERT =
TableTestProgram.of(
"to-changelog-upsert-input",
"upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void testUpsertSource() {
}

@Test
void testInsertOnlySource() {
void testInsertSource() {
util.tableEnv()
.executeSql(
"CREATE TABLE insert_only_source ("
Expand All @@ -89,7 +89,7 @@ void testInsertOnlySource() {
}

@Test
void testSetSemanticsWithPartitionBy() {
void testRetractPartitionBy() {
util.tableEnv()
.executeSql(
"CREATE TABLE retract_source ("
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testInsertOnlySource">
<TestCase name="testInsertSource">
<Resource name="sql">
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
</Resource>
Expand All @@ -35,42 +35,42 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D
]]>
</Resource>
</TestCase>
<TestCase name="testRetractSource">
<TestCase name="testRetractPartitionBy">
<Resource name="sql">
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(op=[$0], id=[$1], name=[$2])
+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)])
LogicalProject(id=[$0], op=[$1], name=[$2])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to point out:

  • If we use PARTITION BY id the LogicalProject would be id, op, ...
  • If we avoid PARTITION BY clause the LogicalProject would be op, id,...

is this intended?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that checks out for me

+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)])
+- LogicalProject(id=[$0], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, retract_source]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
<TestCase name="testSetSemanticsWithPartitionBy">
<TestCase name="testRetractSource">
<Resource name="sql">
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], op=[$1], id0=[$2], name=[$3])
+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0, VARCHAR(2147483647) name)])
LogicalProject(op=[$0], id=[$1], name=[$2])
+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)])
+- LogicalProject(id=[$0], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, retract_source]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,id0,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0, VARCHAR(2147483647) name)], changelogMode=[I])
+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I])
+- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;

Expand Down Expand Up @@ -57,19 +60,24 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
RowKind.DELETE, "DELETE");

private final Map<RowKind, String> rawOpMap;
private final int[] outputIndices;

private transient Map<RowKind, StringData> opMap;
private transient GenericRowData opRow;
private transient JoinedRowData output;
private transient ProjectedRowData projectedOutput;

/**
 * Creates the specialized runtime instance.
 *
 * <p>Reads the optional op mapping from call argument 2 ({@code null} selects the default
 * codes) and precomputes which input columns pass through to the output; partition key
 * columns are excluded because the PTF framework prepends them to the output row.
 */
@SuppressWarnings("unchecked")
public ToChangelogFunction(final SpecializedContext context) {
super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
final CallContext callContext = context.getCallContext();
// Table argument is guaranteed by the type strategy's validation phase.
final TableSemantics tableSemantics = callContext.getTableSemantics(0).get();

// Argument 2 holds the user-supplied RowKind -> code mapping, if any.
final Map<String, String> opMapping =
callContext.getArgumentValue(2, Map.class).orElse(null);
this.rawOpMap = buildOpMap(opMapping);
this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
}

@Override
Expand All @@ -79,6 +87,7 @@ public void open(final FunctionContext context) throws Exception {
rawOpMap.forEach((kind, code) -> opMap.put(kind, StringData.fromString(code)));
opRow = new GenericRowData(1);
output = new JoinedRowData();
projectedOutput = ProjectedRowData.from(outputIndices);
}

/**
Expand Down Expand Up @@ -110,6 +119,6 @@ public void eval(
}

opRow.setField(0, opCode);
collect(output.replace(opRow, input));
collect(output.replace(opRow, projectedOutput.replaceRow(input)));
}
}