Skip to content

[Bug] Pulsar 3.0.3 client doesn't work with Flink connector: IllegalArgumentException: lookupTimeout must not be negative #22242

@lhotari

Description

@lhotari

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar client 3.0.3

Minimal reproduce step

Configure Pulsar connector in Flink and use Flink SQL.

What did you expect to see?

No exceptions

What did you see instead?

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: lookupTimeout must not be negative
Caused by: java.lang.IllegalArgumentException: lookupTimeout must not be negative
	at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
	at org.apache.pulsar.client.impl.ClientBuilderImpl.lookupTimeout(ClientBuilderImpl.java:169)
	at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.lambda$createClient$1(PulsarClientFactory.java:116)
	at org.apache.flink.connector.pulsar.common.config.PulsarConfiguration.useOption(PulsarConfiguration.java:101)
	at org.apache.flink.connector.pulsar.common.config.PulsarConfiguration.useOption(PulsarConfiguration.java:91)
	at org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient(PulsarClientFactory.java:115)
	at org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener.open(MetadataListener.java:102)
	at org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.<init>(PulsarWriter.java:120)
	at org.apache.flink.connector.pulsar.sink.PulsarSink.createWriter(PulsarSink.java:134)
	at org.apache.flink.connector.pulsar.sink.PulsarSink.createWriter(PulsarSink.java:83)
	at org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterStateHandler.createWriter(StatelessSinkWriterStateHandler.java:39)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:149)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)

Anything else?

This broke with #22023 changes.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Labels

release/blockerIndicate the PR or issue that should block the release until it gets resolvedtype/bugThe PR fixed a bug or issue reported a bug

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions