[SUPPORT]hudi[0.13.1] on flink[1.16.2], after bulk_insert & bucket_index, get int96 exception when flink trigger compaction #9804

Closed
li-ang-666 opened this issue Sep 29, 2023 · 25 comments
Labels: flink, schema-and-data-types

@li-ang-666

li-ang-666 commented Sep 29, 2023

[offline insert]
CREATE TABLE source_table(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts as CAST(NOW() AS TIMESTAMP(0)),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xxxx',
'table-name' = 'xxxx',
'username' = 'xxxx',
'password' = 'xxxx',
'scan.partition.column' = 'id',
'scan.partition.num' = '40000',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '400000000',
'scan.fetch-size' = '1024'
);
create table ratio_path_company(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
'table.type' = 'MERGE_ON_READ',
-- cdc
'changelog.enabled' = 'true',
-- index
'index.type' = 'bucket',
'hoodie.bucket.index.num.buckets' = '256',
-- write
'write.operation' = 'bulk_insert',
'write.bulk_insert.shuffle_input' = 'false',
'write.bulk_insert.sort_input' = 'false',
'write.tasks' = '128',
'write.precombine' = 'true',
'precombine.field' = 'op_ts',
-- compaction
'compaction.schedule.enabled' = 'false',
'compaction.async.enabled' = 'false',
-- clean
'clean.async.enabled' = 'false'
);
insert into ratio_path_company select * from source_table;

[online insert]
CREATE TABLE source_table (
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'e1d4c.json.prism_shareholder_path.ratio_path_company',
'properties.bootstrap.servers' = 'xxxx:9092',
'properties.group.id' = 'demo-job',
'scan.startup.mode' = 'earliest-offset',
-- canal
'format' = 'canal-json',
'canal-json.ignore-parse-errors' = 'true',
'canal-json.encode.decimal-as-plain-number' = 'true'
);
create table ratio_path_company(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company004',
'table.type' = 'MERGE_ON_READ',
-- cdc
'changelog.enabled' = 'true',
-- index
'index.type' = 'bucket',
'hoodie.bucket.index.num.buckets' = '256',
-- write
'write.tasks' = '8',
'write.task.max.size' = '512',
'write.batch.size' = '12',
'write.merge.max_memory' = '28',
'write.log_block.size' = '128',
'write.precombine' = 'true',
'precombine.field' = 'op_ts',
-- compaction
'compaction.tasks' = '8',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'false',
'compaction.max_memory' = '128',
'compaction.delta_commits' = '3',
-- clean
'clean.async.enabled' = 'true',
'clean.policy' = 'KEEP_LATEST_BY_HOURS',
'clean.retain_hours' = '72'
);
insert into ratio_path_company select * from source_table;

Got this exception:
Caused by: java.lang.IllegalArgumentException: INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:316) ~[hudi-1.0.jar:?]
at org.apache.parquet.avro.AvroSchemaConverter$1.convertINT96(AvroSchemaConverter.java:298) ~[hudi-1.0.jar:?]
at org.apache.parquet.schema.PrimitiveType$PrimitiveTypeName$7.convert(PrimitiveType.java:341) ~[hudi-1.0.jar:?]
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:297) ~[hudi-1.0.jar:?]
at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:275) ~[hudi-1.0.jar:?]
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:264) ~[hudi-1.0.jar:?]
at org.apache.hudi.common.table.TableSchemaResolver.convertParquetSchemaToAvro(TableSchemaResolver.java:293) ~[hudi-1.0.jar:?]
at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:116) ~[hudi-1.0.jar:?]
at org.apache.hudi.util.CompactionUtil.inferChangelogMode(CompactionUtil.java:145) ~[hudi-1.0.jar:?]
at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.<init>(HoodieFlinkCompactor.java:180) ~[hudi-1.0.jar:?]
at org.apache.hudi.sink.compact.HoodieFlinkCompactor.main(HoodieFlinkCompactor.java:75) ~[hudi-1.0.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_302]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_302]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_302]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_302]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[hudi-1.0.jar:?]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[hudi-1.0.jar:?]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[hudi-1.0.jar:?]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[hudi-1.0.jar:?]

li-ang-666 changed the title from "[SUPPORT]hudi on flink, after bulk_insert & bucket_index, get int96 exception when flink trigger compaction" to "[SUPPORT]hudi[0.13.1] on flink[1.16.2], after bulk_insert & bucket_index, get int96 exception when flink trigger compaction" on Sep 29, 2023
@danny0405
Contributor

danny0405 commented Sep 29, 2023

INT96 is a legacy type for timestamp types with precision greater than 6 (such as timestamp(9)). Did you check your table for timestamp data types?

@li-ang-666
Author

INT96 is a legacy type for decimal types with precision greater than 18. Did you check your table for decimal data types?

I did use decimal(24,12). If I insert the offline data with a streaming upsert instead, there is no exception.

@li-ang-666
Author

INT96 is a legacy type for decimal types with precision greater than 18. Did you check your table for decimal data types?

I wonder if there is something wrong with the parquet schema selection used by bulk_insert

@danny0405
Contributor

The bulk_insert uses the Flink native parquet writers directly; here is a fix for the compatibility issue: #8418. The patch is available in releases 0.13.1 and 0.14.0.

@li-ang-666
Author

li-ang-666 commented Sep 30, 2023

The bulk_insert uses the Flink native parquet writers directly; here is a fix for the compatibility issue: #8418. The patch is available in releases 0.13.1 and 0.14.0.

Currently I am using this pom:

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.16-bundle</artifactId>
    <version>0.13.1</version>
</dependency>

Did this patch really work in 0.13.1, or are we not describing the same problem?

@danny0405
Contributor

Hmm, the write works here; it is the offline compactor that throws the exception. It seems INT96 is not supported by default by the parquet-avro lib, but there is a config option parquet.avro.readInt96AsFixed; if you enable it, INT96 will be parsed as a fixed byte array.

The option can be set with --props from the submit cmd, but it is only available from release 0.14.0.

@codope added the flink label on Oct 3, 2023
@li-ang-666
Author

li-ang-666 commented Oct 3, 2023

parquet.avro.readInt96AsFixed

Now I have changed the pom to version 0.14.0, but how can I use this option (parquet.avro.readInt96AsFixed) with Flink online compaction?

@danny0405
Contributor

Did you check your .hoodie/hoodie.properties file to see whether there is a table schema option?

@danny0405
Contributor

It should be like this: --hoodie-conf k1=v1,k2=v2. For your option, it would be --hoodie-conf hadoop.parquet.avro.readInt96AsFixed=true.
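For the online compaction case, the same Hadoop option can also be forwarded through the Hudi connector options in Flink SQL using the hadoop. prefix. A minimal sketch, assuming the hadoop.-prefixed options are forwarded to the Hadoop configuration used by the reader (the reporter's next comment shows this working in practice); table name, path and column list are placeholders, and only the options relevant here are shown:

create table hudi_sink_sketch (
id BIGINT,
op_ts TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
-- placeholder path
'path' = 'obs://hadoop-obs/ods_hudi/some_table',
'table.type' = 'MERGE_ON_READ',
-- compaction
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
-- forwarded to the Hadoop configuration seen by the parquet-avro reader
'hadoop.parquet.avro.readInt96AsFixed' = 'true'
);

The full DDL in the next comment uses this same hadoop.-prefixed option.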

@li-ang-666
Author

Did you check your .hoodie/hoodie.properties file to see whether there is a table schema option?

here it is:
#Updated at 2023-10-04T10:58:21.290Z
#Wed Oct 04 18:58:21 CST 2023
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
hoodie.table.precombine.field=op_ts
hoodie.table.version=6
hoodie.database.name=default_database
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=3049593976
hoodie.table.create.schema={"type":"record","name":"ratio_path_company_record","namespace":"hoodie.ratio_path_company","fields":[{"name":"id","type":{"type":"fixed","name":"fixed","namespace":"hoodie.ratio_path_company.ratio_path_company_record.id","size":9,"logicalType":"decimal","precision":20,"scale":0}},{"name":"company_id","type":["null","long"],"default":null},{"name":"shareholder_id","type":["null","string"],"default":null},{"name":"shareholder_entity_type","type":["null","int"],"default":null},{"name":"shareholder_name_id","type":["null","long"],"default":null},{"name":"investment_ratio_total","type":["null",{"type":"fixed","name":"fixed","namespace":"hoodie.ratio_path_company.ratio_path_company_record.investment_ratio_total","size":11,"logicalType":"decimal","precision":24,"scale":12}],"default":null},{"name":"is_controller","type":["null","int"],"default":null},{"name":"is_ultimate","type":["null","int"],"default":null},{"name":"is_big_shareholder","type":["null","int"],"default":null},{"name":"is_controlling_shareholder","type":["null","int"],"default":null},{"name":"equity_holding_path","type":["null","string"],"default":null},{"name":"create_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"update_time","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null},{"name":"is_deleted","type":["null","int"],"default":null},{"name":"op_ts","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}
hoodie.table.cdc.enabled=false
hoodie.archivelog.folder=archived
hoodie.table.name=ratio_path_company
hoodie.compaction.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id

@li-ang-666
Author

It should be like this: --hoodie-conf k1=v1,k2=v2. For your option, it would be --hoodie-conf hadoop.parquet.avro.readInt96AsFixed=true.

I changed my Flink SQL to:

CREATE TABLE source_table (
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'e1d4c.json.prism_shareholder_path.ratio_path_company',
'properties.bootstrap.servers' = '10.99.202.90:9092,10.99.206.80:9092,10.99.199.2:9092',
'properties.group.id' = 'demo-job',
'scan.startup.mode' = 'earliest-offset',
-- canal
'format' = 'canal-json',
'canal-json.ignore-parse-errors' = 'true',
'canal-json.encode.decimal-as-plain-number' = 'true'
);

create table ratio_path_company(
id DECIMAL(20, 0),
company_id BIGINT,
shareholder_id STRING,
shareholder_entity_type SMALLINT,
shareholder_name_id BIGINT,
investment_ratio_total DECIMAL(24, 12),
is_controller SMALLINT,
is_ultimate SMALLINT,
is_big_shareholder SMALLINT,
is_controlling_shareholder SMALLINT,
equity_holding_path STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0),
is_deleted SMALLINT,
op_ts TIMESTAMP(0),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company005',
'table.type' = 'MERGE_ON_READ',
-- cdc
'changelog.enabled' = 'true',
-- index
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '256',
-- write
'write.tasks' = '8',
'write.task.max.size' = '512',
'write.batch.size' = '12',
'write.merge.max_memory' = '28',
'write.log_block.size' = '128',
'write.precombine' = 'true',
'precombine.field' = 'op_ts',
-- compaction
'compaction.tasks' = '8',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
'compaction.max_memory' = '128',
'compaction.delta_commits' = '3',
-- clean
'clean.async.enabled' = 'true',
'clean.policy' = 'KEEP_LATEST_BY_HOURS',
'clean.retain_hours' = '72',
'hadoop.parquet.avro.readInt96AsFixed' = 'true'
);

insert into ratio_path_company select * from source_table;

It did take effect, but I encountered a new problem:
2023-10-04 19:02:01.793 [ERROR] [pool-12-thread-1] (NonThrownExecutor.java:140) - Executor executes action [Execute compaction for instant 20231003234910347 from task 2] error
org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdateInternal(HoodieFlinkCopyOnWriteTable.java:423) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.handleUpdate(HoodieFlinkCopyOnWriteTable.java:414) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.compact.CompactionExecutionHelper.writeFileAndGetWriteStats(CompactionExecutionHelper.java:64) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:237) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:147) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.sink.compact.CompactOperator.doCompaction(CompactOperator.java:142) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.sink.compact.CompactOperator.lambda$processElement$0(CompactOperator.java:124) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_302]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_302]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:75) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
... 11 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
... 11 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file obs://hadoop-obs/ods_hudi/ratio_path_company005/00000019-da3a-4963-8d2b-9618b89d2f93_19-128-1_20231003230828396.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
... 11 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: optional fixed_len_byte_array(12) create_time != optional int96 create_time
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:93) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.schema.PrimitiveType.accept(PrimitiveType.java:602) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147) ~[blob_p-d52f9bdde15fcc5202b1e5453d346309de61818c-dff7e5bfe71d3391b3a22a198a642f47:?]
... 11 more

@danny0405
Contributor

Yeah, I checked the code for schema resolving: the TableSchemaResolver first decodes the schema from the instant commit metadata, then from the table option hoodie.table.create.schema in hoodie.properties, then from the data files. Not sure why your code ends up resolving from the data files, because your hoodie.properties already includes the table schema.

Here is the code snippet:

private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {

@li-ang-666
Author

li-ang-666 commented Oct 5, 2023

Yeah, I checked the code for schema resolving: the TableSchemaResolver first decodes the schema from the instant commit metadata, then from the table option hoodie.table.create.schema in hoodie.properties, then from the data files. Not sure why your code ends up resolving from the data files, because your hoodie.properties already includes the table schema.

Here is the code snippet:

private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {

I debugged runMerge() of BaseMergeHelper.java: the writer schema is timestamp-millis, but the reader schema is int96; the schema in hoodie.properties is timestamp-millis.

@li-ang-666
Author

Yeah, I checked the code for schema resolving: the TableSchemaResolver first decodes the schema from the instant commit metadata, then from the table option hoodie.table.create.schema in hoodie.properties, then from the data files. Not sure why your code ends up resolving from the data files, because your hoodie.properties already includes the table schema.

Here is the code snippet:

private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {

I guess the parquet file produced by bulk_insert contains int96. How can I make it write timestamp-millis instead of int96?

@danny0405
Contributor

INT96 comes from the decimal type, not timestamp(3). Can you remove the option 'hadoop.parquet.avro.readInt96AsFixed' = 'true' and debug why the schema string in hoodie.properties is not resolved? Did you ever change the table schema?

@li-ang-666
Author

li-ang-666 commented Oct 6, 2023

INT96 comes from the decimal type, not timestamp(3). Can you remove the option 'hadoop.parquet.avro.readInt96AsFixed' = 'true' and debug why the schema string in hoodie.properties is not resolved? Did you ever change the table schema?

I never changed the schema, but if I remove the option, it goes back to the exception java.lang.IllegalArgumentException: INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array. Where is the code point I should debug?

@li-ang-666
Author

INT96 comes from the decimal type, not timestamp(3). Can you remove the option 'hadoop.parquet.avro.readInt96AsFixed' = 'true' and debug why the schema string in hoodie.properties is not resolved? Did you ever change the table schema?

If I do not use bulk_insert, the compaction works fine.

@li-ang-666
Author

BaseMergeHelper

Here is the stack (screenshot); it returns the int96.

@danny0405
Contributor

Your first error stack trace indicates that you were doing an offline compaction job, and the exception is thrown when the TableSchemaResolver decodes the table schema. Is that the case you encountered?

@li-ang-666
Author

Your first error stack trace indicates that you were doing an offline compaction job, and the exception is thrown when the TableSchemaResolver decodes the table schema. Is that the case you encountered?

I set:
'compaction.tasks' = '8',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
'compaction.max_memory' = '128',
'compaction.delta_commits' = '2',
I think this is online compaction, and I never changed my schema TAT

@li-ang-666
Author

Your first error stack trace indicates that you were doing an offline compaction job, and the exception is thrown when the TableSchemaResolver decodes the table schema. Is that the case you encountered?

I set: 'compaction.tasks' = '8', 'compaction.schedule.enabled' = 'true', 'compaction.async.enabled' = 'true', 'compaction.max_memory' = '128', 'compaction.delta_commits' = '2'. I think this is online compaction, and I never changed my schema TAT

I went back to flink-1.15.4 and hudi-0.12.3 and got a new exception:
#9828

@danny0405
Contributor

I think the timestamp(0) type is the culprit. In PR https://github.com/apache/hudi/pull/8418/files we adapted timestamp(3) and timestamp(6), but any precision other than that is written as INT96, which causes the error, so we need to support that separately. cc @voonhous, can you support that?
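In other words, until other precisions are supported, a workaround is to declare the timestamp columns with precision 3 (or 6) so that the Flink native parquet writer used by bulk_insert emits INT64 timestamp-millis/micros instead of INT96. A minimal sketch of the adjusted sink DDL (column list trimmed, table name and path are placeholders; the remaining options stay as in the original job):

create table ratio_path_company_ts3(
id DECIMAL(20, 0),
-- precision 3 (or 6) is handled by the adapter from #8418; other precisions fall back to INT96
create_time TIMESTAMP(3),
update_time TIMESTAMP(3),
op_ts TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
-- placeholder path
'path' = 'obs://hadoop-obs/ods_hudi/ratio_path_company_ts3',
'table.type' = 'MERGE_ON_READ',
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '256',
'write.operation' = 'bulk_insert'
);

The reporter confirms below that switching the columns to timestamp(3) makes compaction succeed.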

@li-ang-666
Author


I will try timestamp(3).

@li-ang-666
Author

I think the timestamp(0) type is the culprit. In PR https://github.com/apache/hudi/pull/8418/files we adapted timestamp(3) and timestamp(6), but any precision other than that is written as INT96, which causes the error, so we need to support that separately. cc @voonhous, can you support that?

Does 0.12.3 have this PR?

@li-ang-666
Author

I think the timestamp(0) type is the culprit. In PR https://github.com/apache/hudi/pull/8418/files we adapted timestamp(3) and timestamp(6), but any precision other than that is written as INT96, which causes the error, so we need to support that separately. cc @voonhous, can you support that?

0.12.3 timestamp(3) worked!!!!!!!!
