-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[schema] Introduce multi version generic record schema #3670
Conversation
*Motivation* Currently AUTO_CONSUME only supports decoding records from latest schema. All the schema versions are lost. It makes AUTO_CONSUME less useful in some use cases, such as CDC. Because there is no way for the applications to know which version of schema that a message is using. In order to support multi-version schema, we need to propagate schema version from message header through schema#decode method to the decoded record. *Modifications* - Introduce a new decode method `decode(byte[] data, byte[] schemaVersion)`. This allows the implementation to leverage the schema version. - Introduce a method `supportSchemaVersioning` to tell which decode methods to use. Because most of the schema implementations such as primitive schemas and POJO based schema doesn't make any sense to use schema version. - Introduce a SchemaProvider which returns a specific schema instance for a given schema version - Implement a MultiVersionGenericRecordSchema which decode the messages based on schema version. All the records decoded by this schema will have schema version and its corresponding schema definitions. *NOTES This implementation only introduce the mechanism. But it doesn't wire the multi-version schema with auto_consume schema. There will be a subsequent pull request on implementing a schema provider that fetches and caches schemas from brokers.
To clarify, this is only ever used for AUTO_CONSUME, no? When a client passes a schema like Schema.AVRO(MyPojo.class), it won't actually know the schema version. So the schema version is only used here for the client to request a specific version of the schema from the broker? |
@ivankelly that's correct. for pojo and primitive schemas, they will not know the schema version. for any other schema implementations that returns |
|
||
@Override | ||
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) { | ||
return provider.getSchema(schemaVersion).decode(bytes, schemaVersion); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already get the specific schema, is it necessary to provide schemaVersion when call decode()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui good point. I think it is a good practice to pass the schema version down to the specific implementation and let the implementation decide whether to use this schema version or not.
*Motivation* Currently AUTO_CONSUME only supports decoding records from latest schema. All the schema versions are lost. It makes AUTO_CONSUME less useful in some use cases, such as CDC. Because there is no way for the applications to know which version of schema that a message is using. In order to support multi-version schema, we need to propagate schema version from message header through schema#decode method to the decoded record. *Modifications* - Introduce a new decode method `decode(byte[] data, byte[] schemaVersion)`. This allows the implementation to leverage the schema version. - Introduce a method `supportSchemaVersioning` to tell which decode methods to use. Because most of the schema implementations such as primitive schemas and POJO based schema doesn't make any sense to use schema version. - Introduce a SchemaProvider which returns a specific schema instance for a given schema version - Implement a MultiVersionGenericRecordSchema which decode the messages based on schema version. All the records decoded by this schema will have schema version and its corresponding schema definitions. *NOTES This implementation only introduce the mechanism. But it doesn't wire the multi-version schema with auto_consume schema. There will be a subsequent pull request on implementing a schema provider that fetches and caches schemas from brokers.
Motivation Pulsar 2.4.0 Added schema versioning to support multi version messages produce and consume apache#3876 apache#3670 apache#4211 apache#4325 apache#4548. but the doc is not updated accordingly. Modifications Update the schema version in the pulsar registry doc for releases 2.4.0/2.4.1/2.4.2.
Motivation
Currently AUTO_CONSUME only supports decoding records from latest schema.
All the schema versions are lost. It makes AUTO_CONSUME less useful in some use cases,
such as CDC. Because there is no way for the applications to know which version of schema
that a message is using.
In order to support multi-version schema, we need to propagate schema version from
message header through schema#decode method to the decoded record.
Modifications
decode(byte[] data, byte[] schemaVersion)
. This allows the implementationto leverage the schema version.
supportSchemaVersioning
to tell which decode methods to use. Because most of the schemaimplementations such as primitive schemas and POJO based schema doesn't make any sense to use schema version.
decoded by this schema will have schema version and its corresponding schema definitions.
NOTES
This implementation only introduce the mechanism. But it doesn't wire the multi-version schema
with auto_consume schema. There will be a subsequent pull request on implementing a schema provider
that fetches and caches schemas from brokers.