26 changes: 24 additions & 2 deletions docs/content/docs/sql/reference/queries/changelog.md
@@ -45,7 +45,7 @@ Note: This version requires that your CDC data encodes updates using a full image

```sql
SELECT * FROM FROM_CHANGELOG(
input => TABLE source_table,
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP[
'c, r', 'INSERT',
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(

| Parameter | Required | Description |
|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `input` | Yes | The input table. Must be append-only. |
| `input` | Yes | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change operation may appear at most once across all entries. |
| `error_handling` | No | Controls behavior when an input row's operation code is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) — throw a `TableRuntimeException`, `SKIP` — silently drop the row. |
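
The comma-separated-keys and at-most-once rules described for `op_mapping` can be sketched in plain Java. This is a hypothetical stdlib-only illustration (`OpMappingParser` is not part of Flink); the engine's actual validation lives in `FromChangelogTypeStrategy`:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of expanding op_mapping entries into a code -> operation map.
class OpMappingParser {

    // Expands keys like "c, r" into one entry per code and rejects
    // change operations that appear more than once across all entries.
    static Map<String, String> parse(Map<String, String> opMapping) {
        Map<String, String> codeToOp = new HashMap<>();
        Set<String> seenOps = new HashSet<>();
        for (Map.Entry<String, String> e : opMapping.entrySet()) {
            if (!seenOps.add(e.getValue())) {
                throw new IllegalArgumentException(
                        "Operation '" + e.getValue() + "' mapped more than once.");
            }
            for (String code : e.getKey().split(",")) {
                codeToOp.put(code.trim(), e.getValue());
            }
        }
        return codeToOp;
    }
}
```

For example, `{'c, r' -> 'INSERT', 'd' -> 'DELETE'}` expands to three codes (`c`, `r`, `d`) mapping to two operations.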
@@ -127,6 +127,28 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
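
The fall-back to the default column name `op` can be sketched with a small stdlib-only helper. This is a hypothetical illustration (`OpColumnResolver` is invented for this doc); the real resolution happens inside the type strategy:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of resolving the op column name from a DESCRIPTOR argument.
class OpColumnResolver {
    static final String DEFAULT_OP_COLUMN_NAME = "op";

    // Returns the first descriptor column name, or "op" when the
    // descriptor argument is omitted or empty.
    static String resolve(Optional<List<String>> descriptorColumns) {
        return descriptorColumns
                .filter(names -> !names.isEmpty())
                .map(names -> names.get(0))
                .orElse(DEFAULT_OP_COLUMN_NAME);
    }
}
```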

#### Partitioning by a key

```sql
-- Input table 'cdc_stream' with columns (name, id, op, doc)
-- Default output schema: [name, id, doc]
-- Output schema with PARTITION BY: [id, name, doc]

SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id
)
```

When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns (excluding the op column). The order becomes:

```
[partition_keys, non_partition_input_columns_excluding_op]
```
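
The reordering can be sketched as a stdlib-only helper (a hypothetical illustration, not the engine's schema derivation): partition keys first, then the remaining input columns with the op column dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the output column order produced under PARTITION BY.
class OutputSchemaSketch {
    // Partition key columns come first, followed by the remaining input
    // columns in original order, with the op column excluded entirely.
    static List<String> outputColumns(
            List<String> inputColumns, List<String> partitionKeys, String opColumn) {
        List<String> out = new ArrayList<>(partitionKeys);
        for (String col : inputColumns) {
            if (!partitionKeys.contains(col) && !col.equals(opColumn)) {
                out.add(col);
            }
        }
        return out;
    }
}
```

Applied to the `cdc_stream` example above, input `(name, id, op, doc)` partitioned by `id` yields `[id, name, doc]`.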

Prefer row semantics when possible. `PARTITION BY` is only needed when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.

If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.

#### Invalid operation code handling

Two `error_handling` modes are supported. The job can either fail upon an invalid or unknown op code, or skip the row and continue processing.
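
The two modes can be sketched in plain Java (a hypothetical illustration; the actual engine throws a `TableRuntimeException` rather than a plain `RuntimeException`):

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of per-row op-code resolution under FAIL vs. SKIP.
class ErrorHandlingSketch {
    enum Mode { FAIL, SKIP }

    // Returns the resolved operation, or empty when the row should be dropped.
    // FAIL throws on a NULL or unmapped code; SKIP silently discards the row.
    static Optional<String> resolveOp(String code, Map<String, String> mapping, Mode mode) {
        String op = code == null ? null : mapping.get(code);
        if (op == null) {
            if (mode == Mode.FAIL) {
                throw new RuntimeException("Unknown or NULL op code: " + code);
            }
            return Optional.empty();
        }
        return Optional.of(op);
    }
}
```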
@@ -1467,6 +1467,17 @@ default TableResult executeInsert(
* TableRuntimeException} when an input row's op code is {@code NULL} or not present in the
* mapping; pass {@code error_handling => 'SKIP'} to silently drop those rows instead.
*
* <p>By default, the input is processed with row semantics (each row independently). To
* co-locate rows with the same key in the same parallel operator instance, partition the input
* first via {@link #partitionBy(Expression...)} and invoke the function via {@link
* PartitionedTable#process(String, Object...)}:
*
* <pre>{@code
* Table result = cdcStream
* .partitionBy($("id"))
* .process("FROM_CHANGELOG");
Comment thread

Contributor: This is not so nice. Shall we add a method to PartitionedTable

Suggested change:
- * .process("FROM_CHANGELOG");
+ * .fromChangelog();

Contributor: We can also do this as a follow up and include to_changelog there.

Contributor (Author): Made a followup ticket and will open a PR after this one is merged: https://issues.apache.org/jira/browse/FLINK-39632

* }</pre>
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
@@ -839,12 +839,15 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.ROW_SEMANTIC_TABLE))
.withConditionalTrait(
StaticArgumentTrait.SET_SEMANTIC_TABLE,
TraitCondition.hasPartitionBy()),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
@@ -21,16 +21,46 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.types.ColumnList;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;

/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */
@Internal
public final class ChangelogTypeStrategyUtils {
private static final String DEFAULT_OP_COLUMN_NAME = "op";

/**
* Resolves the op column name from the {@code op} descriptor argument, falling back to {@link
* #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
*/
public static String resolveOpColumnName(final CallContext callContext) {
return callContext
.getArgumentValue(ARG_OP, ColumnList.class)
.filter(cl -> !cl.getNames().isEmpty())
.map(cl -> cl.getNames().get(0))
.orElse(DEFAULT_OP_COLUMN_NAME);
}

/**
* Returns the index of the column matching {@code opColumnName} within the input schema, or
* empty if no field matches.
*/
public static OptionalInt resolveOpColumnIndex(
final TableSemantics tableSemantics, final String opColumnName) {
final List<String> fieldNames = DataType.getFieldNames(tableSemantics.dataType());
return IntStream.range(0, fieldNames.size())
.filter(i -> fieldNames.get(i).equals(opColumnName))
.findFirst();
}

/**
* Returns the input column indices that pass through to the function's output, excluding the
@@ -61,10 +91,9 @@ private static int[] computeOutputIndices(
}

private static Set<Integer> collectPartitionKeyIndices(final TableSemantics tableSemantics) {
return new HashSet<>(
Arrays.stream(tableSemantics.partitionByColumns())
.boxed()
.collect(Collectors.toSet()));
return Arrays.stream(tableSemantics.partitionByColumns())
.boxed()
.collect(Collectors.toSet());
}

private ChangelogTypeStrategyUtils() {}
@@ -27,14 +27,17 @@
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;

@@ -50,8 +53,6 @@ public final class FromChangelogTypeStrategy {
public static final int ARG_OP_MAPPING = 2;
public static final int ARG_ERROR_HANDLING = 3;

public static final String DEFAULT_OP_COLUMN_NAME = "op";

private static final Set<String> VALID_ROW_KIND_NAMES =
Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");

@@ -86,7 +87,8 @@ public Optional<List<DataType>> inferInputTypes(
new ValidationException(
"First argument must be a table for FROM_CHANGELOG."));

final String opColumnName = resolveOpColumnName(callContext);
final String opColumnName =
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);

final List<Field> outputFields = buildOutputFields(tableSemantics, opColumnName);

@@ -97,7 +99,6 @@ public Optional<List<DataType>> inferInputTypes(
// Helpers
// --------------------------------------------------------------------------------------------

@SuppressWarnings("rawtypes")
private static Optional<List<DataType>> validateInputs(
final CallContext callContext, final boolean throwOnFailure) {
Optional<List<DataType>> error;
@@ -156,23 +157,26 @@ private static Optional<List<DataType>> validateOpColumn(
final CallContext callContext, final boolean throwOnFailure) {

final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
final Optional<Field> opField =
inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
if (opField.isEmpty()) {
final String opColumnName = ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
final OptionalInt opIndex =
ChangelogTypeStrategyUtils.resolveOpColumnIndex(tableSemantics, opColumnName);
if (opIndex.isEmpty()) {
return callContext.fail(
throwOnFailure,
String.format(
"The op column '%s' does not exist in the input schema.",
opColumnName));
}
if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
final LogicalType opFieldType =
DataType.getFieldDataTypes(tableSemantics.dataType())
.get(opIndex.getAsInt())
.getLogicalType();
if (!opFieldType.is(LogicalTypeFamily.CHARACTER_STRING)) {
return callContext.fail(
throwOnFailure,
String.format(
"The op column '%s' must be of STRING type, but was '%s'.",
opColumnName, opField.get().getDataType().getLogicalType()));
opColumnName, opFieldType));
}
return Optional.empty();
}
@@ -273,21 +277,13 @@ private static Optional<List<DataType>> validateErrorHandling(
return Optional.empty();
}

private static String resolveOpColumnName(final CallContext callContext) {
return callContext
.getArgumentValue(ARG_OP, ColumnList.class)
.filter(cl -> !cl.getNames().isEmpty())
.map(cl -> cl.getNames().get(0))
.orElse(DEFAULT_OP_COLUMN_NAME);
}

private static List<Field> buildOutputFields(
final TableSemantics tableSemantics, final String opColumnName) {
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());

// Exclude the op column (becomes RowKind), keep all other columns
return inputFields.stream()
.filter(f -> !f.getName().equals(opColumnName))
return Arrays.stream(
ChangelogTypeStrategyUtils.computeOutputIndices(
tableSemantics, opColumnName))
.mapToObj(inputFields::get)
.collect(Collectors.toList());
}

@@ -39,9 +39,11 @@ protected void applyDefaultEnvironmentOptions(TableConfig config) {
@Override
public List<TableTestProgram> programs() {
return List.of(
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
FromChangelogTestPrograms.RETRACT,
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_NAME,
FromChangelogTestPrograms.RETRACT_PARTITION_BY,
FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY,
FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
FromChangelogTestPrograms.TABLE_API_DEFAULT,