Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
15d6581
add kafka metadata support
gmdfalk Sep 29, 2025
ae9a58a
explicit imports
gmdfalk Sep 29, 2025
df113fb
use message metadata for topic
gmdfalk Sep 29, 2025
0740237
fix tests
gmdfalk Sep 29, 2025
42c0195
add itest
gmdfalk Sep 29, 2025
8b523b5
add metadata_column evaluation to other kafka cdc formats
gmdfalk Sep 29, 2025
2701a64
spotless
gmdfalk Sep 29, 2025
2a6715e
dedup
gmdfalk Sep 29, 2025
2d13d4b
Merge branch 'apache:master' into add_kafka_metadata
gmdfalk Dec 17, 2025
19e9577
Merge branch 'apache:master' into add_kafka_metadata
gmdfalk Jan 7, 2026
1500d51
broken
gmdfalk Nov 5, 2025
20e8b07
fix unit tests
gmdfalk Jan 7, 2026
17c3ce2
Merge branch 'apache:master' into add_kafka_metadata
gmdfalk Jan 12, 2026
b030dc3
Merge branch 'apache:master' into add_kafka_metadata
gmdfalk Feb 6, 2026
f1e6b66
add --metadata-column-prefix
gmdfalk Feb 6, 2026
b531939
fix metadatakey lookup
gmdfalk Feb 6, 2026
0a3b1fe
do not skip spotless
gmdfalk Feb 6, 2026
cd90f3a
missing docs
gmdfalk Feb 6, 2026
eba070e
spotless
gmdfalk Feb 6, 2026
26d59ad
docs
gmdfalk Feb 6, 2026
bc77b32
fix unit tests
gmdfalk Feb 6, 2026
ddace59
make sure we set root
gmdfalk Feb 6, 2026
0d13b7b
remove settings
gmdfalk Feb 6, 2026
fd249b5
Merge branch 'master' of github.com:nickdelnano/paimon into nickdelna…
nickdelnano Feb 27, 2026
1cbd850
Fix metadata converters being discarded for non-Avro Kafka formats
nickdelnano Mar 2, 2026
549fb83
Clean up metadata converter plumbing in DataFormat/AbstractRecordParser
nickdelnano Mar 4, 2026
4fdea2d
Add testMetadataColumn integration tests for Canal, Maxwell, Ogg, and…
nickdelnano Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ To use this feature through `flink run`, run the following shell command.
[--primary_keys <primary-keys>] \
[--type_mapping to-string] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column> [--metadata_column ...]] \
[--metadata_column_prefix <metadata-column-prefix>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Expand Down Expand Up @@ -215,6 +217,8 @@ To use this feature through `flink run`, run the following shell command.
[--partition_keys <partition_keys>] \
[--primary_keys <primary-keys>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column> [--metadata_column ...]] \
[--metadata_column_prefix <metadata-column-prefix>] \
[--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
Expand Down
8 changes: 8 additions & 0 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. NOTICE: It returns null if the referenced column does not exist in the source table.</td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--metadata_column_prefix</h5></td>
<td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
</tr>
<tr>
<td><h5>--eager_init</h5></td>
<td>It is default false. If true, all relevant table committers will be initialized eagerly, which means those tables could be forced to create snapshots.</td>
Expand Down
10 changes: 9 additions & 1 deletion docs/layouts/shortcodes/generated/kafka_sync_table.html
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@
<td><h5>--computed_column</h5></td>
<td>The definitions of computed columns. The argument field is from Kafka topic's table field name. See <a href="../overview/#computed-functions">here</a> for a complete list of configurations. </td>
</tr>
<tr>
<td><h5>--metadata_column</h5></td>
<td>--metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data. Available values are topic, partition, offset, timestamp and timestamp_type (one of 'NoTimestampType', 'CreateTime' or 'LogAppendTime').</td>
</tr>
<tr>
<td><h5>--metadata_column_prefix</h5></td>
<td>--metadata_column_prefix is optionally used to set a prefix for metadata columns in the Paimon table to avoid conflicts with existing attributes. For example, with prefix "__kafka_", the metadata column "topic" will be stored as "__kafka_topic" field.</td>
</tr>
<tr>
<td><h5>--kafka_conf</h5></td>
<td>The configuration for Flink Kafka sources. Each configuration should be specified in the format `key=value`. `properties.bootstrap.servers`, `topic/topic-pattern`, `properties.group.id`, and `value.format` are required configurations, others are optional.See its <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#connector-options">document</a> for a complete list of configurations.</td>
Expand All @@ -83,4 +91,4 @@
<td>The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See <a href="{{ $ref }}">here</a> for a complete list of table configurations.</td>
</tr>
</tbody>
</table>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class CdcActionCommonUtils {
public static final String PRIMARY_KEYS = "primary_keys";
public static final String COMPUTED_COLUMN = "computed_column";
public static final String METADATA_COLUMN = "metadata_column";
public static final String METADATA_COLUMN_PREFIX = "metadata_column_prefix";
public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
public static final String EAGER_INIT = "eager_init";
public static final String SYNC_PKEYS_FROM_SOURCE_SCHEMA =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@
* A functional interface for converting CDC metadata.
*
* <p>This interface provides a mechanism to convert Change Data Capture (CDC) metadata from a given
* {@link JsonNode} source. Implementations of this interface can be used to process and transform
* metadata entries from CDC sources.
* {@link JsonNode} source or {@link CdcSourceRecord}. Implementations of this interface can be used
* to process and transform metadata entries from CDC sources.
*/
public interface CdcMetadataConverter extends Serializable {

String read(JsonNode payload);

/**
 * Reads a metadata value from a raw {@link CdcSourceRecord} instead of a parsed
 * {@link JsonNode} payload.
 *
 * <p>The default implementation throws {@link UnsupportedOperationException} so that
 * pre-existing converters, which only implement {@code read(JsonNode)}, remain backward
 * compatible. Converters that take their values from the record itself (e.g. Kafka
 * topic/partition/offset converters) override this method.
 *
 * @param record the CDC source record
 * @return the metadata value as a string
 * @throws UnsupportedOperationException if this converter only supports JSON payloads
 */
default String read(CdcSourceRecord record) {
    throw new UnsupportedOperationException(
            "This metadata converter does not support reading from CdcSourceRecord");
}

DataType dataType();

String columnName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.flink.action.cdc.kafka.KafkaMetadataConverter;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.util.Arrays;
Expand Down Expand Up @@ -49,7 +51,14 @@ public enum CdcMetadataProcessor {
new CdcMetadataConverter.DatabaseNameConverter(),
new CdcMetadataConverter.TableNameConverter(),
new CdcMetadataConverter.SchemaNameConverter(),
new CdcMetadataConverter.OpTsConverter());
new CdcMetadataConverter.OpTsConverter()),
KAFKA_METADATA_PROCESSOR(
SyncJobHandler.SourceType.KAFKA,
new KafkaMetadataConverter.TopicConverter(),
new KafkaMetadataConverter.PartitionConverter(),
new KafkaMetadataConverter.OffsetConverter(),
new KafkaMetadataConverter.TimestampConverter(),
new KafkaMetadataConverter.TimestampTypeConverter());

private final SyncJobHandler.SourceType sourceType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** A data change record from the CDC source. */
Expand All @@ -35,14 +38,29 @@ public class CdcSourceRecord implements Serializable {
// TODO Use generics to support more scenarios.
private final Object value;

// Generic metadata map - any source can add metadata
private final Map<String, Object> metadata;

/**
 * Creates a record without source metadata; delegates to the 4-arg constructor with a null
 * metadata map (normalized there to an empty map).
 */
public CdcSourceRecord(@Nullable String topic, @Nullable Object key, Object value) {
    this(topic, key, value, null);
}

/** Creates a value-only record: no topic, no key, no metadata. */
public CdcSourceRecord(Object value) {
    this(null, null, value, null);
}

/**
 * Creates a record carrying optional source metadata.
 *
 * @param topic the source topic, may be null
 * @param key the record key, may be null
 * @param value the record value
 * @param metadata source-specific metadata entries (e.g. Kafka partition/offset); a null map
 *     is normalized to an empty map
 */
public CdcSourceRecord(
        @Nullable String topic,
        @Nullable Object key,
        Object value,
        @Nullable Map<String, Object> metadata) {
    this.topic = topic;
    this.key = key;
    this.value = value;
    // Defensive copy + unmodifiable wrapper: the record stays immutable even if the caller
    // mutates its map afterwards.
    this.metadata =
            metadata != null
                    ? Collections.unmodifiableMap(new HashMap<>(metadata))
                    : Collections.emptyMap();
}

@Nullable
Expand All @@ -59,6 +77,15 @@ public Object getValue() {
return value;
}

/** Returns all metadata entries as an unmodifiable map; never null, possibly empty. */
public Map<String, Object> getMetadata() {
    return metadata;
}

/** Returns the metadata value for {@code key}, or null if no such entry exists. */
@Nullable
public Object getMetadata(String key) {
    return metadata.get(key);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof CdcSourceRecord)) {
Expand All @@ -68,12 +95,13 @@ public boolean equals(Object o) {
CdcSourceRecord that = (CdcSourceRecord) o;
return Objects.equals(topic, that.topic)
&& Objects.equals(key, that.key)
&& Objects.equals(value, that.value);
&& Objects.equals(value, that.value)
&& Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
    // Keep in sync with equals(): topic, key, value and metadata all participate.
    return Objects.hash(topic, key, value, metadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.types.DataType;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import java.util.Objects;

/**
* Wraps a {@link CdcMetadataConverter} to add a prefix to its column name.
*
* <p>This decorator allows adding prefixes like "__kafka_" to metadata column names to avoid
* collisions with source data columns, while keeping the underlying converter logic unchanged.
*/
public class PrefixedMetadataConverter implements CdcMetadataConverter {

    private static final long serialVersionUID = 1L;

    /** The wrapped converter that produces the actual metadata value and data type. */
    private final CdcMetadataConverter delegate;

    /** Prefix prepended to the delegate's column name; never null (empty means no prefix). */
    private final String prefix;

    /**
     * Creates a converter exposing {@code delegate}'s value under a prefixed column name.
     *
     * @param delegate the converter to wrap, must not be null
     * @param prefix the column-name prefix; null is treated as the empty string
     * @throws NullPointerException if {@code delegate} is null
     */
    public PrefixedMetadataConverter(CdcMetadataConverter delegate, String prefix) {
        // Fail fast here instead of NPE-ing later on first read()/columnName() call.
        this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
        this.prefix = prefix != null ? prefix : "";
    }

    @Override
    public String columnName() {
        return prefix + delegate.columnName();
    }

    @Override
    public String read(JsonNode payload) {
        return delegate.read(payload);
    }

    @Override
    public String read(CdcSourceRecord record) {
        return delegate.read(record);
    }

    @Override
    public DataType dataType() {
        return delegate.dataType();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordPar
return new PostgresRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
case KAFKA:
return provideDataFormat()
.createParser(typeMapping, computedColumns, metadataConverters);
case PULSAR:
DataFormat dataFormat = provideDataFormat();
return dataFormat.createParser(typeMapping, computedColumns);
return provideDataFormat().createParser(typeMapping, computedColumns);
case MONGODB:
return new MongoDBRecordParser(computedColumns, cdcSourceConfig);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.SYNC_PKEYS_FROM_SOURCE_SCHEMA;
Expand Down Expand Up @@ -64,6 +65,11 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa
}

if (params.has(METADATA_COLUMN)) {
// Parse optional prefix first
if (params.has(METADATA_COLUMN_PREFIX)) {
action.withMetadataColumnPrefix(params.get(METADATA_COLUMN_PREFIX));
}

List<String> metadataColumns =
new ArrayList<>(params.getMultiParameter(METADATA_COLUMN));
if (metadataColumns.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class SynchronizationActionBase extends ActionBase {
// in paimon schema if pkeys are not specified in action command
protected boolean syncPKeysFromSourceSchema = true;
protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};
protected String metadataColumnPrefix = "";

public SynchronizationActionBase(
String database,
Expand Down Expand Up @@ -101,10 +102,21 @@ public SynchronizationActionBase withMetadataColumns(List<String> metadataColumn
this.metadataConverters =
metadataColumns.stream()
.map(this.syncJobHandler::provideMetadataConverter)
.map(
converter ->
metadataColumnPrefix.isEmpty()
? converter
: new PrefixedMetadataConverter(
converter, metadataColumnPrefix))
.toArray(CdcMetadataConverter[]::new);
return this;
}

/**
 * Sets the prefix applied to metadata column names (e.g. "__kafka_"); a null argument resets
 * the prefix to the empty string.
 *
 * <p>NOTE(review): {@code withMetadataColumns} reads this prefix while building the
 * converters, so this method must be called before {@code withMetadataColumns} for the prefix
 * to take effect.
 */
public SynchronizationActionBase withMetadataColumnPrefix(String prefix) {
    this.metadataColumnPrefix = prefix != null ? prefix : "";
    return this;
}

public SynchronizationActionBase syncPKeysFromSourceSchema(boolean flag) {
this.syncPKeysFromSourceSchema = flag;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public AbstractJsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> co
}

/**
 * Caches the record's payload as the JSON root for field parsing, after letting the
 * superclass store the record itself for metadata-column access.
 */
protected void setRoot(CdcSourceRecord record) {
    super.setRoot(record); // base parser keeps the record so metadata columns can read it
    root = (JsonNode) record.getValue();
}

Expand Down Expand Up @@ -104,6 +105,8 @@ protected Map<String, String> extractRowData(JsonNode record, CdcSchema.Builder
return Objects.toString(entry.getValue());
}));
evalComputedColumns(rowData, schemaBuilder);
evalMetadataColumns(rowData, schemaBuilder);

return rowData;
}

Expand Down
Loading