Allow Pulsar IO Sources to push GenericRecord instances encoded with AVRO #9481
Conversation
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@congbobo184 Could you please also help review this PR?
LGTM
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
It's better to avoid star imports.
oh sure.
I will update the patch after @congbobo184's review, in order to save CI cycles.
thanks
@congbobo184 do you have cycles?
@codelipenghui I have addressed your comment and merged with current master.
@eolivelli Sorry for the late comment.
/**
 * This is an adapter from Pulsar GenericRecord to Avro classes.
 */
private class GenericRecordAdapter extends SpecificRecordBase {
    private GenericRecord message;

    void setCurrentMessage(GenericRecord message) {
        this.message = message;
    }

    @Override
    public Schema getSchema() {
        return schema;
    }

    @Override
    public Object get(int field) {
        return message.getField(schema.getFields().get(field).name());
    }

    @Override
    public void put(int field, Object value) {
        throw new UnsupportedOperationException();
    }
}
The writer we generate now is a GenericDatumWriter, and it can't write a SpecificRecordBase; we would need to support SpecificDatumWriter as well. So I don't think this is a good way to do it.
Here we are in a writer, and a writer is supposed to only read from the object, not alter its contents.
So it is good that we prevent any change here.
I cannot understand your point.
The integration test provides an example of a Source that pushes GenericRecord instances; those messages can then be consumed by a standard Pulsar consumer with AUTO_CONSUME.
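As a reference for that consuming side, here is a minimal sketch of an AUTO_CONSUME consumer, assuming a made-up topic and subscription name (illustrative only, not code from this PR):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // AUTO_CONSUME decodes whatever schema the topic carries into GenericRecord
        // instances, so the consumer does not need a compiled POJO.
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic("persistent://public/default/generic-source-topic") // hypothetical topic
                .subscriptionName("test-sub")
                .subscribe();

        Message<GenericRecord> msg = consumer.receive();
        GenericRecord record = msg.getValue();
        // Fields are accessed by name, driven by the schema attached to the message.
        record.getFields().forEach(f ->
                System.out.println(f.getName() + " = " + record.getField(f)));

        consumer.acknowledge(msg);
        client.close();
    }
}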
The message inside GenericRecordAdapter can still be changed, so GenericRecordAdapter seems to have no effect; it cannot really prevent changes.
Here we are adding an adapter from a Pulsar GenericRecord to SpecificRecordBase, but we are only interested in implementing the read-only methods.
If you prefer, I can leave the implementation of this method empty; the method won't be called by the writer.
If you run the integration test you will see that we are passing through this code.
The current implementation in master assumes that the GenericRecord is always a GenericAvroRecord, but with this new feature it can be any implementation of GenericRecord (and in fact, in my case, it will be a generic data structure generated dynamically, with a dynamic schema).
Why do we make GenericRecordAdapter extend SpecificRecordBase rather than GenericRecord? It seems that the writer field (private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer;) can only write GenericRecord, not SpecificRecordBase.
@congbobo184 I have double checked, and we indeed only need to implement GenericRecord.
The usage of SpecificRecordBase was a left-over of my initial implementation.
I have simplified the code by implementing all of the GenericRecord fields.
Good catch!
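For readers following along, a rough sketch of what such a simplified adapter could look like, implementing Avro's org.apache.avro.generic.GenericRecord directly; the exact shape (constructor, names) is illustrative and not necessarily the final code of the PR:

import org.apache.avro.Schema;

/**
 * Illustrative read-only adapter that exposes a Pulsar GenericRecord as an Avro
 * org.apache.avro.generic.GenericRecord, so a GenericDatumWriter can serialize it.
 */
class GenericRecordAdapter implements org.apache.avro.generic.GenericRecord {
    private final Schema avroSchema;
    private final org.apache.pulsar.client.api.schema.GenericRecord message;

    GenericRecordAdapter(Schema avroSchema,
                         org.apache.pulsar.client.api.schema.GenericRecord message) {
        this.avroSchema = avroSchema;
        this.message = message;
    }

    @Override
    public Schema getSchema() {
        return avroSchema;
    }

    @Override
    public Object get(String key) {
        return message.getField(key);
    }

    @Override
    public Object get(int i) {
        // Resolve positional access through the Avro schema field names.
        return message.getField(avroSchema.getFields().get(i).name());
    }

    @Override
    public void put(String key, Object v) {
        throw new UnsupportedOperationException("read-only adapter");
    }

    @Override
    public void put(int i, Object v) {
        throw new UnsupportedOperationException("read-only adapter");
    }
}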
if (!input && clazz.equals(GenericRecord.class)) {
    return new AutoProduceBytesSchema();
}
It seems the integration test doesn't cover this path; it still uses record.getSchema(). What is the integration test testing? I did not understand this part.
The integration test without this change does not work.
You start a source, but the source does not get its schema through this method. I don't understand where this change is used.
@congbobo184
This method is called by PulsarSink in the initializeSchema() method, via TopicSchema#getSchema.
The PulsarSink is the entity that actually writes to Pulsar.
When you create a Source, Pulsar IO creates a PulsarSink to write to the destination topic.
When you start the Source you are going to use this special AutoProduceBytesSchema (it was pre-existing, I did not add it).
Initially the Source does not enforce a Schema on the topic (we achieve this with AutoProduceBytesSchema).
When the Source passes a Record to the Pulsar runtime, the PulsarSink picks up the Schema (using Record#getSchema) and sets the schema properly.
Therefore when the schema changes (in a compatible way) the runtime automatically updates the schema.
In fact, when the Source starts you cannot know the schema, because the schema is generated dynamically by the Source itself.
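For reference, AutoProduceBytesSchema is the schema that backs Schema.AUTO_PRODUCE_BYTES() in the client API; a minimal client-level sketch of that mode, with a made-up topic and a placeholder payload, looks roughly like this:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class AutoProduceExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // The producer does not declare a schema of its own: it publishes raw bytes
        // that are validated against whatever schema the topic currently has.
        Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                .topic("persistent://public/default/generic-source-topic") // hypothetical topic
                .create();

        // Placeholder: in a real run this must be a payload already encoded
        // with the schema registered on the topic (e.g. an AVRO-encoded record).
        byte[] encodedPayload = new byte[0];
        producer.newMessage().value(encodedPayload).send();

        producer.close();
        client.close();
    }
}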
Thank you for your detailed reply, I understand now. :)
...-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericRecordSource.java
@congbobo184 I have addressed your comment, PTAL. Thanks for your review.
LGTM
/pulsarbot run-failures-tests
/pulsarbot run-failure-tests
/pulsarbot run-failure-checks
@congbobo184 CI passed, I hope that this patch is good to go now. Thank you again for your reviews @congbobo184 @codelipenghui and @sijie
LGTM! Great work!
Thank you @congbobo184
@sijie @codelipenghui can we merge it please?
LGTM
@sijie can you please help review this PR?
@eolivelli I don't think this is the right implementation.
The fundamental problem this pull request tries to solve is allowing a Pulsar Sink to write a record based on the schema information it carries.
This should be done in the PulsarSink implementation, since the current PulsarSink implementation already does the job of constructing a message based on the schema of a Sink record. I don't think we need to change the existing schema implementations, as those are designed to write one version and read multiple versions, and multi-version schema writes are implemented at the Pulsar client level.
I am happy to craft a skeleton for this change.
@eolivelli I think the change can be as simple as #9590. I haven't verified that it works, but it should be the right direction to go, because PulsarSink already handles writing messages using multiple schemas (see pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java, line 230 in c27f7a3).
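For reviewers less familiar with that code path, the multi-schema write that PIP-43 enables is roughly the following; this is a simplified sketch, not the actual PulsarSink code, and the helper class and method are made up for illustration:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Record;

class MultiSchemaWriteSketch {

    /**
     * If the record carries its own schema, build the message with that schema,
     * so every message can be written with a (possibly different) schema version.
     */
    static TypedMessageBuilder<GenericRecord> newMessage(Producer<GenericRecord> producer,
                                                         Record<GenericRecord> record) {
        Schema<GenericRecord> recordSchema = record.getSchema();
        if (recordSchema != null) {
            // PIP-43: a single producer can publish messages with different schemas.
            return producer.newMessage(recordSchema).value(record.getValue());
        }
        return producer.newMessage().value(record.getValue());
    }
}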
For the reviewers on this pull request, please take a look at #9590. In order to support writing a generic record, we don't need to change the existing AVRO schema implementation. It should be done in PulsarSink, as it needs to support writing multiple versions of generic records. Pulsar Sink already has the framework to support that by leveraging PIP-43. #9590 is ready for review with unit tests; integration tests are to be added.
@sijie Thanks for the comment. If I understand correctly, the approach is to use the AUTO_CONSUME schema and PIP-43 to achieve this purpose. Previously, we always used the AUTO_CONSUME schema for a consumer. Essentially, the AUTO_CONSUME schema is a generic schema that works with GenericRecord, encoding a GenericRecord to byte[] and decoding byte[] to GenericRecord. So we don't need to change the existing implementation; we can use the AUTO_CONSUME schema for a producer too, to allow the producer to publish GenericRecord. And with PIP-45, the message carries the real schema, so that we are able to publish GenericRecord with different schema versions. Please point out if I am wrong, thanks.
@codelipenghui: Most of the understandings are correct. There is one mistake: we don't use … Then we leverage PIP-45 to write the messages based on the schema information they carry.
Motivation
Currently Pulsar Source Connectors cannot produce messages using the GenericRecord API.
Producing messages with the GenericRecord API allows Sources to dynamically generate data structures that can be consumed downstream using the supported encoding.
Modifications
Allow a Pulsar Source to be declared as producing GenericRecord instances; this in turn means changing PulsarSink, which is the entity that receives messages from the Source and writes them to the Pulsar topic.
Verifying this change
The change adds integration tests and new unit tests
Documentation
There is no need to add documentation: it comes naturally for a developer to declare the source as Source<GenericRecord>, and it is expected to just work.
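For illustration only, a Source declared this way could look roughly like the sketch below; the schema, field names, and values are invented for the example, and the integration test added by this PR (TestGenericRecordSource) follows the same general idea:

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

/** Illustrative Source that builds its AVRO schema dynamically and emits GenericRecord values. */
public class MyGenericRecordSource implements Source<GenericRecord> {

    private GenericSchema<GenericRecord> schema;
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public void open(Map<String, Object> config, SourceContext context) {
        // Build the schema at runtime instead of compiling a POJO.
        RecordSchemaBuilder builder = SchemaBuilder.record("MyDynamicRecord");
        builder.field("name").type(SchemaType.STRING);
        builder.field("counter").type(SchemaType.INT32);
        schema = Schema.generic(builder.build(SchemaType.AVRO));
    }

    @Override
    public Record<GenericRecord> read() throws Exception {
        TimeUnit.MILLISECONDS.sleep(100);
        int n = counter.incrementAndGet();
        GenericRecord value = schema.newRecordBuilder()
                .set("name", "record-" + n)
                .set("counter", n)
                .build();
        return new Record<GenericRecord>() {
            @Override
            public GenericRecord getValue() {
                return value;
            }

            @Override
            public Schema<GenericRecord> getSchema() {
                // Carrying the schema here is what lets the PulsarSink apply it to the topic.
                return schema;
            }
        };
    }

    @Override
    public void close() {
    }
}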