/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ kafka_sync_table \
+ --warehouse hdfs:///path/to/warehouse \
+ --database test_db \
+ --table test_table \
+ --partition_keys pt \
+ --computed_column 'pt=date_format(event_tm, yyyyMMdd)' \
+ --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
+ --kafka_conf topic=test_log \
+ --kafka_conf properties.group.id=123456 \
+ --kafka_conf value.format=json \
+ --catalog_conf metastore=hive \
+ --catalog_conf uri=thrift://hive-metastore:9083 \
+ --table_conf sink.parallelism=4
+```
+
## Synchronizing Databases
By using [PulsarSyncDatabaseAction](/docs/{{< param Branch >}}/api/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncDatabaseAction) in a Flink DataStream job or directly through `flink run`, users can synchronize the multi topic or one topic into one Paimon database.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index 28dc3e457c09..e7e588e4eb4a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -22,6 +22,7 @@
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
+import org.apache.paimon.flink.action.cdc.format.json.JsonRecordParser;
import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;
@@ -38,7 +39,8 @@ public enum DataFormat {
CANAL_JSON(CanalRecordParser::new),
OGG_JSON(OggRecordParser::new),
MAXWELL_JSON(MaxwellRecordParser::new),
- DEBEZIUM_JSON(DebeziumRecordParser::new);
+ DEBEZIUM_JSON(DebeziumRecordParser::new),
+ JSON(JsonRecordParser::new);
// Add more data formats here if needed
private final RecordParserFactory parser;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
new file mode 100644
index 000000000000..e7f30c0b0e2a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.json;
+
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.RecordParser;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.RowKind;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@code JsonRecordParser} class extends the abstract {@link RecordParser} class and is
+ * designed to parse JSON records.
+ *
+ * This class treats JSON records as special CDC data with only insert operation type and
+ * generates {@link RichCdcMultiplexRecord} objects with only INSERT operation types.
+ */
+public class JsonRecordParser extends RecordParser {
+
+ public JsonRecordParser(
+ boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+ super(caseSensitive, typeMapping, computedColumns);
+ }
+
+ @Override
+ protected List<RichCdcMultiplexRecord> extractRecords() {
+ List<RichCdcMultiplexRecord> records = new ArrayList<>();
+ processRecord(root, RowKind.INSERT, records);
+ return records;
+ }
+
+ @Override
+ protected String primaryField() {
+ return null;
+ }
+
+ @Override
+ protected String dataField() {
+ return null;
+ }
+
+ @Override
+ protected String format() {
+ return "json";
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaJsonSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaJsonSyncTableActionITCase.java
new file mode 100644
index 000000000000..c8601bb74954
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaJsonSyncTableActionITCase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.kafka;
+
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+
+/** IT cases for {@link KafkaSyncTableAction}. */
+public class KafkaJsonSyncTableActionITCase extends KafkaSyncTableActionITCase {
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolution() throws Exception {
+ String topic = "schema-evolution";
+ Map<String, String> tableOptions = new HashMap<>();
+
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/json/table/schemaevolution/json-data-1.txt");
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withTableConfig(tableOptions)
+ .build();
+
+ runActionWithDefaultEnv(action);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()},
+ new String[] {"a", "b", "event_tm"});
+
+ waitForResult(
+ Arrays.asList("+I[a1, b1, 2024-05-22 09:50:40]", "+I[a2, b2, 2024-05-23 10:20:56]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.emptyList());
+
+ writeRecordsToKafka(
+ topic, "kafka/json/table/schemaevolution/json-data-2.txt", "schemaevolution");
+
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"a", "b", "event_tm", "c"});
+
+ waitForResult(
+ Arrays.asList(
+ "+I[a1, b1, 2024-05-22 09:50:40, NULL]",
+ "+I[a2, b2, 2024-05-23 10:20:56, NULL]",
+ "+I[a3, b3, 2024-05-22 19:50:40, NULL]",
+ "+I[a4, b4, 2024-05-23 15:20:56, c4]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.emptyList());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testComputedColumn() throws Exception {
+ String topic = "computed_column";
+ Map<String, String> tableOptions = new HashMap<>();
+
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic, "kafka/json/table/computedcolumn/json-data-1.txt");
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "json");
+ kafkaConfig.put(TOPIC.key(), topic);
+ KafkaSyncTableAction action =
+ syncTableActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withPartitionKeys("pt")
+ .withTableConfig(tableOptions)
+ .withComputedColumnArgs("pt=date_format(event_tm, yyyyMMdd)")
+ .build();
+
+ runActionWithDefaultEnv(action);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {"a", "b", "event_tm", "pt"});
+ waitForResult(
+ Arrays.asList(
+ "+I[a1, b1, 2024-05-20 20:50:30, 20240520]",
+ "+I[a2, b2, 2024-05-21 18:10:46, 20240521]"),
+ getFileStoreTable(tableName),
+ rowType,
+ Collections.emptyList());
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/computedcolumn/json-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/computedcolumn/json-data-1.txt
new file mode 100644
index 000000000000..e62861738e41
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/computedcolumn/json-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"a":"a1","b":"b1","event_tm":"2024-05-20 20:50:30"}
+{"a":"a2","b":"b2","event_tm":"2024-05-21 18:10:46"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-1.txt
new file mode 100644
index 000000000000..4f6aa005a212
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"a":"a1","b":"b1","event_tm":"2024-05-22 09:50:40"}
+{"a":"a2","b":"b2","event_tm":"2024-05-23 10:20:56"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-2.txt
new file mode 100644
index 000000000000..5f9c1dff7208
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/json/table/schemaevolution/json-data-2.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"a":"a3","b":"b3","event_tm":"2024-05-22 19:50:40"}
+{"a":"a4","b":"b4","c":"c4","event_tm":"2024-05-23 15:20:56"}
\ No newline at end of file