When sourceMetricData is not initialized, DynamicPulsarDeserializationSchema throws a null pointer exception.
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector.collect(CallbackCollector.java:39)
    at org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.emitRow(InLongMsgDeserializationSchema.java:157)
    at org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.lambda$deserialize$0(InLongMsgDeserializationSchema.java:113)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.deserialize(InLongMsgDeserializationSchema.java:113)
    at org.apache.flink.streaming.util.serialization.ThreadSafeDeserializationSchema.deserialize(ThreadSafeDeserializationSchema.java:53)
    at org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema.deserialize(DynamicPulsarDeserializationSchema.java:128)
    at org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema.deserialize(DynamicPulsarDeserializationSchema.java:119)
    at org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema.deserialize(DynamicPulsarDeserializationSchema.java:47)
    at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.emitRecord(ReaderThread.java:171)
    at org.apache.flink.streaming.connectors.pulsar.internal.ReaderThread.run(ReaderThread.java:110)
Caused by: java.lang.NullPointerException
    at org.apache.inlong.sort.pulsar.table.DynamicPulsarDeserializationSchema.lambda$deserialize$0(DynamicPulsarDeserializationSchema.java:129)
    at org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector.collect(CallbackCollector.java:37)
    ... 11 more
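The trace points at the lambda inside DynamicPulsarDeserializationSchema.deserialize (line 129), which is where the uninitialized sourceMetricData appears to be dereferenced. One possible guard is a null check before reporting metrics, so the row is still emitted when metrics are unavailable. The sketch below is only an illustration under that assumption: `MetricData`, `Schema`, and `outputMetrics` are hypothetical stand-ins, not the actual InLong classes or signatures, and the real fix may look different.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class NullSafeMetricSketch {

    /** Hypothetical stand-in for the metric holder; not the real InLong class. */
    static class MetricData {
        long records;
        long bytes;

        void outputMetrics(long rowCount, long rowSize) {
            records += rowCount;
            bytes += rowSize;
        }
    }

    /** Hypothetical stand-in for a schema that emits rows and reports metrics. */
    static class Schema {
        // May be null when metrics were never initialized.
        private final MetricData sourceMetricData;

        Schema(MetricData sourceMetricData) {
            this.sourceMetricData = sourceMetricData;
        }

        void deserialize(byte[] message, Consumer<String> out) {
            String row = new String(message, StandardCharsets.UTF_8);
            // Guard: only report metrics when the holder exists.
            if (sourceMetricData != null) {
                sourceMetricData.outputMetrics(1, message.length);
            }
            // The row is emitted whether or not metrics are available.
            out.accept(row);
        }
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        byte[] msg = "uid_1,2022-11-25".getBytes(StandardCharsets.UTF_8);

        // Without metrics: no exception, the row is still collected.
        new Schema(null).deserialize(msg, rows::add);

        // With metrics: the same call also updates the counters.
        MetricData metrics = new MetricData();
        new Schema(metrics).deserialize(msg, rows::add);

        System.out.println(rows.size() + " rows, " + metrics.records + " metered");
    }
}
```

Whether the better fix is this kind of guard or making sure sourceMetricData is always initialized before the first record arrives is a design choice for the maintainers; the sketch only shows the first option.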
InLong Manager generates the following SQL, which reproduces the problem:
CREATE TABLE `table_stream_01`(
    `uid` STRING,
    `event_time` STRING)
WITH (
    'inlong.metric.labels' = 'groupId=group_002&streamId=stream_01&nodeId=stream_01',
    'connector' = 'pulsar-inlong',
    'inlong-msg.csv.disable-quote-character' = 'true',
    'inlong-msg.inner.format' = 'csv',
    'inlong-msg.csv.allow-comments' = 'false',
    'format' = 'inlong-msg',
    'inlong-msg.csv.field-delimiter' = ',',
    'inlong-msg.csv.ignore-parse-errors' = 'true',
    'inlong-msg.ignore-parse-errors' = 'false',
    'admin-url' = 'http://x.x.x.x:8080',
    'generic' = 'true',
    'service-url' = 'pulsar://x.x.x.x:6650',
    'topic' = 'public/group_002/stream_01',
    'scan.startup.mode' = 'earliest'
);

CREATE TABLE `table_sink_01`(
    PRIMARY KEY (`event_time`) NOT ENFORCED,
    `uid` STRING,
    `event_time` STRING)
WITH (
    'inlong.metric.labels' = 'groupId=group_002&streamId=stream_01&nodeId=sink_01',
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:mysql://x.x.x.x:3306/test_data',
    'username' = 'xxx',
    'password' = 'xxx',
    'table-name' = 'test_11251558'
);

INSERT INTO `table_sink_01`
SELECT
    `uid` AS `uid`,
    `event_time` AS `event_time`
FROM `table_stream_01`;
Environment: No response
InLong version: master
InLong Component: InLong Sort
wangpeix