Search before asking
Paimon version
paimon version is 1.2
Compute Engine
compute engine is Flink 1.20.0
Minimal reproduce step
-- Source "main" table: one row per id, keyed by id.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS ods_test.main_table;
CREATE TABLE ods_test.main_table (
    id INT,
    name STRING,
    -- VALUE is a reserved keyword in Flink SQL, so the column name is quoted with backticks.
    `value` INT,
    update_time TIMESTAMP(3),
    -- Watermark definition is disabled in this reproduction:
    -- WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'paimon'
);
-- Source "slave" (secondary) table: keyed by sid; main_id refers to ods_test.main_table.id.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS ods_test.secondary_table;
CREATE TABLE ods_test.secondary_table (
    sid INT,
    main_id INT,
    info STRING,
    score INT,
    update_time TIMESTAMP(3),
    -- Watermark definition is disabled in this reproduction:
    -- WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
    'connector' = 'paimon'
);
-- Target table: keyed by id and merged with the partial-update merge engine.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS dwd_test.target_table;
CREATE TABLE dwd_test.target_table (
    id INT PRIMARY KEY NOT ENFORCED,
    main_name STRING,
    main_value INT,
    sec_info STRING,
    sec_score INT,
    main_seq TIMESTAMP(3),
    sec_seq TIMESTAMP(3)
    -- update_time TIMESTAMP(3)
) WITH (
    'connector' = 'paimon',
    'merge-engine' = 'partial-update',
    'partial-update.remove-record-on-delete' = 'true',
    -- Sequence-group settings, currently commented out:
    -- 'partial-update.sequence-group' = 'main_name,main_value:main_seq;sec_info,sec_score:sec_seq',
    -- 'partial-update.sequence-group.sec_group.ignore-retract' = 'true', -- key setting
    -- Key setting: intended to allow NULL to overwrite existing values.
    -- NOTE(review): confirm against the Paimon configuration reference that this
    -- option key is recognized by the partial-update merge engine.
    'partial-update.ignore-null-field' = 'false',
    'changelog-producer' = 'lookup',
    'changelog-producer.lookup.ignore-delete' = 'false'
);
-- Populate the target table from the main table left-joined with the secondary table.
-- The explicit column list pins which target column each SELECT expression feeds.
INSERT INTO dwd_test.target_table (
    id,
    main_name,
    main_value,
    sec_info,
    sec_score,
    main_seq,
    sec_seq
)
SELECT
    COALESCE(m.id, s.main_id) AS id,
    m.name AS main_name,
    -- VALUE is a reserved keyword in Flink SQL, so the column reference is quoted.
    m.`value` AS main_value,
    -- Secondary-table fields are passed through explicitly.
    s.info AS sec_info, -- used as-is; a NULL s.info is meant to signal a deletion
    s.score AS sec_score,
    m.update_time AS main_seq,
    -- Use a special sequence value to mark a deletion.
    CASE
        WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP -- deletion marker: no secondary row matched
        ELSE s.update_time
    END AS sec_seq
    -- m.update_time as m_update_time,
    -- s.update_time as s_update_time
FROM ods_test.main_table AS m
LEFT JOIN ods_test.secondary_table AS s
    ON m.id = s.main_id;
-- Insert initial data for the main table.
-- The column list makes the mapping explicit; `value` is quoted because VALUE is
-- a reserved keyword in Flink SQL.
INSERT INTO ods_test.main_table (id, name, `value`, update_time)
VALUES
    (1, 'Main1', 100, TO_TIMESTAMP('2023-01-01 10:00:00')),
    (2, 'Main2', 200, TO_TIMESTAMP('2023-01-01 10:00:00'));
-- Insert initial data for the slave (secondary) table.
-- The column list makes the value-to-column mapping explicit.
INSERT INTO ods_test.secondary_table (sid, main_id, info, score, update_time)
VALUES
    (101, 1, 'Info1', 5, TO_TIMESTAMP('2023-01-01 10:01:00')),
    (102, 2, 'Info2', 8, TO_TIMESTAMP('2023-01-01 10:01:00'));
