-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support get the SchemaInfo from a topic and the schema version #257
Support get the SchemaInfo from a topic and the schema version #257
Conversation
### Motivation Currently there is no public API to get the SchemaInfo from the broker. However, when a consumer tries to decode a message of Avro schema, the writer schema that was stored in the broker is required. The C++ client is not responsible to decode the message, but the extended library like the Python client needs the ability to get the writer schema from the broker. There is a workaround that we can send a REST request to query the schema, but it requires the admin permission. ### Modification - Add the `Client::getSchemaInfoAsync` method to get the schema info asynchronously. When the schema of the given version does not exist, the `SchemaInfo` whose type is `NONE` will be returned. - Add the `Message::getLongSchemaVersion` method to get the schema version of a message. The existing `getSchemaVersion` method is hard to use because it returns a byte array, which users need to know how to decode. - Provide `fromBigEndianBytes` and `toBigEndianBytes` functions to perform conversion between a byte array and the long value of the schema version. Add `Int64SerDes` to test them. - Fix the `LookupServiceTest` that initializes `client_` only once (even with the `GetParam()` method) so the HTTP URL was never tested. - Add `testGetSchemaByVersion` to test `getSchemaInfoAsync`.
6600188
to
6ed1587
Compare
Fixes apache#108 ### Motivation Currently the Python client uses the reader schema, which is the schema of the consumer, to decode Avro messages. However, when the writer schema is different from the reader schema, the decode will fail. ### Modifications Add `attach_client` method to `Schema` and call it when creating consumers and readers. This method stores a reference to a `_pulsar.Client` instance, which leverages the C++ APIs added in apache/pulsar-client-cpp#257 to fetch schema info. The `AvroSchema` class fetches and caches the writer schema if it is not cached, then use both the writer schema and reader schema to decode messages. Add `test_schema_evolve` to test consumers or readers can decode any message whose writer schema is different with the reader schema.
Fixes #108 ### Motivation Currently the Python client uses the reader schema, which is the schema of the consumer, to decode Avro messages. However, when the writer schema is different from the reader schema, the decode will fail. ### Modifications Add `attach_client` method to `Schema` and call it when creating consumers and readers. This method stores a reference to a `_pulsar.Client` instance, which leverages the C++ APIs added in apache/pulsar-client-cpp#257 to fetch schema info. The `AvroSchema` class fetches and caches the writer schema if it is not cached, then use both the writer schema and reader schema to decode messages. Add `test_schema_evolve` to test consumers or readers can decode any message whose writer schema is different with the reader schema.
@BewareMyPower : Can this PR be back merged to Pulsar 2.10 CPP client? |
@armanjupriya-er The Pulsar CPP client version is no longer associated with the Pulsar version. So there is no need to "back merged". Just use the latest client |
Thank you @BewareMyPower of the quick reply. |
Motivation
Currently there is no public API to get the SchemaInfo from the broker. However, when a consumer tries to decode a message of Avro schema, the writer schema that was stored in the broker is required. The C++ client is not responsible to decode the message, but the extended library like the Python client needs the ability to get the writer schema from the broker. There is a workaround that we can send a REST request to query the schema, but it requires the admin permission.
Modification
Client::getSchemaInfoAsync
method to get the schema info asynchronously. When the schema of the given version does not exist, theSchemaInfo
whose type isNONE
will be returned.Message::getLongSchemaVersion
method to get the schema version of a message. The existinggetSchemaVersion
method is hard to use because it returns a byte array, which users need to know how to decode.fromBigEndianBytes
andtoBigEndianBytes
functions to perform conversion between a byte array and the long value of the schema version. AddInt64SerDes
to test them.LookupServiceTest
that initializesclient_
only once (even with theGetParam()
method) so the HTTP URL was never tested.testGetSchemaByVersion
to testgetSchemaInfoAsync
.Documentation
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)