Flink: Writing Iceberg data through Flink is inaccurate #5614

@xuzhiwen1255

Apache Iceberg version

0.13.1

Query engine

Flink

Please describe the bug 🐞

versions
flink : 1.14.5
iceberg : 0.13.1 and master

Problem description

I wrote 500 million rows to Iceberg using the Flink datagen connector. When I ran select count(*) on the table, the result exceeded 500 million rows.

Operation steps

-- start  flink-yarn-session
bin/yarn-session.sh  -D pipeline.operator-chaining=false  -D taskmanager.memory.jvm-overhead.min=400MB -D taskmanager.memory.jvm-overhead.max=1000MB  -D env.java.opts="-XX:+UseG1GC"  -s 1 -jm 4096 -tm 4096 -nm flink-sql-session -d
-- start flink-sql-client    use iceberg version 0.14 and 0.13.1
./sql-client.sh embedded -j ../icebergjars/-iceberg-flink-runtime-1.14-0.13.1.jar -s yarn-session
drop table dg;
CREATE TABLE dg (
    id INT,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR,c4 VARCHAR,c5 VARCHAR,c6 VARCHAR,c7 VARCHAR,c8 VARCHAR,c9 VARCHAR,c10 VARCHAR,c11 VARCHAR,c12 VARCHAR,c13 VARCHAR,c14 VARCHAR,c15 VARCHAR,c16 VARCHAR,c17 VARCHAR,c18 VARCHAR,c19 VARCHAR,c20 VARCHAR,p int
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100000000',
    'number-of-rows' = '500000000',
    'fields.id.min'='1',
    'fields.id.max'='147483647',
    'fields.c1.length' = '20','fields.c2.length' = '20','fields.c3.length' = '20','fields.c4.length' = '20','fields.c5.length' = '20','fields.c6.length' = '20','fields.c7.length' = '20','fields.c8.length' = '20','fields.c9.length' = '20','fields.c10.length' = '20','fields.c11.length' = '20','fields.c12.length' = '20','fields.c13.length' = '20','fields.c14.length' = '20','fields.c15.length' = '20','fields.c16.length' = '20','fields.c17.length' = '20','fields.c18.length' = '20','fields.c19.length' = '20','fields.c20.length' = '20',
   'fields.id.min'='1','fields.id.max'='1000000'
  ); 
 CREATE CATALOG hc WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://xxx:8020/user/iceberg/warehouse',
  'property-version'='1'
);

create table hc.db.t1(
 id INT,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR,c4 VARCHAR,c5 VARCHAR,c6 VARCHAR,c7 VARCHAR,c8 VARCHAR,c9 VARCHAR,c10 VARCHAR,c11 VARCHAR,c12 VARCHAR,c13 VARCHAR,c14 VARCHAR,c15 VARCHAR,c16 VARCHAR,c17 VARCHAR,c18 VARCHAR,c19 VARCHAR,c20 VARCHAR,p int
) ;

set parallelism.default=30;
set execution.checkpointing.interval=30sec;
insert into hc.db.t1 select * from dg;

Problems that arise

Through my testing, I found that after the job has run for a while, in some cases the IcebergFilesCommitter receives the same data file twice after a checkpoint is triggered. When the metadata is then written, the same file is referenced twice in the manifest list, so the query result exceeds the expected 500 million rows.
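To make the effect concrete, here is a minimal Python sketch. It assumes, for illustration only, that a full-table count is the sum of per-file record counts over every file reference in the snapshot, so a file referenced twice is counted twice. The file names and record counts are made up and are not taken from the table in this report.

```python
def table_row_count(file_refs, records_per_file):
    """Sum record counts over every file reference, duplicates included."""
    return sum(records_per_file[path] for path in file_refs)


# Hypothetical files and record counts, for illustration only.
records_per_file = {"a.parquet": 100, "b.parquet": 100, "c.parquet": 100}

clean_refs = ["a.parquet", "b.parquet", "c.parquet"]
dup_refs = clean_refs + ["b.parquet"]  # the same file is referenced twice

expected = table_row_count(clean_refs, records_per_file)
observed = table_row_count(dup_refs, records_per_file)
print(f"expected={expected} observed={observed}")
```

The data files themselves are not duplicated on disk in this sketch; only the extra reference inflates the count.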

To confirm this, I added a custom operator (MapTest in the log below) that logs every data file path it receives and throws an exception when the same path appears twice within the same checkpoint.
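The operator's code is not included in this report. A minimal Python sketch of the check it describes, independent of Flink, might look like the following. The class and method names are hypothetical, and resetting the state per checkpoint is an assumption based on the description above.

```python
class DuplicatePathDetector:
    """Tracks data file paths seen in the current checkpoint (hypothetical helper)."""

    def __init__(self):
        self._seen = {}  # path -> subtask index that first reported it

    def accept(self, subtask, path):
        """Record a path; raise if it was already reported in this checkpoint."""
        if path in self._seen:
            raise RuntimeError(
                f"Same path is = {path} "
                f"last send subtask = {self._seen[path]}, "
                f"cur send subtask = {subtask}"
            )
        self._seen[path] = subtask

    def on_checkpoint_complete(self):
        """Forget all paths so the next checkpoint starts from a clean state."""
        self._seen.clear()


detector = DuplicatePathDetector()
detector.accept(0, "data/file-a.parquet")  # first report: accepted
try:
    detector.accept(28, "data/file-a.parquet")  # second report: rejected
except RuntimeError as err:
    print(err)
```

Keeping the first reporting subtask alongside each path is what allows the error message to name both senders.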

The log is as follows:

2022-08-22 20:38:24,420 INFO  org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Start to flush snapshot state to state backend, table: hc.db.t1, checkpointId: 27
2022-08-22 20:38:24,686 INFO  org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committing append with 30 data files and 0 delete files to table hc.db.t1
2022-08-22 20:38:25,664 INFO  org.apache.iceberg.hadoop.HadoopTableOperations              [] - Committed a new metadata file hdfs://?:8020/user/iceberg/warehouse/db/t1/metadata/v28.metadata.json
2022-08-22 20:38:25,678 INFO  org.apache.iceberg.SnapshotProducer                          [] - Committed snapshot 6794546868781528751 (MergeAppend)
2022-08-22 20:38:25,692 INFO  org.apache.iceberg.flink.sink.IcebergFilesCommitter          [] - Committed in 1006 ms
2022-08-22 20:38:36,464 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 0,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00004-0-e25f4e48-66e5-4351-bdc8-f65f06beb1ef-00028.parquet --- 
2022-08-22 20:38:51,924 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 0,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet --- 
2022-08-22 20:38:52,197 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 0,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00003-0-2dc897ac-7bae-4a99-81f5-7965b81b8f41-00028.parquet --- 
2022-08-22 20:38:52,445 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 4,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00004-0-e25f4e48-66e5-4351-bdc8-f65f06beb1ef-00028.parquet --- 
2022-08-22 20:38:52,446 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 3,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00003-0-2dc897ac-7bae-4a99-81f5-7965b81b8f41-00028.parquet --- 
2022-08-22 20:38:52,447 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 28,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet --- 
2022-08-22 20:38:52,515 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 5,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00005-0-04d90606-4e68-440e-9ce5-58f1e0f2520f-00028.parquet --- 
2022-08-22 20:38:52,516 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 27,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00027-0-e1e4968b-754c-4aad-9bbd-fae9152124d7-00028.parquet --- 
2022-08-22 20:38:52,517 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 25,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00025-0-a2f49612-f038-42c4-83ed-ea1781bffa52-00028.parquet --- 
2022-08-22 20:38:52,523 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 13,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00013-0-41ad64c5-4504-4c8b-bed7-02fd035cfda9-00028.parquet --- 
2022-08-22 20:38:52,700 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 21,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00021-0-5617be9f-5b54-4d7f-bff0-c5c84dd211b3-00028.parquet --- 
2022-08-22 20:38:52,778 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 16,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00016-0-f6b85e68-476a-4091-aba9-b359187ad128-00028.parquet --- 
2022-08-22 20:38:52,800 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 7,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00007-0-bbb9ff21-7425-4eab-9506-3b37953c3143-00028.parquet --- 
2022-08-22 20:38:52,855 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 17,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00017-0-aedbb2ed-0e6f-4c41-bd9c-b79e94a77f5c-00028.parquet --- 
2022-08-22 20:38:52,903 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 0,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00000-0-0c5a8c91-66b5-4c9a-8b6d-9bc0752aae98-00028.parquet --- 
2022-08-22 20:38:52,923 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 23,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00023-0-a987c77b-580f-462e-a824-9111b2f265e0-00028.parquet --- 
2022-08-22 20:38:52,926 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 2,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00002-0-d90d3062-60e1-4886-a9c5-337867ab36f9-00028.parquet --- 
2022-08-22 20:38:52,936 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 26,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00026-0-d4c638a8-0e1d-4592-afad-c88f438ce4e3-00028.parquet --- 
2022-08-22 20:38:52,955 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 18,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00018-0-1120866c-36ee-47d4-8584-ac6e594e7c8d-00028.parquet --- 
2022-08-22 20:38:52,983 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 29,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00029-0-aca3dc1a-20e3-4833-a896-eb74b4fedd2c-00028.parquet --- 
2022-08-22 20:38:53,090 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 22,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00022-0-23ac6b11-0b88-4c62-85e8-901691dc5d14-00028.parquet --- 
2022-08-22 20:38:53,141 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 11,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00011-0-b0966207-bdd0-4796-9114-f843d494f8f7-00028.parquet --- 
2022-08-22 20:38:53,161 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 15,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00015-0-abf724a5-3810-4b85-af27-2dfea9c168a9-00028.parquet --- 
2022-08-22 20:38:53,165 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 1,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00001-0-78b2ab81-8ea8-461d-a018-7fd64d714920-00028.parquet --- 
2022-08-22 20:38:53,180 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 12,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00012-0-9cae32e4-97cd-4fa9-a130-2b9e5baea091-00028.parquet --- 
2022-08-22 20:38:53,200 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 6,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00006-0-546e20d4-2583-4d67-89ef-bdca64d3ec08-00028.parquet --- 
2022-08-22 20:38:53,323 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 14,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00014-0-798c97ee-e504-4e1d-8529-f630c56683e7-00028.parquet --- 
2022-08-22 20:38:53,329 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 20,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00020-0-3dac110a-6303-472a-96fd-a07bd8dca9ae-00028.parquet --- 
2022-08-22 20:38:53,367 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 19,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00019-0-10e161ac-4b99-4061-85a5-01c6f7fa01b9-00028.parquet --- 
2022-08-22 20:38:53,414 WARN  org.apache.iceberg.flink.sink.MapTest                         [] -  accepted subtask 24,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00024-0-083dfaff-2144-4d17-bbb5-325d29ea1350-00028.parquet --- 
2022-08-22 20:38:53,423 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - xzw-consutm (1/1)#0 (04aac6f60f4c5e1b3eb49dc09374029d) switched from RUNNING to FAILED with failure cause: 
java.lang.RuntimeException: Same path is  =hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet last send subtask = 0 ,cur send subtask = 28	
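
For anyone who wants to check a longer log for the same pattern, the following Python sketch groups the "accepted subtask" lines by path and keeps the paths reported more than once. The function names are hypothetical, and `writer_index` assumes that the leading number of the file name is the index of the subtask that wrote the file, which this report does not confirm.

```python
import re
from collections import defaultdict

# Matches the custom MapTest log lines shown above.
LINE_RE = re.compile(r"accepted subtask (\d+),add path:(\S+)")

# Three lines copied from the log above, used as sample input.
SAMPLE = """\
2022-08-22 20:38:51,924 WARN  org.apache.iceberg.flink.sink.MapTest [] -  accepted subtask 0,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet ---
2022-08-22 20:38:52,447 WARN  org.apache.iceberg.flink.sink.MapTest [] -  accepted subtask 28,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00028-0-90a05360-fdd9-4030-a299-ab209b9d8640-00028.parquet ---
2022-08-22 20:38:52,515 WARN  org.apache.iceberg.flink.sink.MapTest [] -  accepted subtask 5,add path:hdfs://?:8020/user/iceberg/warehouse/db/t1/data/00005-0-04d90606-4e68-440e-9ce5-58f1e0f2520f-00028.parquet ---
"""


def paths_from_multiple_subtasks(log_text):
    """Return {path: [subtask, ...]} for every path reported more than once."""
    senders = defaultdict(list)
    for match in LINE_RE.finditer(log_text):
        senders[match.group(2)].append(int(match.group(1)))
    return {path: subs for path, subs in senders.items() if len(subs) > 1}


def writer_index(path):
    """Leading number of the file name (assumed to be the writer subtask index)."""
    return int(path.rsplit("/", 1)[-1].split("-", 1)[0])


for path, subtasks in paths_from_multiple_subtasks(SAMPLE).items():
    print(f"writer={writer_index(path)} reported_by={subtasks} path={path}")
```

In the log excerpt above, three paths are reported twice (file name prefixes 00004, 00028 and 00003), each time first by subtask 0 and then by subtasks 4, 28 and 3 respectively.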
 

@rdblue Please have a look