-- Remove one slave-table row (sid 101, which belongs to main id 1) to trigger the scenario under test.
DELETE FROM ods_test.secondary_table
WHERE sid = 101;
What doesn't meet your expectations?
I want to verify that when I delete a record from the slave table, the target table sets the fields that come from the slave table to NULL.
But that is not what happens: the target table does not change at all, even though I have configured 'partial-update.ignore-null-field'.
Below is how I checked the result:
Batch mode query target table:
-- Ad-hoc inspection of the target table (the statement was pasted twice; one copy suffices).
SELECT * FROM dwd_test.target_table;
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
| id | main_name | main_value | sec_info | sec_score | update_time | main_seq | sec_seq |
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
| 1 | Main1 | 100 | Info1 | 5 | | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| 2 | Main2 | 200 | Info2 | 8 | | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
Streaming mode query target table:
-- Ad-hoc inspection of the target table, run in streaming mode.
SELECT *
FROM dwd_test.target_table;
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| op | id | main_name | main_value | sec_info | sec_score | main_seq | sec_seq |
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| +I | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| +I | 2 | Main2 | 200 | Info2 | 8 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| -U | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:03:00.000 | 2023-01-01 10:02:00.000 |
| +U | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:03:00.000 | 2025-07-08 15:03:06.243 |
In the Flink SQL client, I ran the SELECT part of the INSERT statement on its own to check the data before it is written to Paimon:
-- Preview of the join result that the INSERT statement writes to the target table.
SELECT
    COALESCE(m.id, s.main_id) AS id,
    m.name AS main_name,
    -- VALUE is a reserved keyword in Flink SQL, so the column reference is quoted.
    m.`value` AS main_value,
    -- Secondary-table fields are passed through explicitly.
    s.info AS sec_info, -- used as-is; a NULL s.info is meant to signal a deletion
    s.score AS sec_score,
    m.update_time AS main_seq,
    -- Use a special sequence value to mark a deletion.
    CASE
        WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP -- deletion marker: no secondary row matched
        ELSE s.update_time
    END AS sec_seq
    -- m.update_time as m_update_time,
    -- s.update_time as s_update_time
FROM ods_test.main_table AS m
LEFT JOIN ods_test.secondary_table AS s
    ON m.id = s.main_id;
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| op | id | main_name | main_value | sec_info | sec_score | main_seq | sec_seq |
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| +I | 2 | Main2 | 200 | Info2 | 8 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| +I | 1 | Main1 | 100 | | | 2023-01-01 10:03:00.000 | 2025-07-08 15:08:15.451 |
Anything else?
I want to implement the following behavior, i.e. support CRUD on a master-slave table pair:
- When adding new records to the master table, new records will be added to the target table according to the join logic of sql
- When updating records in the master table, the corresponding field values in the master table in the target table are updated in the target table according to the PK
- When deleting records in the master table, the records with the corresponding PK in the target table are deleted
- When adding new records to the slave table, first associate the complete record according to the sql logic. If the PK cannot be associated in the master table, skip it directly; if the PK can be associated in the master table, update the records in the target table through the PK.
- When updating records from the slave table, the principle is the same as adding
- When deleting records from the slave table, the principle is basically the same as update, the difference is: if the record with PK can be found, all the fields of the slave table in the target table are set to NULL.
How can I configure the target table in Paimon to achieve this?
Are you willing to submit a PR?
Search before asking
Paimon version
paimon version is 1.2
Compute Engine
compute engine is Flink 1.20.0
Minimal reproduce step
-- Source "main" table: one row per id, keyed by id.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS ods_test.main_table;
CREATE TABLE ods_test.main_table (
    id INT,
    name STRING,
    -- VALUE is a reserved keyword in Flink SQL, so the column name is quoted with backticks.
    `value` INT,
    update_time TIMESTAMP(3),
    -- Watermark definition is disabled in this reproduction:
    -- WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'paimon'
);
-- Source "slave" (secondary) table: keyed by sid; main_id refers to ods_test.main_table.id.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS ods_test.secondary_table;
CREATE TABLE ods_test.secondary_table (
    sid INT,
    main_id INT,
    info STRING,
    score INT,
    update_time TIMESTAMP(3),
    -- Watermark definition is disabled in this reproduction:
    -- WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
    'connector' = 'paimon'
);
-- Target table: keyed by id and merged with the partial-update merge engine.
-- IF EXISTS keeps the script re-runnable when the table has not been created yet.
DROP TABLE IF EXISTS dwd_test.target_table;
CREATE TABLE dwd_test.target_table (
    id INT PRIMARY KEY NOT ENFORCED,
    main_name STRING,
    main_value INT,
    sec_info STRING,
    sec_score INT,
    main_seq TIMESTAMP(3),
    sec_seq TIMESTAMP(3)
    -- update_time TIMESTAMP(3)
) WITH (
    'connector' = 'paimon',
    'merge-engine' = 'partial-update',
    'partial-update.remove-record-on-delete' = 'true',
    -- Sequence-group settings, currently commented out:
    -- 'partial-update.sequence-group' = 'main_name,main_value:main_seq;sec_info,sec_score:sec_seq',
    -- 'partial-update.sequence-group.sec_group.ignore-retract' = 'true', -- key setting
    -- Key setting: intended to allow NULL to overwrite existing values.
    -- NOTE(review): confirm against the Paimon configuration reference that this
    -- option key is recognized by the partial-update merge engine.
    'partial-update.ignore-null-field' = 'false',
    'changelog-producer' = 'lookup',
    'changelog-producer.lookup.ignore-delete' = 'false'
);
-- Populate the target table from the main table left-joined with the secondary table.
-- The explicit column list pins which target column each SELECT expression feeds.
INSERT INTO dwd_test.target_table (
    id,
    main_name,
    main_value,
    sec_info,
    sec_score,
    main_seq,
    sec_seq
)
SELECT
    COALESCE(m.id, s.main_id) AS id,
    m.name AS main_name,
    -- VALUE is a reserved keyword in Flink SQL, so the column reference is quoted.
    m.`value` AS main_value,
    -- Secondary-table fields are passed through explicitly.
    s.info AS sec_info, -- used as-is; a NULL s.info is meant to signal a deletion
    s.score AS sec_score,
    m.update_time AS main_seq,
    -- Use a special sequence value to mark a deletion.
    CASE
        WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP -- deletion marker: no secondary row matched
        ELSE s.update_time
    END AS sec_seq
    -- m.update_time as m_update_time,
    -- s.update_time as s_update_time
