Support writing general records to Pulsar sink #9590
The unit tests are added. Integration tests to be added.
if (GenericRecord.class.isAssignableFrom(typeArg)) {
consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
SchemaType configuredSchemaType = SchemaType.valueOf(pulsarSinkConfig.getSchemaType());
if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
Do we need to add consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType()); in this branch?
No. The schema type is already overwritten on line 419. This branch only logs an info message to indicate that the schema type has been overwritten to AUTO_CONSUME.
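The override described in this reply can be sketched in isolation roughly as follows. This is a minimal, self-contained illustration, not the code from the patch: `GenericRecord` and `SchemaType` here are hypothetical local stand-ins for the Pulsar types of the same name, and `SchemaTypeResolver`, `SampleRecord`, and `resolve` are names invented for the example.

```java
import java.util.logging.Logger;

// Hypothetical stand-ins for Pulsar's GenericRecord and SchemaType,
// declared locally so that the sketch compiles without the Pulsar client.
interface GenericRecord {}

enum SchemaType { AVRO, JSON, STRING, AUTO_CONSUME }

// Hypothetical record type used only to exercise the sketch.
class SampleRecord implements GenericRecord {}

class SchemaTypeResolver {
    private static final Logger LOG = Logger.getLogger(SchemaTypeResolver.class.getName());

    // Returns the schema type the sink would hand to its consumer config.
    // For GenericRecord inputs the configured type is always replaced by
    // AUTO_CONSUME; the inner check exists only to log that replacement.
    static String resolve(Class<?> typeArg, String configuredSchemaType) {
        if (GenericRecord.class.isAssignableFrom(typeArg)) {
            SchemaType configured = SchemaType.valueOf(configuredSchemaType);
            if (SchemaType.AUTO_CONSUME != configured) {
                LOG.info("Schema type " + configured + " overwritten to AUTO_CONSUME");
            }
            return SchemaType.AUTO_CONSUME.toString();
        }
        // Any other input type keeps the configured schema type.
        return configuredSchemaType;
    }

    public static void main(String[] args) {
        System.out.println(resolve(SampleRecord.class, "AVRO"));
        System.out.println(resolve(String.class, "STRING"));
    }
}
```

In this sketch the second assignment the reviewer asked about would be redundant, because the override has already happened before the configured value is inspected.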
@@ -400,7 +415,16 @@ public void close() throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig();
consumerConfig.setSchemaProperties(pulsarSinkConfig.getSchemaProperties());
if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
consumerConfig.setSchemaType(pulsarSinkConfig.getSchemaType());
if (GenericRecord.class.isAssignableFrom(typeArg)) {
@sijie initially you pointed out that working here in PulsarSink is not the right way, and that we should only work on TopicSchema.
#9481 (comment)
In fact, I believe that in my PR #9481 I took the right approach, driven by your suggestions.
I believe that this change is not enough to support my needs.
BTW, if the integration test I added to #9481 works with this patch, then we can converge on a good solution.
My goal is to get that use case to work, in the way that is best for the project in the mid to long term.
@eolivelli Yes and no on my original comment.
My original comment was to make sure we return the write schema information via TopicSchema, because we are using AUTO_CONSUME in the PulsarSink to indicate that GenericRecord objects are published to the Pulsar topic. AUTO_CONSUME can be used by both sources and sinks. In order not to impact sources, I didn't add the logic in TopicSchema. Instead, I added it in PulsarSink to make it more explicit, which results in a one-line change similar to your initial change. But that doesn't mean your original and current implementation is in the right direction.
The main problem with your previous and current implementations in #9481 is that you are trying to hijack the existing AVRO implementation to introduce support for lazy schema initialization. Lazy schema initialization is already implemented as part of multi-schema write support, so you don't need to add such a hack.
I totally agree that the main point here is to prevent the PulsarSink from creating the Producer and forcing a Schema on the topic in case of if you are okay I can merge this patch into my branch at #9481 (and revert the changes to TopicSchema), as we already have integration tests and I can continue the work. I just want to see this feature land on the master branch and become available to our users.
I will complete the integration tests here.
/pulsarbot run-failure-checks
Failed tests:
@sijie do you have time to complete this patch? It is very useful for a couple of use cases I have seen, and I would really like this work to land on master.
@eolivelli I am working with @sijie to fix the CI failures, and will update and finish this PR soon.
Thank you, guys, for this work.
It is really a good step forward.
But I need a little more; please check my latest comment.
Thread.sleep(20);

int value = count.incrementAndGet();
GenericRecord record = schema.newRecordBuilder()
@sijie @codelipenghui unfortunately this is not exactly like my original test case, which reproduced my use case.
That use case is to be able to push an object that implements GenericRecord.
Here you are using the builder provided by Pulsar, but this is not enough for me: my user would like to use an object from their own domain, just by implementing the Pulsar GenericRecord Java interface, because that saves resources (allocations and cycles).
GenericRecord was designed to be built from RecordSchemaBuilder. It doesn't expect people to implement GenericRecord directly. I don't understand the allocations and cycles issue. If there is an allocations or cycles issue, it should be fixed in RecordSchemaBuilder. It shouldn't be worked around by just implementing GenericRecord.
@sijie okay
I will check downstream if using RecordSchemaBuilder is a valid option.
thanks
(cherry picked from commit b826e03)