-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][pip] PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service #24328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
87a2fed
add pip 413
gaoran10 32981d6
fix
gaoran10 c51f6c9
fix
gaoran10 94a3f08
fix
gaoran10 9087b06
change pip number
gaoran10 a8f1145
change pip num
gaoran10 9ea2b84
fix
gaoran10 4f54ffd
fix
gaoran10 121faba
fix
gaoran10 4452d56
fix
gaoran10 7148af2
update
gaoran10 eed3dc8
update
gaoran10 8a7e643
update vote mailing url
gaoran10 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,303 @@ | ||
| # PIP-420: Provides an ability for Pulsar clients to integrate with third-party schema registry service | ||
|
|
||
| # Motivation | ||
|
|
||
| Apache Pulsar currently provides a built-in schema management system tightly coupled with the broker. | ||
| Pulsar clients interact with this system implicitly when creating producers and consumers. | ||
|
|
||
| However, many organizations already have independent schema registry services (such as Confluent Schema Registry) | ||
| and wish to reuse their existing schema governance processes across multiple messaging systems, including Pulsar. | ||
|
|
||
| By enabling Pulsar clients to integrate with third-party schema registry services: | ||
| - Users can unify schema management across different platforms. | ||
| - Pulsar brokers can be decoupled from schema storage and validation responsibilities. | ||
| - Pulsar users can integrate with ecosystems that rely on external schema registries easier. | ||
|
|
||
| This flexibility is particularly valuable for enterprises with strict schema validation, versioning, | ||
| and governance workflows already centralized in external registries. | ||
|
|
||
| # Goals | ||
|
|
||
| ## In Scope | ||
|
|
||
| - Provide the ability for Pulsar clients to leverage third-party schema registry services for schema operations. | ||
|
|
||
| ## Out Scope | ||
|
|
||
| - Providing built-in implementations for third-party schemas. | ||
| - Support `AutoProduceBytesSchema` and `AutoConsumeSchema`. | ||
| - Migrating existing Pulsar-managed schemas to external schema registries. | ||
|
|
||
| # High Level Design | ||
|
|
||
| - Provide a mechanism to configure the Pulsar client to use either: | ||
| - The existing Pulsar schema registry (default) | ||
| - Third-party schema registry implementations | ||
|
|
||
| # Detailed Design | ||
|
|
||
| ## Design & Implementation Details | ||
|
|
||
| This PIP aims to enable the Pulsar client to directly integrate with external schema registry services for schema management. | ||
| In this model, the external schema registry is fully responsible for schema storage, retrieval, and validation. | ||
| The Pulsar broker will no longer manage schema data for topics using external schemas. | ||
|
|
||
| ### SchemaType: EXTERNAL | ||
|
|
||
| Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**. | ||
|
|
||
| - All schemas that integrate with external schema registries must declare `SchemaType.EXTERNAL`. | ||
| - When using `EXTERNAL` schema type, the Pulsar client will provide empty schema data to the broker. | ||
| - The broker will only record the schema type for topics. | ||
| - Compatibility restrictions: | ||
| - Introduce a new compatibility check in broker side. | ||
| - The schema type `SchemaType.EXTERNAL` can't be compatible with other Pulsar schemas | ||
| - This prevents accidental data corruption or schema conflicts between internal and external schema management systems. | ||
| - Pulsar Geo replicator needs to transfer the external schema info to the remote clusters. | ||
|
|
||
| This design isolates external schema management and protects existing topics using Pulsar’s native schema system. | ||
|
|
||
| ### Extensibility via Client Interfaces | ||
|
|
||
| To integrate with external schema registries, users can: | ||
| - Implement the `Schema` interface to define custom schema encoding and decoding logic. | ||
|
|
||
| #### Key `Schema` Interface Methods: | ||
|
|
||
| Schema interface introduces methods for encoding and decoding messages, | ||
| allowing external schema implementations to handle serialization and deserialization. | ||
|
|
||
| - EncodeData encode(String topic, T message) **(New addition)** | ||
| - Serializes the message using the external schema. | ||
| - The encode method will be responsible for managing schema evolution and versioning. | ||
| - The method returns an `EncodeData` object that contains: | ||
| - The encoded byte array. | ||
| - The schema ID associated with the serialized data. | ||
| - Implementations should throw `SchemaSerializationException` if the serialization or deserialization fails. | ||
|
|
||
| - T decode(String topic, byte[] data, byte[] schemaId) **(New addition)** | ||
| - Deserialize the message using the external schema. | ||
| - The external schema can retrieve the schema by the schema ID. | ||
| - Users should handle exceptions when get value by themselves. | ||
|
|
||
| - closeAsync() **(New addition)** | ||
| - Called when the producer or consumer is closed. | ||
| - Allows external schema implementations to release resources. | ||
|
|
||
| #### Example Workflow: | ||
|
|
||
| - During producer or consumer initialization: | ||
| The external schema info will be registered to Pulsar schema storage. | ||
|
|
||
| - During producing or receiving messages: | ||
| The `encode` and `decode` methods handle the schema-aware serialization and deserialization using the external schema registry. | ||
|
|
||
| #### Schema ID & Schema Version | ||
|
|
||
| Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier. | ||
|
|
||
| When integrating with external schema registries: | ||
| - The `schemaVersion` will point to external schema info. | ||
| - The external schema will be responsible for managing schema evolution and versioning, this is different from Pulsar's native schema versioning. | ||
| - The schema encode method will return an `EncodeData` object that contains the encoded data and the schema ID. | ||
| - For store the external schema ID, this PIP introduces a new optional field `schema_id` in the `MessageMetadata`. | ||
| - The KeyValueSchema doesn't support using Pulsar's native schema and external schema at the same time. | ||
|
|
||
| The KeyValueSchemaID is also a byte array, the format is: keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId, | ||
| external schemas need to decode the key and value schema IDs from the KeyValueSchemaID. | ||
|
|
||
| This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism. | ||
| This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL; | ||
| they need to be updated to support setting schema properties, using the new schema type and handle external schemas appropriately. | ||
|
|
||
| #### Example usage | ||
|
|
||
| ```java | ||
| public void workWithExternalSchemaRegistry() throws Exception { | ||
| String topic = "testExternalJsonSchema"; | ||
|
|
||
| Map<String, String> configs = new HashedMap<>(); | ||
| configs.put("schema.registry.url", getSchemaRegistryUrl()); | ||
| configs.put("json.fail.unknown.properties", "false"); | ||
| KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(configs); | ||
| Schema<User> schema = kafkaSchemaFactory.json(User.class); | ||
|
|
||
| @Cleanup | ||
| PulsarClient pulsarClient = getPulsarClient(); | ||
|
|
||
| @Cleanup | ||
| Producer<User> producer = pulsarClient.newProducer(schema) | ||
| .topic(topic) | ||
| .create(); | ||
|
|
||
| @Cleanup | ||
| Consumer<User> consumer = pulsarClient.newConsumer(schema) | ||
| .topic(topic) | ||
| .subscriptionName("sub") | ||
| .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) | ||
| .subscribe(); | ||
|
|
||
| for (int i = 0; i < 10; i++) { | ||
| producer.send(new User("name-" + i, 10 + i)); | ||
| } | ||
|
|
||
| for (int i = 0; i < 10; i++) { | ||
| Message<User> message = consumer.receive(); | ||
| consumer.acknowledge(message); | ||
| System.out.println("receive msg " + message.getValue().getClass().getName() + " " + message.getValue()); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Public-facing Changes | ||
BewareMyPower marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ```protobuf | ||
| // File `SchemaRegistryFormat.proto` | ||
| message SchemaInfo { | ||
| enum SchemaType { | ||
| EXTERNAL = 22; | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ```protobuf | ||
| // File `PulsarApi.proto` | ||
| message Schema { | ||
| enum Type { | ||
| External = 22; | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| Add a new field `schema_id` to the `MessageMetadata` to store the schema ID for messages that use external schemas. | ||
| ```protobuf | ||
| // File `PulsarApi.proto` | ||
| message MessageMetadata { | ||
| optional bytes schema_id = 31; | ||
| } | ||
| ``` | ||
|
|
||
| Introduce a new SchemaType `EXTERNAL` to represent the schema types that work with external schema registry. | ||
| ```java | ||
| public enum SchemaType { | ||
|
|
||
| /** | ||
| * External Schema Type. | ||
| * <p> | ||
| * This is used to indicate that the schema is managed externally, such as in a schema registry. | ||
| * External schema type is not compatible with any other schema type. | ||
| * </p> | ||
| */ | ||
| EXTERNAL(21) | ||
|
|
||
| } | ||
| ``` | ||
|
|
||
| Add a new class `EncodeData` to encapsulate the encoded data and schema ID. | ||
| ```java | ||
| public class EncodeData { | ||
|
|
||
| private byte[] data; | ||
|
|
||
| private byte[] schemaId; | ||
|
|
||
| } | ||
| ``` | ||
|
|
||
| Add a new method `getSchemaId()` to the `Message` interface to retrieve the schema ID of the message. | ||
| This method will return the schema ID if the message is produced with an external schema, otherwise it will return null. | ||
| ```java | ||
| public interface Message<T> { | ||
|
|
||
| /** | ||
| * Get schema ID of the message. | ||
| * PIP-420 provides a way to produce messages with external schema, | ||
| * and the schema ID will be set to the message metadata. | ||
| * | ||
| * @return schema ID of the message if the message is produced with external schema. | ||
| */ | ||
| byte[] getSchemaId(); | ||
|
|
||
| } | ||
| ``` | ||
|
|
||
| Add two methods to encode and decode messages, which can be used by external schemas to serialize and deserialize messages. | ||
| The encode method returns an `EncodeData` object that contains the encoded byte array and schema ID. | ||
| The customized external schemas can set the `SchemaInfoProvider` and retrieve the configs from it, | ||
| extends the interface `AutoCloseable` to support close external schema resources. | ||
|
|
||
| ```java | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
||
| public interface Schema extends Cloneable { | ||
|
|
||
| /** | ||
| * Encodes the message into a byte array using the schema. | ||
| * | ||
| * @param topic the topic for which the message is being encoded | ||
| * @param message the message to encode | ||
| * @return the encoded byte array and schema ID | ||
| * @throws SchemaSerializationException if the encoding fails | ||
| */ | ||
| default EncodeData encode(String topic, T message) { | ||
| return new EncodeData(encode(message), null); | ||
| } | ||
|
|
||
| default T decode(String topic, ByteBuffer data, byte[] schemaId) { | ||
| return decode(topic, getBytes(data), schemaId); | ||
| } | ||
|
|
||
| /** | ||
| * Decodes a byte array into a message using the schema. | ||
| * | ||
| * @param topic the topic for which the message is being decoded | ||
| * @param data the byte array to decode | ||
| * @param schemaId the schema ID associated with the data | ||
| * @return the decoded message | ||
| * @throws SchemaSerializationException if the decoding fails | ||
| */ | ||
| default T decode(String topic, byte[] data, byte[] schemaId) { | ||
| return decode(data, schemaId); | ||
| } | ||
|
|
||
| default void closeAsync() { | ||
| return CompletableFuture.completedFuture(null); | ||
| } | ||
|
|
||
| } | ||
| ``` | ||
|
|
||
| # Pulsar Function | ||
BewareMyPower marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| For support using third-party schema registry service in Pulsar Function, | ||
| - Support the `SchemaType.EXTERNAL` schema type in Pulsar Function | ||
|
|
||
| # Pulsar-GEO replication impact | ||
poorbarcode marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Integrating third-party schema registry services introduces a new approach to managing schemas for geo-replicated topics. | ||
|
|
||
| In the current Pulsar architecture: | ||
| - Schema definitions are stored and managed by the Pulsar brokers. | ||
| - During geo-replication, schema information must also be replicated across clusters to ensure schema consistency. | ||
|
|
||
| By using an external schema registry: | ||
| - **Schema management is fully decoupled from Pulsar brokers and replication mechanisms.** | ||
| - This eliminates the need for synchronizing schema data between Pulsar clusters, simplifying geo-replication processes. | ||
| - Supports a unified schema registry for cross-cluster producers and consumers | ||
|
|
||
| # Backward & Forward Compatibility | ||
|
|
||
| The new schema type `SchemaType.EXTERNAL` doesn't break any existing Pulsar topics, it's not compatible with other Pulsar schema types. | ||
|
|
||
| # Alternatives | ||
|
|
||
| Use `bytes` schema for "external" schemas, it can't provide any compatibility checks to protect topic data that use Pulsar's native schema system. | ||
|
|
||
| # General Notes | ||
|
|
||
| # Links | ||
|
|
||
| <!-- | ||
| Updated afterwards | ||
| --> | ||
| * Mailing List voting thread: https://lists.apache.org/thread/g7hypmql3gk2zog6cmmhg4h93hfw1o15 | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.