[Bug]: When reading partial fields from Logstore, the number of fields does not match #1171

Closed
1 task done
lklhdu opened this issue Feb 28, 2023 · 0 comments · Fixed by #1172
Labels
priority:critical, type:bug (Something isn't working)

lklhdu commented Feb 28, 2023

What happened?

When reading only a subset of a table's fields from Logstore, the number of fields in the emitted row does not match the number of fields selected by the query, and an IllegalArgumentException is thrown.
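
To make the failure mode concrete, here is a minimal sketch in plain Java (no Flink dependency) of the kind of arity check that produces the message shown in the log output of this report. The names `ArityCheckSketch` and `copy`, and the use of `Object[]` to model a row, are hypothetical; the sketch mirrors only the reported error message and is not Flink's or Arctic's actual implementation.

```java
import java.util.Arrays;

public class ArityCheckSketch {

    /**
     * Copies a row only if its field count matches the arity the serializer
     * was created for. Hypothetical stand-in for the check in the stack trace.
     */
    public static Object[] copy(Object[] row, int serializerArity) {
        if (row.length != serializerArity) {
            throw new IllegalArgumentException(
                "Row arity: " + row.length + ", but serializer arity: " + serializerArity);
        }
        return Arrays.copyOf(row, row.length);
    }

    public static void main(String[] args) {
        // The table schema has three columns: id, name, op_time.
        Object[] fullRow = {1L, "alice", "2023-02-28 00:00:00"};
        // The query selects only two of them: id, op_time.
        int projectedArity = 2;
        try {
            copy(fullRow, projectedArity);
        } catch (IllegalArgumentException e) {
            // The source emitted the full row, so the check fails.
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the error appears whenever the source hands over a full-schema row while the downstream operator was planned for the projected schema.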

Affects Versions

master

What engines are you seeing the problem on?

Flink

How to reproduce

-- Arctic table with Logstore enabled
CREATE TABLE `arctic_catalog`.`arctic_db`.`test_table` (
    id BIGINT,
    name STRING,
    op_time TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) using arctic
  tblproperties (
    "log-store.enabled" = "true",
    "log-store.topic" = "topic_log_test",
    "log-store.address" = "localhost:9092"
);
-- Print sink with only two of the three columns
create table print_tb (
    id BIGINT,
    op_time TIMESTAMP
) with (
    'connector' = 'print'
);
-- select partial fields from table schema
insert into print_tb select id, op_time from `arctic_catalog`.`arctic_db`.`test_table` 
/*+ OPTIONS('arctic.read.mode'='log','streaming'='true','scan.startup.mode'='earliest') */;

Relevant log output

java.lang.IllegalArgumentException: Row arity: 3, but serializer arity: 2
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at SinkConversion$41.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
	at com.netease.arctic.flink.read.source.log.kafka.LogKafkaRecordEmitter.emitRecord(LogKafkaRecordEmitter.java:37)
	at com.netease.arctic.flink.read.source.log.kafka.LogKafkaRecordEmitter.emitRecord(LogKafkaRecordEmitter.java:27)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
	at java.lang.Thread.run(Thread.java:750)
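
The trace above suggests that the full three-field record reaches a serializer built for the two selected fields. One possible remedy is to project each Logstore record onto the selected columns before it is emitted downstream. The sketch below illustrates that idea in plain Java; `ProjectionSketch`, `project`, and the `Object[]` row model are assumptions made for illustration and are not taken from the Arctic code base or from the fix in #1172.

```java
import java.util.Arrays;

public class ProjectionSketch {

    /**
     * Builds a new row containing only the fields at the given positions,
     * in the order requested by the query. Hypothetical helper.
     */
    public static Object[] project(Object[] fullRow, int[] projectedFields) {
        Object[] projected = new Object[projectedFields.length];
        for (int i = 0; i < projectedFields.length; i++) {
            projected[i] = fullRow[projectedFields[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        // Full record as stored in the log: id, name, op_time.
        Object[] fullRow = {1L, "alice", "2023-02-28 00:00:00"};
        // Positions of the selected columns (id, op_time) in the table schema.
        int[] projectedFields = {0, 2};

        Object[] projected = project(fullRow, projectedFields);
        // The emitted row now has the same arity as the query's output type.
        System.out.println(Arrays.toString(projected));
    }
}
```

With the projection applied before emitting, the row arity equals the number of selected fields, so an arity check like the one in the trace would pass.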

lklhdu added the type:bug label on Feb 28, 2023
lklhdu changed the title from "[Bug]: When reading some fields from Logstore, the number of fields does not match" to "[Bug]: When reading partial fields from Logstore, the number of fields does not match" on Feb 28, 2023
YesOrNo828 added this to the Release 0.4.1 milestone on Mar 1, 2023