KafkaConnectSink effectively ignores topic name from sink configuration file #12880
Comments
@kaja78 I don't think using
The character replacement happens only when no topic2table mapping is defined and the topic name is not a valid Snowflake identifier: https://github.com/snowflakedb/snowflake-kafka-connector/blob/v1.5.5/src/main/java/com/snowflake/kafka/connector/Utils.java#L536
Since the topic name contains a colon character, there is no way to specify a table mapping for Pulsar persistent topics. I think the Kafka topic name derivation logic should at least be refactored into a protected method, so one can easily override it in a child Sink implementation. Anyway, I think the current implementation is not consistent with the @FieldDoc. So maybe making topic optional and using it to override the original Pulsar topic name makes more sense.
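The refactoring suggested in this comment could look roughly like the following minimal sketch. The class and method names (`BaseKafkaSinkSketch`, `deriveKafkaTopic`, `topicForSinkRecord`, `ConfiguredTopicSinkSketch`) are hypothetical stand-ins, not Pulsar's actual `KafkaConnectSink` API, and plain `Optional<String>` values replace Pulsar's record type. The base class keeps the current `orElse` behaviour, and a child sink overrides the protected hook to always use the configured topic.

```java
import java.util.Optional;

// Hypothetical base sink: the topic derivation lives in a protected hook.
class BaseKafkaSinkSketch {
    protected final String configuredTopic;

    BaseKafkaSinkSketch(String configuredTopic) {
        this.configuredTopic = configuredTopic;
    }

    // Extension point for child sinks; the default mirrors the current
    // `sourceRecord.getTopicName().orElse(topicName)` behaviour.
    protected String deriveKafkaTopic(Optional<String> pulsarTopic) {
        return pulsarTopic.orElse(configuredTopic);
    }

    // Stand-in for the part of toSinkRecord() that picks the Kafka topic.
    String topicForSinkRecord(Optional<String> pulsarTopic) {
        return deriveKafkaTopic(pulsarTopic);
    }
}

// Hypothetical child sink that always uses the configured topic.
class ConfiguredTopicSinkSketch extends BaseKafkaSinkSketch {
    ConfiguredTopicSinkSketch(String configuredTopic) {
        super(configuredTopic);
    }

    @Override
    protected String deriveKafkaTopic(Optional<String> pulsarTopic) {
        return configuredTopic;
    }
}

public class ProtectedHookDemo {
    public static void main(String[] args) {
        Optional<String> pulsarTopic = Optional.of("persistent://public/default/orders");
        // prints "persistent://public/default/orders"
        System.out.println(new BaseKafkaSinkSketch("orders").topicForSinkRecord(pulsarTopic));
        // prints "orders"
        System.out.println(new ConfiguredTopicSinkSketch("orders").topicForSinkRecord(pulsarTopic));
    }
}
```

Keeping the default unchanged means existing sinks behave as before; only subclasses that opt in change the topic handed to the Kafka sink.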
The issue had no activity for 30 days; marking it with the Stale label.
Closed as stale with no consensus. Please create a new issue if it is still relevant to the maintained versions. See #12893 (review).
Describe the bug
The topic configuration property of PulsarKafkaConnectSinkConfig is not passed to the Kafka sink. Instead, the original Pulsar topic name is passed.
In the KafkaConnectSink.toSinkRecord() method, the topic name is derived using:
final String topic = sourceRecord.getTopicName().orElse(topicName);
The topicName instance variable holds the topic name from PulsarKafkaConnectSinkConfig. However, orElse() falls back to topicName only when getTopicName() returns an empty Optional, and I suspect a topic name is always present on sourceRecord, so the topic property from the configuration is never passed to the Kafka sink.
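A minimal, self-contained illustration of why the configured value is never reached: `Optional.orElse(other)` returns `other` only when the `Optional` is empty. The `deriveTopic` method below is a hypothetical stand-in for the line above, using plain `Optional<String>` values instead of Pulsar's record type.

```java
import java.util.Optional;

public class OrElseDemo {

    // Hypothetical stand-in for `sourceRecord.getTopicName().orElse(topicName)`.
    static String deriveTopic(Optional<String> recordTopic, String configuredTopic) {
        return recordTopic.orElse(configuredTopic);
    }

    public static void main(String[] args) {
        String configured = "my-kafka-topic";

        // A record that carries a topic name wins; the configured value is ignored.
        // prints "persistent://public/default/orders"
        System.out.println(deriveTopic(Optional.of("persistent://public/default/orders"), configured));

        // Only an empty Optional falls back to the configured topic.
        // prints "my-kafka-topic"
        System.out.println(deriveTopic(Optional.empty(), configured));
    }
}
```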
Expected behaviour
The topic from PulsarKafkaConnectSinkConfig should simply always be used in KafkaConnectSink.toSinkRecord():
final String topic = topicName;
Actually, it would make sense to reverse the original logic: if the topic is not set in the sink configuration file, fall back to the Pulsar topic name. However, since the topic configuration property is currently required, it should simply always be used.
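A minimal sketch of the reversed logic, assuming the `topic` property were made optional. `resolveKafkaTopic` is a hypothetical helper, not part of Pulsar; it treats a null or blank configured topic as unset and throws if neither source provides a name.

```java
import java.util.Optional;

public class TopicResolution {

    // Hypothetical helper: a non-blank configured topic wins; otherwise
    // fall back to the Pulsar topic name carried by the record.
    static String resolveKafkaTopic(String configuredTopic, Optional<String> pulsarTopic) {
        if (configuredTopic != null && !configuredTopic.trim().isEmpty()) {
            return configuredTopic;
        }
        return pulsarTopic.orElseThrow(() ->
                new IllegalStateException("no topic configured and record has no topic name"));
    }

    public static void main(String[] args) {
        Optional<String> pulsarTopic = Optional.of("persistent://public/default/orders");
        // prints "orders"
        System.out.println(resolveKafkaTopic("orders", pulsarTopic));
        // prints "persistent://public/default/orders"
        System.out.println(resolveKafkaTopic(null, pulsarTopic));
    }
}
```

Failing loudly when both sources are missing avoids silently producing records with a null Kafka topic.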
Additional context
I identified this issue while testing https://github.com/datastax/snowflake-connector, which uses KafkaConnectSink as its parent class.
The target table name is derived from the Kafka topic name in com.snowflake.kafka.connector.SnowflakeSinkConnector. Since I am not able to configure the topic name in the sink configuration file, Pulsar messages are stored in tables with ugly names following the pattern PERSISTENT___[tenant][namespace][topic]_[hash].
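To show how such names can arise, here is a rough approximation of the behaviour described in the comments: a topic2table mapping wins, a valid identifier is used as-is, and otherwise invalid characters are replaced. This is a hypothetical sketch, not the Snowflake connector's code; the method name, the replacement regex and the hex `hashCode()` suffix are assumptions, so the real suffix will differ. Snowflake stores unquoted identifiers in uppercase, which is why the sanitized result is uppercased.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

public class TableNameSketch {

    // Unquoted Snowflake identifiers: start with a letter or underscore,
    // then letters, digits, underscores or dollar signs.
    private static final String VALID_IDENTIFIER = "[A-Za-z_][A-Za-z0-9_$]*";

    // Hypothetical approximation; not the connector's actual implementation.
    static String approximateTableName(String topic, Map<String, String> topic2table) {
        // An explicit mapping always wins.
        String mapped = topic2table.get(topic);
        if (mapped != null) {
            return mapped;
        }
        // Valid identifiers are used as-is.
        if (topic.matches(VALID_IDENTIFIER)) {
            return topic;
        }
        // Otherwise replace every invalid character with '_' ...
        String sanitized = topic.replaceAll("[^A-Za-z0-9_$]", "_");
        // ... and append a placeholder hash suffix to reduce collisions.
        String suffix = Integer.toHexString(topic.hashCode());
        // Uppercase to match how Snowflake displays unquoted identifiers.
        return (sanitized + "_" + suffix).toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Starts with "PERSISTENT___PUBLIC_DEFAULT_ORDERS_" followed by the hash.
        System.out.println(approximateTableName(
                "persistent://public/default/orders", Collections.emptyMap()));
    }
}
```

The `://` in a persistent topic name turns into three underscores, which matches the `PERSISTENT___` prefix seen in the table names.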