FROM ods_test.main_table AS m
LEFT JOIN ods_test.secondary_table AS s
    ON m.id = s.main_id;
-- Insert initial data for the main table.
-- The column list makes the mapping explicit; `value` is quoted because VALUE is
-- a reserved keyword in Flink SQL.
INSERT INTO ods_test.main_table (id, name, `value`, update_time)
VALUES
    (1, 'Main1', 100, TO_TIMESTAMP('2023-01-01 10:00:00')),
    (2, 'Main2', 200, TO_TIMESTAMP('2023-01-01 10:00:00'));
-- Insert initial data for the slave (secondary) table.
-- The column list makes the value-to-column mapping explicit.
INSERT INTO ods_test.secondary_table (sid, main_id, info, score, update_time)
VALUES
    (101, 1, 'Info1', 5, TO_TIMESTAMP('2023-01-01 10:01:00')),
    (102, 2, 'Info2', 8, TO_TIMESTAMP('2023-01-01 10:01:00'));
-- Remove one slave-table row (sid 101, which belongs to main id 1) to trigger the scenario under test.
DELETE FROM ods_test.secondary_table
WHERE sid = 101;
What doesn't meet your expectations?
I want to verify that when I delete a record from the slave table, the target table sets the fields that come from the slave table to NULL.
But that is not what happens: the target table does not change at all, even though I have configured 'partial-update.ignore-null-field'.
Below is how I checked the result:
Batch mode query target table:
-- Ad-hoc inspection of the target table (the statement was pasted twice; one copy suffices).
SELECT * FROM dwd_test.target_table;
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
| id | main_name | main_value | sec_info | sec_score | update_time | main_seq | sec_seq |
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
| 1 | Main1 | 100 | Info1 | 5 | | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| 2 | Main2 | 200 | Info2 | 8 | | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
+----+-----------+------------+-------------+-----------+-------------+-------------------------+-------------------------+
Streaming mode query target table:
-- Ad-hoc inspection of the target table, run in streaming mode.
SELECT *
FROM dwd_test.target_table;
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| op | id | main_name | main_value | sec_info | sec_score | main_seq | sec_seq |
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| +I | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| +I | 2 | Main2 | 200 | Info2 | 8 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| -U | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:03:00.000 | 2023-01-01 10:02:00.000 |
| +U | 1 | Main1 | 100 | Info1 | 5 | 2023-01-01 10:03:00.000 | 2025-07-08 15:03:06.243 |
In the Flink SQL client, I ran the SELECT part of the INSERT statement on its own to check the data before it is written to Paimon:
-- Preview of the join result that the INSERT statement writes to the target table.
SELECT
    COALESCE(m.id, s.main_id) AS id,
    m.name AS main_name,
    -- VALUE is a reserved keyword in Flink SQL, so the column reference is quoted.
    m.`value` AS main_value,
    -- Secondary-table fields are passed through explicitly.
    s.info AS sec_info, -- used as-is; a NULL s.info is meant to signal a deletion
    s.score AS sec_score,
    m.update_time AS main_seq,
    -- Use a special sequence value to mark a deletion.
    CASE
        WHEN s.sid IS NULL THEN CURRENT_TIMESTAMP -- deletion marker: no secondary row matched
        ELSE s.update_time
    END AS sec_seq
    -- m.update_time as m_update_time,
    -- s.update_time as s_update_time
FROM ods_test.main_table AS m
LEFT JOIN ods_test.secondary_table AS s
    ON m.id = s.main_id;
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| op | id | main_name | main_value | sec_info | sec_score | main_seq | sec_seq |
+----+-------------+--------------------------------+-------------+--------------------------------+-------------+-------------------------+-------------------------+
| +I | 2 | Main2 | 200 | Info2 | 8 | 2023-01-01 10:00:00.000 | 2023-01-01 10:01:00.000 |
| +I | 1 | Main1 | 100 | | | 2023-01-01 10:03:00.000 | 2025-07-08 15:08:15.451 |
Anything else?
I want to implement the following behavior, i.e. support CRUD on a master-slave table pair:
How can I configure the target table in Paimon to achieve this?
Are you willing to submit a PR?