[pulsar-io] Fix invalid topic name generation in kafka-source-connector #9035

Merged 1 commit on Jan 11, 2021

rdhabalia

Motivation

Right now, kafka-source-connector generates an invalid topic name, which causes an error when creating the producer in the Debezium IO source.

21:22:41,772 DEBUG [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstance - Got result: object: (key = "[B@3bbd472c", value = "[B@1aa204ba")
21:22:41,780 ERROR [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] PulsarSink - Failed to create Producer while doing user publish
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) [?:?]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.lang.Thread.run(Thread.java:834) [?:?]
21:22:41,781 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] JavaInstanceRunnable - Encountered exception in sink write: 
java.lang.RuntimeException: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:124) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) ~[?:?]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:115) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.getProducer(PulsarSink.java:111) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkAtMostOnceProcessor.newMessage(PulsarSink.java:184) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink.write(PulsarSink.java:295) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:449) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.lambda$processResult$0(JavaInstanceRunnable.java:431) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) [?:?]
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) [?:?]
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) [?:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.processResult(JavaInstanceRunnable.java:422) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:283) [org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-property/us-west/my-ns/topic1'
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:892) ~[org.apache.pulsar-pulsar-client-api-2.6.2.jar:2.6.2]
	at org.apache.pulsar.client.impl.ProducerBuilderImpl.create(ProducerBuilderImpl.java:93) ~[org.apache.pulsar-pulsar-client-original-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.createProducer(PulsarSink.java:107) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	at org.apache.pulsar.functions.sink.PulsarSink$PulsarSinkProcessorBase.lambda$getProducer$0(PulsarSink.java:117) ~[org.apache.pulsar-pulsar-functions-instance-2.6.2.jar:2.6.2]
	... 13 more
21:22:41,790 INFO [my-property/us-west/my-ns/debezium-postgres-source-0] [instance: 0] AvroDataConfig - AvroDataConfig values: 
	schemas.cache.config = 1000
	enhanced.avro.schema.support = false
	connect.meta.data = true
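
To illustrate why the client rejects this name, here is a minimal sketch of the naming rule as I understand it: a name without a domain is accepted only as <topic> or <tenant>/<namespace>/<topic>, so a four-segment name such as 'my-property/us-west/my-ns/topic1' needs an explicit persistent:// prefix. The class TopicNameCheck and the method isAccepted are hypothetical and only model the rule; this is not the actual Pulsar client code.

```java
// Simplified model of how a topic name string is interpreted.
// Assumption: TopicNameCheck and isAccepted are illustrative names only.
class TopicNameCheck {

    static boolean isAccepted(String name) {
        if (name.isEmpty()) {
            return false;
        }
        int idx = name.indexOf("://");
        if (idx >= 0) {
            // Fully qualified name: domain://tenant/namespace/topic (v2)
            // or domain://tenant/cluster/namespace/topic (v1).
            String domain = name.substring(0, idx);
            String rest = name.substring(idx + 3);
            boolean knownDomain =
                    domain.equals("persistent") || domain.equals("non-persistent");
            // Limit of 4 keeps any further '/' inside the last segment.
            int segments = rest.split("/", 4).length;
            return knownDomain && segments >= 3;
        }
        // Short name: only "<topic>" or "<tenant>/<namespace>/<topic>".
        int segments = name.split("/").length;
        return segments == 1 || segments == 3;
    }

    public static void main(String[] args) {
        // Name from the error log above, without and with the domain.
        System.out.println(isAccepted("my-property/us-west/my-ns/topic1"));
        System.out.println(isAccepted("persistent://my-property/us-west/my-ns/topic1"));
    }
}
```

Under this model the first call reports the name as not accepted and the second as accepted, which matches the InvalidTopicNameException in the log.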

@rdhabalia rdhabalia added this to the 2.8.0 milestone Dec 23, 2020
@rdhabalia rdhabalia self-assigned this Dec 23, 2020

@eolivelli eolivelli left a comment

LGTM
No test is needed IMHO

@rdhabalia

/pulsarbot run-failure-checks


@sijie sijie left a comment

@rdhabalia before merging this, can I understand a bit more about when this issue happens? Ideally, we don't need to add the topic domain persistent://.

@rdhabalia

@sijie Sorry, I didn't see your comment earlier.
It happens with the command below, where the source config contains the topic placeholder "topic.creation.classificationlog.include":"my-topic.public.classification", which ends up in an invalid topic name.

error:

Getting the following errors when trying to create a producer using the Pulsar Debezium IO connector:
21:22:41,780 ERROR [my-prop/my-cluster/debezium-postgres-source-0] [instance: 0] PulsarSink - Failed to create Producer while doing user publish
org.apache.pulsar.client.api.PulsarClientException$InvalidTopicNameException: Invalid topic name: 'my-prop/my-cluster/ns/my-topic.public.classification'

command:

./bin/pulsar-admin source localrun \
--tenant my-prop \
--namespace my-cluster/ns \
--name debezium-postgres-source \
--destination-topic-name persistent://my-prop/my-cluster/ns/debezium-postgres-topic-3 \
--archive /root/apache-pulsar-2.6.2/pulsar-io-debezium-1.3.1-postgres-2.7.0-SNAPSHOT.nar \
--classname org.apache.pulsar.io.debezium.postgres.DebeziumPostgresSource \
--parallelism 1 \
--processingGuarantees ATLEAST_ONCE \
--broker-service-url pulsar+ssl://broker-uswest.com:6651 \
--client-auth-params tlsCertFile:/cert.pem,tlsKeyFile:/key.pem \
--client-auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationTls \
--tls-trust-cert-path /ca.pem \
--use-tls \
--source-config '{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":1,"database.hostname":"ns-db.mail.bf1.yahoo.com","database.port":"1234","database.user":"test","database.password":"123","database.dbname":"ns","database.server.name":"ns","table.include.list":"public.classification","plugin.name":"pgoutput","message.key.columns":"public.classification:iprange","snapshot.mode":"always","slot.name":"cms_debezium","topic.creation.default.replication.factor":3,"topic.creation.default.partitions":5,"topic.creation.default.cleanup.policy":"compact","topic.creation.default.compression.type":"producer","topic.creation.groups":"classificationlog","topic.creation.classificationlog.include":"my-topic.public.classification","topic.creation.classificationlog.replication.factor":3,"topic.creation.classificationlog.partitions":5,"topic.creation.classificationlog.cleanup.policy":"compact","topic.creation.classificationlog.retention.ms":7776000000,"topic.creation.classificationlog.compression.type":"producer","pulsar.service.url":"pulsar+ssl://broker-uswest.com:6651","offset.storage.topic":"persistent://my-prop/my-cluster/ns/debezium-storage-topic-3","pulsar.auth.plugin":"org.apache.pulsar.client.impl.auth.AuthenticationTls","pulsar.auth.plugin.param":"tlsCertFile:/cert.pem,tlsKeyFile:/key.pem","pulsar.tls.insecure.connection":"true","pulsar.tls.trust.cert":"/ca.pem"}'

I think it also doesn't support a cluster name in the source config; I will create a separate PR for that.


@sijie sijie left a comment

@rdhabalia Gotcha, I see. So the problem happens when you use a v1 topic name. Thank you for the clarification!
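
For readers following along, a rough sketch of the idea discussed in this thread: prepend the persistent:// domain when a generated topic name does not carry one, so that a v1-style name (tenant/cluster/namespace/topic) becomes fully qualified. TopicNames and withDomain are hypothetical names used for illustration; this is not the code merged in this PR.

```java
// Illustrative only: TopicNames and withDomain are hypothetical names,
// not the connector's actual code.
class TopicNames {
    private static final String PERSISTENT_DOMAIN = "persistent://";

    // Returns the name unchanged if it already carries a domain,
    // otherwise prepends "persistent://" so the name is fully qualified.
    static String withDomain(String topic) {
        if (topic.contains("://")) {
            return topic;
        }
        return PERSISTENT_DOMAIN + topic;
    }

    public static void main(String[] args) {
        // v1-style name taken from the error message in this thread.
        System.out.println(
                withDomain("my-prop/my-cluster/ns/my-topic.public.classification"));
    }
}
```

Names that already start with a domain pass through untouched, so applying the helper twice gives the same result as applying it once.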

@sijie sijie merged commit 1c3a743 into apache:master Jan 11, 2021