From 87a2fed62d6253e4c8baca2e1fc10360701c84ca Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Mon, 19 May 2025 10:33:57 +0800
Subject: [PATCH 01/13] add pip 413
---
pip/pip-413.md | 146 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 146 insertions(+)
create mode 100644 pip/pip-413.md
diff --git a/pip/pip-413.md b/pip/pip-413.md
new file mode 100644
index 0000000000000..c889548d745a2
--- /dev/null
+++ b/pip/pip-413.md
@@ -0,0 +1,146 @@
+# PIP-413: Support use external schema registry service for Pulsar client
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates schema manager into the Pulsar broker.
+This implementation has some weaknesses. Managing the schema increases the Pulsar broker and Pulsar protocol complexity;
+users can’t leverage other schema registry services in Pulsar client, such as Kafka Schema Registry.
+
+# Motivation
+
+The Pulsar client should have the ability to access external schema registry service to manage the schema (register schema,
+get schema, validate schema, etc.). The schema registry service can be an independent service, if using external schema registry service,
+the Pulsar broker don't need to care about the schema of the messages.
+
+# Goals
+
+## In Scope
+
+- Support using external schema registry service for Pulsar client.
+
+## Out Scope
+
+This PIP will not include the implementation for accessing external schema system,
+and not include the Pulsar external schema registry service design.
+
+# High Level Design
+
+This PIP is just for providing some abilities.
+
+- Decouple schema management from creating producer or add consumer subscription commands.
+- Provide a way to build external schema management system clients and integrate with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP's target is making Pulsar client has the ability to leverage external schema registry service to manage schema.
+The external schema registry is responsible for managing the schema, the broker don't care about the messaging schema.
+The Pulsar client should ignore the schema information when creating producer and adding consumer subscription.
+
+Users can implement the `SchemaInfoProvider` interface and `Schema` interface to access external schema registry service.
+The `Schema` interface has mainly two methods `encode` and `decode`, the customized schemas can register schema or get schema with these methods.
+The encoded messaging depends on the external schema system, Pulsar broker just treats the message as bytes data, and it won't change the message version of message metadata.
+Unlike Pulsar using schema version to identify the schema, some external schema system use the schema ID to identify the schema,
+if using external schema system the Pulsar message metadata will not maintain schema ID, the customized decoding method can try to retrieve the schema ID from the encoded data.
+
+## Public-facing Changes
+
+Add new methods for `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` provide necessary params for connecting to the external schema registry service with the method `getConfigs`.
+If the schema info provider is external, the new producer command, consumer subscribe command will treat the schema as bytes schema, the broker will ignore schema validation.
+```java
+public interface SchemaInfoProvider {
+
+ /**
+ * Returns the configs of the schema registry service, such as URL, authentication params.
+ */
+ default Map<String, String> getConfigs() {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * It's used to determine whether the SchemaInfoProvider is external or not.
+ */
+ default boolean isExternal() {
+ return false;
+ }
+
+}
+```
+
+Add a new interface, `SchemaInfoProviderFactory`, which is used to initialize a `SchemaInfoProvider`. Each topic has its own `SchemaInfoProvider`.
+```java
+public interface SchemaInfoProviderFactory {
+
+ SchemaInfoProvider of(String topic);
+
+}
+```
+
+The client builder supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+ ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+`ClientConfigurationData` holds the configured `SchemaInfoProviderFactory` so that it can be passed on to the client.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+ @JsonIgnore
+ private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+The customized schema can get the `SchemaInfoProvider` and retrieve the configs from it.
+```java
+public interface Schema {
+
+ /**
+ * When setting schema info provider for schema, the schema can retrieve the configs.
+ */
+ default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ }
+
+ /**
+ * Returns the schema info provider.
+ *
+ * @return a {@code SchemaInfoProvider} representing the schema info provider
+ */
+ default SchemaInfoProvider getSchemaInfoProvider() {
+ return null;
+ }
+
+}
+```
+
+### CLI
+
+# Backward & Forward Compatibility
+
+## Revert
+
+No changes are needed to revert to the previous version.
+
+## Upgrade
+
+No other changes are needed to upgrade to the new version.
+
+# Alternatives
+
+None
+
+# General Notes
+
+# Links
+
+
+* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr
+* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q
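
As a rough illustration of the `SchemaInfoProviderFactory` contract proposed in this patch (one `SchemaInfoProvider` per topic, with registry settings exposed through `getConfigs`), a user-side implementation might look like the sketch below. It is not taken from the Pulsar code base: the two interfaces are local stand-ins that mirror only the methods shown in the PIP, and `ExternalProviderDemo`, `ExternalSchemaInfoProvider`, and `CachingProviderFactory` are hypothetical names chosen for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Demo entry point (hypothetical name), declared first so the file can be launched directly.
final class ExternalProviderDemo {
    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("schema.registry.url", "http://localhost:8001");

        SchemaInfoProviderFactory factory = new CachingProviderFactory(configs);
        SchemaInfoProvider provider = factory.of("t1");

        System.out.println(provider.isExternal());
        System.out.println(provider.getConfigs().get("schema.registry.url"));
    }
}

// Local stand-in: only the two methods this PIP adds to SchemaInfoProvider.
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

// Local stand-in for the proposed factory interface.
interface SchemaInfoProviderFactory {
    SchemaInfoProvider of(String topic);
}

// Hypothetical provider that only carries registry connection settings.
final class ExternalSchemaInfoProvider implements SchemaInfoProvider {
    private final Map<String, String> configs;

    ExternalSchemaInfoProvider(Map<String, String> configs) {
        // Read-only copy so callers cannot mutate the shared settings.
        this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
    }

    @Override
    public Map<String, String> getConfigs() {
        return configs;
    }

    @Override
    public boolean isExternal() {
        return true;
    }
}

// Hypothetical factory: creates one provider per topic and reuses it afterwards.
final class CachingProviderFactory implements SchemaInfoProviderFactory {
    private final Map<String, String> configs;
    private final Map<String, SchemaInfoProvider> byTopic = new ConcurrentHashMap<>();

    CachingProviderFactory(Map<String, String> configs) {
        this.configs = new HashMap<>(configs);
    }

    @Override
    public SchemaInfoProvider of(String topic) {
        return byTopic.computeIfAbsent(topic, t -> new ExternalSchemaInfoProvider(configs));
    }
}
```

Caching per topic is a design choice of this sketch; the PIP only states that each topic has its own `SchemaInfoProvider`, so a factory that creates a fresh provider on every call would satisfy the same interface.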
From 32981d6743d76bb1c17e96e08de7d1ac80f19876 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Mon, 19 May 2025 21:55:41 +0800
Subject: [PATCH 02/13] fix
---
pip/pip-413.md | 127 ++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 115 insertions(+), 12 deletions(-)
diff --git a/pip/pip-413.md b/pip/pip-413.md
index c889548d745a2..e1efb0c9fb9b7 100644
--- a/pip/pip-413.md
+++ b/pip/pip-413.md
@@ -1,27 +1,26 @@
-# PIP-413: Support use external schema registry service for Pulsar client
+# PIP-413: Decouple schema management from messaging protocols for Pulsar clients
# Background knowledge
Schema is an important feature for messaging systems. Pulsar integrates schema manager into the Pulsar broker.
-This implementation has some weaknesses. Managing the schema increases the Pulsar broker and Pulsar protocol complexity;
-users can’t leverage other schema registry services in Pulsar client, such as Kafka Schema Registry.
+The current implementation in Pulsar clients couples schema management with some protocols. This increases the Pulsar protocol complexity;
+and users can’t leverage other schema registry services in Pulsar client, such as Kafka Schema Registry.
# Motivation
-The Pulsar client should have the ability to access external schema registry service to manage the schema (register schema,
+The Pulsar client is better has the ability to access external schema registry service to manage the schema (register schema,
get schema, validate schema, etc.). The schema registry service can be an independent service, if using external schema registry service,
-the Pulsar broker don't need to care about the schema of the messages.
+the Pulsar broker doesn't need to care about the schema of the messages.
# Goals
## In Scope
-- Support using external schema registry service for Pulsar client.
+- Provide an ability to leverage external schema registry service for Pulsar client.
## Out Scope
-This PIP will not include the implementation for accessing external schema system,
-and not include the Pulsar external schema registry service design.
+This PIP will not include the implementation for accessing external schema system.
# High Level Design
@@ -34,8 +33,9 @@ This PIP is just for providing some abilities.
## Design & Implementation Details
-This PIP's target is making Pulsar client has the ability to leverage external schema registry service to manage schema.
-The external schema registry is responsible for managing the schema, the broker don't care about the messaging schema.
+This PIP's target is decoupling schema management from Pulsar messaging protocols,
+and making Pulsar client have the ability to leverage external schema registry service to manage schema.
+The external schema registry is responsible for managing the schema, the broker doesn't care about the messaging schema.
The Pulsar client should ignore the schema information when creating producer and adding consumer subscription.
Users can implement the `SchemaInfoProvider` interface and `Schema` interface to access external schema registry service.
@@ -44,10 +44,108 @@ The encoded messaging depends on the external schema system, Pulsar broker just
Unlike Pulsar using schema version to identify the schema, some external schema system use the schema ID to identify the schema,
if using external schema system the Pulsar message metadata will not maintain schema ID, the customized decoding method can try to retrieve the schema ID from the encoded data.
+Example usage
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    // Connection settings for the external schema registry.
+    Map<String, String> srConfig = new HashedMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    // KafkaSchemaInfoProviderFactory and KafkaSchemas are user-provided implementations,
+    // not part of this PIP.
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    // Publish ten messages; encoding is delegated to the external schema.
+    for (int i = 0; i < 10; i++) {
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    // Receive and acknowledge the same ten messages.
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        consumer.acknowledge(message);
+    }
+
+    client.close();
+}
+```
+
+Messaging protocols changes
+
+Ignore schema info while creating Producer, ignore register schema before sending messages, the schema management
+```java
+public class ProducerImpl extends ProducerBase implements TimerTask, ConnectionHandler.Connection {
+
+ @Override
+ public CompletableFuture connectionOpened(final ClientCnx cnx) {
+ // ...
+
+ if (schema != null && schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+ try {
+ // set schemaInfo to null if the schema info provider is external
+ schemaInfo = null;
+ } catch (Exception e) {
+ log.error("Failed to register schema.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ // send create producer request
+ }
+
+ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
+ // ...
+
+ if (schema.getSchemaInfoProvider().isExternal()) {
+ // don't register schema if the schema is external, the register method can be integrated in the messaging encode method.
+ return;
+ }
+ // getOrCreateSchemaAsync
+ }
+
+}
+```
+
+Ignore schema info while adding consumer subscription.
+```java
+public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection {
+
+ @Override
+ public CompletableFuture connectionOpened(final ClientCnx cnx) {
+ // ...
+
+ // only external schema info provider need to register schema
+ if (schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+ try {
+ // don't set schema info for schema registry schema
+ si = null;
+ } catch (Exception e) {
+ log.error("Failed to register schema.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+}
+```
+
## Public-facing Changes
Add new methods for `SchemaInfoProvider` interface.
-The `SchemaInfoProvider` provide necessary params for connecting to the external schema registry service with the method `getConfigs`.
+The `SchemaInfoProvider` provides necessary params for connecting to the external schema registry service with the method `getConfigs`.
If the schema info provider is external, the new producer command, consumer subscribe command will treat the schema as bytes schema, the broker will ignore schema validation.
```java
public interface SchemaInfoProvider {
@@ -119,7 +217,12 @@ public interface Schema {
}
```
-### CLI
+# Security Considerations
+Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
+the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
+
+# Pulsar-GEO replication
+If users can use external schema registry service, it provides a new way to manage scheme for geo-replicated topics.
# Backward & Forward Compatibility
From c51f6c95df59e108640a825d864ff735cdefe540 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Wed, 21 May 2025 19:03:33 +0800
Subject: [PATCH 03/13] fix
---
pip/pip-413.md | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/pip/pip-413.md b/pip/pip-413.md
index e1efb0c9fb9b7..80a2a42067a5d 100644
--- a/pip/pip-413.md
+++ b/pip/pip-413.md
@@ -217,6 +217,11 @@ public interface Schema {
}
```
+# Pulsar Function
+
+For supporting using external schema registry service in Pulsar Function,
+the function configurations should support setting the `SchemaInfoProviderFactory` and schema registry auth configurations.
+
# Security Considerations
Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
From 94a3f08f6098886935a2e4c6cc9a382a35ba6077 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Thu, 22 May 2025 00:11:52 +0800
Subject: [PATCH 04/13] fix
---
pip/pip-413.md | 23 +++++++++++------------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/pip/pip-413.md b/pip/pip-413.md
index 80a2a42067a5d..89cc423481473 100644
--- a/pip/pip-413.md
+++ b/pip/pip-413.md
@@ -1,26 +1,26 @@
-# PIP-413: Decouple schema management from messaging protocols for Pulsar clients
+# PIP-413: Provide ability for Pulsar clients to integrate with third-party schema registry service
# Background knowledge
Schema is an important feature for messaging systems. Pulsar integrates schema manager into the Pulsar broker.
-The current implementation in Pulsar clients couples schema management with some protocols. This increases the Pulsar protocol complexity;
-and users can’t leverage other schema registry services in Pulsar client, such as Kafka Schema Registry.
+The current implementation in Pulsar clients couples schema management with some protocols (creating producer, adding consumer subscription).
+This increases the Pulsar protocol complexity and users can’t leverage third-party schema registry services in Pulsar client.
# Motivation
-The Pulsar client is better has the ability to access external schema registry service to manage the schema (register schema,
-get schema, validate schema, etc.). The schema registry service can be an independent service, if using external schema registry service,
+The Pulsar client is better has the ability to access third-party schema registry service to manage the schema (register schema,
+get schema, validate schema, etc.). The schema registry service can be an independent service, if using third-party schema registry service,
the Pulsar broker doesn't need to care about the schema of the messages.
# Goals
## In Scope
-- Provide an ability to leverage external schema registry service for Pulsar client.
+- Provide an ability to leverage third-party schema registry service for Pulsar client.
## Out Scope
-This PIP will not include the implementation for accessing external schema system.
+This PIP will not include the implementation for accessing third-party schema system.
# High Level Design
@@ -56,7 +56,6 @@ public void workWithExternalSchemaRegistry() throws Exception {
.build();
String topic = "t1";
-
Schema<User> schema = KafkaSchemas.JSON(User.class);
Producer<User> producer = client.newProducer(schema)
@@ -109,7 +108,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne
// ...
if (schema.getSchemaInfoProvider().isExternal()) {
- // don't register schema if the schema is external, the register method can be integrated in the messaging encode method.
+ // don't register schema if external schema registry service, the register method can be integrated in the messaging encode method.
return;
}
// getOrCreateSchemaAsync
@@ -219,15 +218,15 @@ public interface Schema {
# Pulsar Function
-For supporting using external schema registry service in Pulsar Function,
-the function configurations should support setting the `SchemaInfoProviderFactory` and schema registry auth configurations.
+For supporting using third-party schema registry service in Pulsar Function,
+the function configurations should support setting the `SchemaInfoProviderFactory` and schema registry auth configurations while initializing the Pulsar client.
# Security Considerations
Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
# Pulsar-GEO replication
-If users can use external schema registry service, it provides a new way to manage scheme for geo-replicated topics.
+If users can use third-party schema registry service, it provides a new way to manage scheme for geo-replicated topics.
# Backward & Forward Compatibility
From 9087b06f4c3f7cd62dee63fac6206afbaca83de6 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Thu, 22 May 2025 00:12:50 +0800
Subject: [PATCH 05/13] change pip number
---
pip/{pip-413.md => pip-417.md} | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
rename pip/{pip-413.md => pip-417.md} (99%)
diff --git a/pip/pip-413.md b/pip/pip-417.md
similarity index 99%
rename from pip/pip-413.md
rename to pip/pip-417.md
index 89cc423481473..d0c885a342f8e 100644
--- a/pip/pip-413.md
+++ b/pip/pip-417.md
@@ -1,4 +1,4 @@
-# PIP-413: Provide ability for Pulsar clients to integrate with third-party schema registry service
+# PIP-417: Provide ability for Pulsar clients to integrate with third-party schema registry service
# Background knowledge
From a8f11457f151656624f2f47fdb9cad29216d3e77 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Thu, 22 May 2025 00:18:29 +0800
Subject: [PATCH 06/13] change pip num
---
pip/{pip-417.md => pip-420.md} | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
rename pip/{pip-417.md => pip-420.md} (99%)
diff --git a/pip/pip-417.md b/pip/pip-420.md
similarity index 99%
rename from pip/pip-417.md
rename to pip/pip-420.md
index d0c885a342f8e..3c1d64497652e 100644
--- a/pip/pip-417.md
+++ b/pip/pip-420.md
@@ -1,4 +1,4 @@
-# PIP-417: Provide ability for Pulsar clients to integrate with third-party schema registry service
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service
# Background knowledge
From 9ea2b849201fc48903ff64fa46c8d4e5c8ffaeb8 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Wed, 18 Jun 2025 10:27:41 +0800
Subject: [PATCH 07/13] fix
---
pip/pip-420.md | 189 +++++++++++++++++++++++++++----------------------
1 file changed, 103 insertions(+), 86 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 3c1d64497652e..662acab07f40c 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -1,50 +1,102 @@
-# PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service
+# PIP-420: Provides an ability for Pulsar clients to integrate with third-party schema registry service
-# Background knowledge
+# Motivation
-Schema is an important feature for messaging systems. Pulsar integrates schema manager into the Pulsar broker.
-The current implementation in Pulsar clients couples schema management with some protocols (creating producer, adding consumer subscription).
-This increases the Pulsar protocol complexity and users can’t leverage third-party schema registry services in Pulsar client.
+Apache Pulsar currently provides a built-in schema management system tightly coupled with the broker.
+Pulsar clients interact with this system implicitly when creating producers and consumers.
-# Motivation
+However, many organizations already have independent schema registry services (such as Confluent Schema Registry)
+and wish to reuse their existing schema governance processes across multiple messaging systems, including Pulsar.
+
+By enabling Pulsar clients to integrate with third-party schema registry services:
+- Users can unify schema management across different platforms.
+- Pulsar brokers can be decoupled from schema storage and validation responsibilities.
-The Pulsar client is better has the ability to access third-party schema registry service to manage the schema (register schema,
-get schema, validate schema, etc.). The schema registry service can be an independent service, if using third-party schema registry service,
-the Pulsar broker doesn't need to care about the schema of the messages.
+This flexibility is particularly valuable for enterprises with strict schema validation, versioning,
+and governance workflows already centralized in external registries.
# Goals
## In Scope
-- Provide an ability to leverage third-party schema registry service for Pulsar client.
+- Provide the ability for Pulsar clients to leverage third-party schema registry services for schema operations.
## Out Scope
-This PIP will not include the implementation for accessing third-party schema system.
+- Providing built-in implementations for third-party schemas.
+- Migrating existing Pulsar-managed schemas to external schema registries.
# High Level Design
-This PIP is just for providing some abilities.
-
-- Decouple schema management from creating producer or add consumer subscription commands.
-- Provide a way to build external schema management system clients and integrate with Pulsar clients.
+- Provide a mechanism to configure the Pulsar client to use either:
+ - The existing Pulsar schema registry (default)
+ - A third-party schema registry implementation
# Detailed Design
## Design & Implementation Details
-This PIP's target is decoupling schema management from Pulsar messaging protocols,
-and making Pulsar client have the ability to leverage external schema registry service to manage schema.
-The external schema registry is responsible for managing the schema, the broker doesn't care about the messaging schema.
-The Pulsar client should ignore the schema information when creating producer and adding consumer subscription.
+This PIP aims to enable the Pulsar client to directly integrate with external schema registry services for schema management.
+In this model, the external schema registry is fully responsible for schema storage, retrieval, and validation.
+The Pulsar broker will no longer manage schema data for topics using external schemas.
+
+### SchemaType: EXTERNAL
+
+Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**.
+
+- All schemas that integrate with external schema registries must declare `SchemaType.EXTERNAL`.
+- When using `EXTERNAL` schema type, the Pulsar client will provide an empty schema definition to the broker.
+- If the `SchemaInfoProvider` is external but the schema type is not `EXTERNAL`, an `ExternalSchemaException` is thrown.
+- The broker will only record the schema type for topics.
+- Compatibility restrictions:
+ - Introduce a new compatibility check on the broker side.
+ - The schema type `SchemaType.EXTERNAL` is incompatible with all other Pulsar schema types.
+ - This prevents accidental data corruption or schema conflicts between internal and external schema management systems.
+
+This design isolates external schema management and protects existing topics using Pulsar’s native schema system.
+
+### Extensibility via Client Interfaces
+
+To integrate with external schema registries, users can:
+- Implement the `Schema` interface to define custom schema encoding and decoding logic.
+- Implement the `SchemaInfoProvider` interface to initialize "external" schemas.
+
+#### Key `Schema` Interface Methods:
+- byte[] encode(T message)
+ Serializes the message using the external schema.
-Users can implement the `SchemaInfoProvider` interface and `Schema` interface to access external schema registry service.
-The `Schema` interface has mainly two methods `encode` and `decode`, the customized schemas can register schema or get schema with these methods.
-The encoded messaging depends on the external schema system, Pulsar broker just treats the message as bytes data, and it won't change the message version of message metadata.
-Unlike Pulsar using schema version to identify the schema, some external schema system use the schema ID to identify the schema,
-if using external schema system the Pulsar message metadata will not maintain schema ID, the customized decoding method can try to retrieve the schema ID from the encoded data.
+- T decode(byte[] bytes)
+ Deserialize the message using the external schema.
+
+- close()
+ **(New addition)**
+ Called when the producer or consumer is closed.
+ Allows external schema implementations to release resources, such as schema registry connections or caches.
+
+#### Example Workflow:
+
+- During producer or consumer initialization:
+ The custom schema can register schemas or fetch schema metadata from the external registry.
+
+- During message send or receive:
+ The `encode` and `decode` methods handle the schema-aware serialization and deserialization using the external schema registry.
+
+#### Schema ID & Schema Version
+
+Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier.
+
+When integrating with external schema registries:
+- The `schemaVersion` **field in Pulsar message metadata becomes unnecessary**.
+- The Pulsar client **does not need to store or maintain schema versions in the message metadata**.
+- Instead, the external schema implementation can manage schema ID handling internally.
+- If needed, the schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
+
+This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism.
+However, it may affect components that rely on the schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL.
+These components will need to be updated to support the new schema type and handle external schemas appropriately.
+
+#### Example usage
-Example usage
```java
public void workWithExternalSchemaRegistry() throws Exception {
Map<String, String> srConfig = new HashedMap<>();
@@ -81,68 +133,24 @@ public void workWithExternalSchemaRegistry() throws Exception {
}
```
-Messaging protocols changes
-
-Ignore schema info while creating Producer, ignore register schema before sending messages, the schema management
-```java
-public class ProducerImpl extends ProducerBase implements TimerTask, ConnectionHandler.Connection {
-
- @Override
- public CompletableFuture connectionOpened(final ClientCnx cnx) {
- // ...
-
- if (schema != null && schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
- try {
- // set schemaInfo to null if the schema info provider is external
- schemaInfo = null;
- } catch (Exception e) {
- log.error("Failed to register schema.", e);
- throw new RuntimeException(e);
- }
- }
-
- // send create producer request
- }
-
- private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
- // ...
-
- if (schema.getSchemaInfoProvider().isExternal()) {
- // don't register schema if external schema registry service, the register method can be integrated in the messaging encode method.
- return;
- }
- // getOrCreateSchemaAsync
- }
-
-}
-```
+## Public-facing Changes
-Ignore schema info while adding consumer subscription.
+Introduce a new SchemaType `EXTERNAL` to represent the schema types that work with external schema registry.
```java
-public class ConsumerImpl extends ConsumerBase implements ConnectionHandler.Connection {
-
- @Override
- public CompletableFuture connectionOpened(final ClientCnx cnx) {
- // ...
-
- // only external schema info provider need to register schema
- if (schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
- try {
- // don't set schema info for schema registry schema
- si = null;
- } catch (Exception e) {
- log.error("Failed to register schema.", e);
- throw new RuntimeException(e);
- }
- }
+public enum SchemaType {
- }
+ /**
+ * External Schema Type.
+ *
+ * This is used to indicate that the schema is managed externally, such as in a schema registry.
+ * External schema type is not compatible with any other schema type.
+ *
+ */
+ EXTERNAL(-5)
}
```
-## Public-facing Changes
-
Add new methods for `SchemaInfoProvider` interface.
The `SchemaInfoProvider` provides necessary params for connecting to the external schema registry service with the method `getConfigs`.
If the schema info provider is external, the new producer command, consumer subscribe command will treat the schema as bytes schema, the broker will ignore schema validation.
@@ -194,9 +202,14 @@ public class ClientConfigurationData implements Serializable, Cloneable {
}
```
-The customized schema can get the `SchemaInfoProvider` and retrieve the configs from it.
+A customized schema can get the `SchemaInfoProvider` and retrieve the configs from it. The `Schema` interface also extends `AutoCloseable` so that implementations can release resources when closed.
```java
-public interface Schema {
+public interface Schema extends Cloneable, AutoCloseable {
+
+ @Override
+ default void close() {
+ // no-op
+ }
/**
* When setting schema info provider for schema, the schema can retrieve the configs.
@@ -218,15 +231,19 @@ public interface Schema {
# Pulsar Function
-For supporting using third-party schema registry service in Pulsar Function,
-the function configurations should support setting the `SchemaInfoProviderFactory` and schema registry auth configurations while initializing the Pulsar client.
+To support third-party schema registry services, Pulsar Functions need to:
+- Support setting the `SchemaInfoProviderFactory` while initializing the Pulsar client
+- Support the schema registry auth configurations while initializing the Pulsar client
+- Support the `SchemaType.EXTERNAL` schema type in Pulsar Function
# Security Considerations
+
Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
# Pulsar-GEO replication
-If users can use third-party schema registry service, it provides a new way to manage scheme for geo-replicated topics.
+
+Using a third-party schema registry service provides a new way to manage schemas for geo-replicated topics.
# Backward & Forward Compatibility
@@ -240,7 +257,7 @@ No other changes are needed to upgrade to the new version.
# Alternatives
-None
+Use the `bytes` schema for "external" schemas. However, this does not provide any compatibility checks to protect topic data that uses Pulsar's native schema system.
# General Notes
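
The "Schema ID & Schema Version" section of this patch notes that an external schema's `encode` and `decode` methods can embed the schema ID directly in the message payload. The sketch below shows one way such framing could work. The layout (one magic byte followed by a 4-byte big-endian schema ID, similar in spirit to the framing used by Confluent Schema Registry serializers) and the `SchemaIdFraming` helper are assumptions made for illustration; the PIP does not prescribe a payload format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of Pulsar or of this PIP.
final class SchemaIdFraming {
    // Assumed layout: 1 magic byte, 4-byte big-endian schema ID, then the serialized body.
    static final byte MAGIC = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    private SchemaIdFraming() {
    }

    // Prepends the header to an already-serialized body (what encode() could do last).
    static byte[] frame(int schemaId, byte[] body) {
        return ByteBuffer.allocate(HEADER_SIZE + body.length)
                .put(MAGIC)
                .putInt(schemaId)
                .put(body)
                .array();
    }

    // Reads the schema ID back out of a framed payload (what decode() could do first).
    static int schemaIdOf(byte[] framed) {
        checkHeader(framed);
        return ByteBuffer.wrap(framed).getInt(1);
    }

    // Returns the serialized body without the header.
    static byte[] bodyOf(byte[] framed) {
        checkHeader(framed);
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    private static void checkHeader(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC) {
            throw new IllegalArgumentException("payload does not start with a schema ID header");
        }
    }

    public static void main(String[] args) {
        byte[] body = "{\"name\":\"name-0\",\"age\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(42, body);
        System.out.println(schemaIdOf(framed));
        System.out.println(new String(bodyOf(framed), StandardCharsets.UTF_8));
    }
}
```

With framing like this, a consumer-side `decode` can look up the writer schema by ID in the external registry before deserializing the body, which is why the `schemaVersion` field in the message metadata is not needed.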
From 4f54ffd419ac60c9afaaaf2e35e2c4f5597a639b Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Wed, 18 Jun 2025 11:26:17 +0800
Subject: [PATCH 08/13] fix
---
pip/pip-420.md | 32 +++++++++++++++++++++++---------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 662acab07f40c..3c3496d034240 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -11,6 +11,7 @@ and wish to reuse their existing schema governance processes across multiple mes
By enabling Pulsar clients to integrate with third-party schema registry services:
- Users can unify schema management across different platforms.
- Pulsar brokers can be decoupled from schema storage and validation responsibilities.
+- Pulsar users can more easily integrate with ecosystems that rely on external schema registries.
This flexibility is particularly valuable for enterprises with strict schema validation, versioning,
and governance workflows already centralized in external registries.
@@ -52,6 +53,7 @@ Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**.
- Introduce a new compatibility check in broker side.
- The schema type `SchemaType.EXTERNAL` can't be compatible with other Pulsar schemas
- This prevents accidental data corruption or schema conflicts between internal and external schema management systems.
+- Pulsar Geo replicator needs to transfer the schema type `SchemaType.EXTERNAL` to the remote cluster.
This design isolates external schema management and protects existing topics using Pulsar’s native schema system.
@@ -63,15 +65,18 @@ To integrate with external schema registries, users can:
#### Key `Schema` Interface Methods:
- byte[] encode(T message)
- Serializes the message using the external schema.
+ - Serializes the message using the external schema.
+ - Implementations should throw `SchemaSerializationException` if the serialization fails.
- T decode(byte[] bytes)
- Deserialize the message using the external schema.
+ - Deserialize the message using the external schema.
+ - Users should handle exceptions when get value by themselves.
- close()
+
**(New addition)**
- Called when the producer or consumer is closed.
- Allows external schema implementations to release resources, such as schema registry connections or caches.
+ - Called when the producer or consumer is closed.
+ - Allows external schema implementations to release resources, such as schema registry connections or caches.
#### Example Workflow:
@@ -86,10 +91,10 @@ To integrate with external schema registries, users can:
Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier.
When integrating with external schema registries:
-- The `schemaVersion` **field in Pulsar message metadata becomes unnecessary**.
-- The Pulsar client **does not need to store or maintain schema Versions in the message metadata**.
+- The `schemaVersion` filed in Pulsar message metadata is used in some places, **set to `-1` to flag the message is using external schema systems**.
+- The Pulsar client **needs to set the schema version to -1 and store in the message metadata**.
- Instead, the external schema implementation can manage schema ID handling internally.
-- If needed, the schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
+- The schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism.
This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL,
@@ -241,9 +246,18 @@ For support using third-party schema registry service in Pulsar Function,
Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
-# Pulsar-GEO replication
+# Pulsar-GEO replication impact
+
+Integrating third-party schema registry services introduces a new approach to managing schemas for geo-replicated topics.
+
+In the current Pulsar architecture:
+- Schema definitions are stored and managed by the Pulsar brokers.
+- During geo-replication, schema information must also be replicated across clusters to ensure schema consistency.
-If users can use third-party schema registry service, it provides a new way to manage schemes for geo-replicated topics.
+By using an external schema registry:
+- **Schema management is fully decoupled from Pulsar brokers and replication mechanisms.**
+- This eliminates the need for synchronizing schema data between Pulsar clusters, simplifying geo-replication processes.
+- Supports a unified schema registry for cross-cluster producers and consumers
# Backward & Forward Compatibility
From 121faba4c307e2cd7219bb05a3a61e2e24c6e5de Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Mon, 7 Jul 2025 00:42:06 +0800
Subject: [PATCH 09/13] fix
---
pip/pip-420.md | 137 +++++++++++++++++++++++--------------------------
1 file changed, 65 insertions(+), 72 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 3c3496d034240..01e48f06debb7 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -25,13 +25,14 @@ and governance workflows already centralized in external registries.
## Out Scope
- Providing built-in implementations for third-party schemas.
+- Supporting `AutoProduceBytesSchema` and `AutoConsumeSchema`.
- Migrating existing Pulsar-managed schemas to external schema registries.
# High Level Design
- Provide a mechanism to configure the Pulsar client to use either:
- The existing Pulsar schema registry (default)
- - A third-party schema registry implementation
+ - Third-party schema registry implementations
# Detailed Design
@@ -46,8 +47,7 @@ The Pulsar broker will no longer manage schema data for topics using external sc
Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**.
- All schemas that integrate with external schema registries must declare `SchemaType.EXTERNAL`.
-- When using `EXTERNAL` schema type, the Pulsar client will provide an empty schema definition to the broker.
-- If the `SchemaInfoProvider` is external but the schema type is not `EXTERNAL`, throw an ExternalSchemaException.
+- When using `EXTERNAL` schema type, the Pulsar client will provide empty schema data to the broker.
- The broker will only record the schema type for topics.
- Compatibility restrictions:
- Introduce a new compatibility check in broker side.
@@ -61,7 +61,6 @@ This design isolates external schema management and protects existing topics usi
To integrate with external schema registries, users can:
- Implement the `Schema` interface to define custom schema encoding and decoding logic.
-- Implement the `SchemaInfoProvider` interface to initialize "external" schemas.
#### Key `Schema` Interface Methods:
- byte[] encode(T message)
@@ -72,16 +71,18 @@ To integrate with external schema registries, users can:
- Deserialize the message using the external schema.
- Users should handle exceptions when get value by themselves.
-- close()
+- void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider)
+ - Call this method when creating schema
+ - External schema can be initialized when calling this method
- **(New addition)**
+- close() **(New addition)**
- Called when the producer or consumer is closed.
- Allows external schema implementations to release resources, such as schema registry connections or caches.
#### Example Workflow:
- During producer or consumer initialization:
- The custom schema can register schemas or fetch schema metadata from the external registry.
+ The external schema info will be registered to Pulsar schema storage.
- During message send or receive:
The `encode` and `decode` methods handle the schema-aware serialization and deserialization using the external schema registry.
@@ -107,34 +108,37 @@ public void workWithExternalSchemaRegistry() throws Exception {
Map srConfig = new HashedMap<>();
srConfig.put("schema.registry.url", "http://localhost:8001");
- PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar://localhost:6650")
- .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
- .build();
-
- String topic = "t1";
+ String topic = "testExternalJsonSchema";
+
Schema schema = KafkaSchemas.JSON(User.class);
-
- Producer producer = client.newProducer(schema)
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:" + getBrokerServicePort())
+ .schemaProperties(srConfig)
+ .build();
+
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(schema)
.topic(topic)
.create();
-
- Consumer consumer = client.newConsumer(schema)
+
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer(schema)
.topic(topic)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
-
+
for (int i = 0; i < 10; i++) {
- producer.send(new User("name-" + i, 10 + i));
+ producer.send(new User("name-" + i, 10 + i));
}
-
+
for (int i = 0; i < 10; i++) {
- Message message = consumer.receive();
- consumer.acknowledge(message);
+ Message message = consumer.receive();
+ consumer.acknowledge(message);
+ assertEquals(message.getValue().getName(), "name-" + i);
}
-
- client.close();
}
```
@@ -151,14 +155,30 @@ public enum SchemaType {
* External schema type is not compatible with any other schema type.
*
*/
- EXTERNAL(-5)
+ EXTERNAL(21)
}
```
-Add new methods for `SchemaInfoProvider` interface.
-The `SchemaInfoProvider` provides necessary params for connecting to the external schema registry service with the method `getConfigs`.
-If the schema info provider is external, the new producer command, consumer subscribe command will treat the schema as bytes schema, the broker will ignore schema validation.
+```protobuf
+// File `SchemaRegistryFormat.proto`
+message SchemaInfo {
+ enum SchemaType {
+ EXTERNAL = 22;
+ }
+}
+```
+
+```protobuf
+// File `PulsarApi.proto`
+message Schema {
+ enum Type {
+ External = 22;
+ }
+}
+```
+
+Add a new method `getConfigs` for `SchemaInfoProvider` interface to provide necessary params for external schemas connect to the schema registry service.
```java
public interface SchemaInfoProvider {
@@ -169,45 +189,35 @@ public interface SchemaInfoProvider {
return Collections.emptyMap();
}
- /**
- * It's used to determine whether the SchemaInfoProvider is external or not.
- */
- default boolean isExternal() {
- return false;
- }
-
-}
-```
-
-Add a new interface `SchemaInfoProviderFactory`, it's used to initialize `SchemaInfoProvider`, each topic has its own `SchemaInfoProvider`.
-```java
-public interface SchemaInfoProviderFactory {
-
- SchemaInfoProvider of(String topic);
-
}
```
-The client build supports setting the `SchemaInfoProviderFactory`.
+The client build supports setting the `schemaProperties`.
```java
public interface ClientBuilder extends Serializable, Cloneable {
- ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+ /**
+ * Set the properties used for schema.
+ *
+ * These properties will be used to configure the schema registry client.
+ *
+ * @param properties schema registry properties
+ */
+ ClientBuilder schemaProperties(Map properties);
}
```
-The `ClientConfigurationData` supports transfer `SchemaInfoProviderFactory`.
+The `ClientConfigurationData` supports transfer `schemaProperties`.
```java
public class ClientConfigurationData implements Serializable, Cloneable {
- @JsonIgnore
- private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+ private Map schemaProperties;
}
```
-The customized schema can get the `SchemaInfoProvider` and retrieve the configs from it, extends the interface `AutoCloseable` to support close some resources.
+The customized external schemas can get the `SchemaInfoProvider` and retrieve the configs from it, extends the interface `AutoCloseable` to support close external schema resources.
```java
public interface Schema extends Cloneable, AutoCloseable {
@@ -217,34 +227,23 @@ public interface Schema extends Cloneable, AutoCloseable {
}
/**
- * When setting schema info provider for schema, the schema can retrieve the configs.
+ * When setting schema info provider for schema, the schema can retrieve the configs and initialize itself.
*/
default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
}
- /**
- * Returns the schema info provider.
- *
- * @return a {@code SchemaInfoProvider} representing the schema info provider
- */
- default SchemaInfoProvider getSchemaInfoProvider() {
- return null;
- }
-
}
```
# Pulsar Function
For support using third-party schema registry service in Pulsar Function,
-- Support setting the `SchemaInfoProviderFactory` while initializing the Pulsar client
-- Support the schema registry auth configurations while initializing the Pulsar client
+- Support setting the `schemaProperties` while initializing the Pulsar client
- Support the `SchemaType.EXTERNAL` schema type in Pulsar Function
# Security Considerations
-Users can provide security configuration while generating the `SchemaInfoProviderFactory` instance,
-the factory can transfer the security configuration to the `SchemaInfoProvider` instance.
+Users can provide schema registry security configuration in the `schemaProperties`.
# Pulsar-GEO replication impact
@@ -261,17 +260,11 @@ By using an external schema registry:
# Backward & Forward Compatibility
-## Revert
-
-No changes are needed to revert to the previous version.
-
-## Upgrade
-
-No other changes are needed to upgrade to the new version.
+The new schema type `SchemaType.EXTERNAL` doesn't break any existing Pulsar topics; it is simply not compatible with other Pulsar schema types.
# Alternatives
-Use `bytes` schema for "external" schemas, it does not provide any compatibility checks to protect topic data that use Pulsar's native schema system.
+Use `bytes` schema for "external" schemas, it can't provide any compatibility checks to protect topic data that use Pulsar's native schema system.
# General Notes
From 4452d56b7949dc58b4fd72fcb315d67380c76b53 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Tue, 8 Jul 2025 22:45:58 +0800
Subject: [PATCH 10/13] fix
---
pip/pip-420.md | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 01e48f06debb7..83df54ebcb3ab 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -92,14 +92,13 @@ To integrate with external schema registries, users can:
Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier.
When integrating with external schema registries:
-- The `schemaVersion` filed in Pulsar message metadata is used in some places, **set to `-1` to flag the message is using external schema systems**.
-- The Pulsar client **needs to set the schema version to -1 and store in the message metadata**.
+- The `schemaVersion` will not indicate the read schema version, it points an external schema.
- Instead, the external schema implementation can manage schema ID handling internally.
- The schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism.
-This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL,
-they will need to be updated to support the new schema type and handle external schemas appropriately.
+This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL;
+they need to be updated to support setting schema properties, using the new schema type and handle external schemas appropriately.
#### Example usage
From 7148af2239e5e4ac61cd67494dce1c74d2ab341a Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Tue, 12 Aug 2025 14:06:13 +0800
Subject: [PATCH 11/13] update
---
pip/pip-420.md | 188 ++++++++++++++++++++++++++++---------------------
1 file changed, 108 insertions(+), 80 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 83df54ebcb3ab..0e7b8e72f42c5 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -53,7 +53,7 @@ Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**.
- Introduce a new compatibility check in broker side.
- The schema type `SchemaType.EXTERNAL` can't be compatible with other Pulsar schemas
- This prevents accidental data corruption or schema conflicts between internal and external schema management systems.
-- Pulsar Geo replicator needs to transfer the schema type `SchemaType.EXTERNAL` to the remote cluster.
+- The Pulsar geo-replicator needs to transfer the external schema info to the remote clusters.
This design isolates external schema management and protects existing topics using Pulsar’s native schema system.
@@ -63,21 +63,26 @@ To integrate with external schema registries, users can:
- Implement the `Schema` interface to define custom schema encoding and decoding logic.
#### Key `Schema` Interface Methods:
-- byte[] encode(T message)
- - Serializes the message using the external schema.
+
+The `Schema` interface introduces methods for encoding and decoding messages,
+allowing external schema implementations to handle serialization and deserialization.
+
+- EncodeData encode(String topic, T message) **(New addition)**
+ - Serializes the message using the external schema.
+ - The encode method will be responsible for managing schema evolution and versioning.
+ - The method returns an `EncodeData` object that contains:
+ - The encoded byte array.
+ - The schema ID associated with the serialized data.
- Implementations should throw `SchemaSerializationException` if the serialization fails.
-- T decode(byte[] bytes)
+- T decode(String topic, byte[] data, byte[] schemaId) **(New addition)**
- Deserialize the message using the external schema.
+ - The external schema can retrieve the schema by the schema ID.
- Users should handle exceptions when get value by themselves.
-- void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider)
- - Call this method when creating schema
- - External schema can be initialized when calling this method
-
-- close() **(New addition)**
+- closeAsync() **(New addition)**
- Called when the producer or consumer is closed.
- - Allows external schema implementations to release resources, such as schema registry connections or caches.
+ - Allows external schema implementations to release resources.
#### Example Workflow:
@@ -92,9 +97,14 @@ To integrate with external schema registries, users can:
Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier.
When integrating with external schema registries:
-- The `schemaVersion` will not indicate the read schema version, it points an external schema.
-- Instead, the external schema implementation can manage schema ID handling internally.
-- The schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
+- The `schemaVersion` will point to external schema info.
+- The external schema will be responsible for managing schema evolution and versioning; this differs from Pulsar's native schema versioning.
+- The schema encode method will return an `EncodeData` object that contains the encoded data and the schema ID.
+- To store the external schema ID, this PIP introduces a new optional field `schema_id` in the `MessageMetadata`.
+- The KeyValueSchema doesn't support using Pulsar's native schema and external schema at the same time.
+
+The KeyValueSchemaID is also a byte array with the format `keySchemaIdLength(4) + keySchemaId + valueSchemaIdLength(4) + valueSchemaId`,
+where each length field occupies 4 bytes; external schemas need to decode the key and value schema IDs from the KeyValueSchemaID.
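
The KeyValueSchemaID layout described above can be sketched as follows. This is a minimal illustration, not code from the PIP: the class `KeyValueSchemaIds` and its methods are hypothetical names, and big-endian length fields (the `ByteBuffer` default) are an assumption, since the byte order is not stated here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the Pulsar API.
final class KeyValueSchemaIds {

    private KeyValueSchemaIds() {
    }

    /** Packs both IDs as keyLength(4) + keyId + valueLength(4) + valueId. */
    static byte[] combine(byte[] keySchemaId, byte[] valueSchemaId) {
        ByteBuffer buf = ByteBuffer.allocate(4 + keySchemaId.length + 4 + valueSchemaId.length);
        buf.putInt(keySchemaId.length);   // assumption: big-endian, the ByteBuffer default
        buf.put(keySchemaId);
        buf.putInt(valueSchemaId.length);
        buf.put(valueSchemaId);
        return buf.array();
    }

    /** Splits a combined ID back into {keySchemaId, valueSchemaId}. */
    static byte[][] split(byte[] keyValueSchemaId) {
        ByteBuffer buf = ByteBuffer.wrap(keyValueSchemaId);
        byte[] key = readLengthPrefixed(buf);
        byte[] value = readLengthPrefixed(buf);
        if (buf.hasRemaining()) {
            throw new IllegalArgumentException("Trailing bytes after value schema ID");
        }
        return new byte[][] {key, value};
    }

    private static byte[] readLengthPrefixed(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            throw new IllegalArgumentException("Missing length field");
        }
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("Invalid length: " + length);
        }
        byte[] out = new byte[length];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] combined = combine(new byte[] {0, 0, 0, 7}, new byte[] {0, 0, 0, 9});
        byte[][] ids = split(combined);
        System.out.println(Arrays.toString(ids[0]) + " " + Arrays.toString(ids[1]));
    }
}
```

Validating each length against the remaining bytes keeps a malformed ID from causing a buffer underflow on the consumer side.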
This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism.
This may impact some components that rely on schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL;
@@ -104,61 +114,43 @@ they need to be updated to support setting schema properties, using the new sche
```java
public void workWithExternalSchemaRegistry() throws Exception {
- Map srConfig = new HashedMap<>();
- srConfig.put("schema.registry.url", "http://localhost:8001");
-
String topic = "testExternalJsonSchema";
-
- Schema schema = KafkaSchemas.JSON(User.class);
-
+
+ Map configs = new HashedMap<>();
+ configs.put("schema.registry.url", getSchemaRegistryUrl());
+ configs.put("json.fail.unknown.properties", "false");
+ KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(configs);
+ Schema<User> schema = kafkaSchemaFactory.json(User.class);
+
@Cleanup
- PulsarClient pulsarClient = PulsarClient.builder()
- .serviceUrl("pulsar://localhost:" + getBrokerServicePort())
- .schemaProperties(srConfig)
- .build();
-
+ PulsarClient pulsarClient = getPulsarClient();
+
@Cleanup
Producer producer = pulsarClient.newProducer(schema)
.topic(topic)
.create();
-
+
@Cleanup
Consumer consumer = pulsarClient.newConsumer(schema)
.topic(topic)
.subscriptionName("sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
-
+
for (int i = 0; i < 10; i++) {
producer.send(new User("name-" + i, 10 + i));
}
-
+
for (int i = 0; i < 10; i++) {
Message message = consumer.receive();
consumer.acknowledge(message);
- assertEquals(message.getValue().getName(), "name-" + i);
+ System.out.println("receive msg " + message.getValue().getClass().getName() + " " + message.getValue());
}
}
```
## Public-facing Changes
-Introduce a new SchemaType `EXTERNAL` to represent the schema types that work with external schema registry.
-```java
-public enum SchemaType {
-
- /**
- * External Schema Type.
- *
- * This is used to indicate that the schema is managed externally, such as in a schema registry.
- * External schema type is not compatible with any other schema type.
- *
- */
- EXTERNAL(21)
-
-}
-```
-
```protobuf
// File `SchemaRegistryFormat.proto`
message SchemaInfo {
@@ -177,59 +169,100 @@ message Schema {
}
```
-Add a new method `getConfigs` for `SchemaInfoProvider` interface to provide necessary params for external schemas connect to the schema registry service.
+Add a new field `schema_id` to the `MessageMetadata` to store the schema ID for messages that use external schemas.
+```protobuf
+// File `PulsarApi.proto`
+message MessageMetadata {
+ optional bytes schema_id = 31;
+}
+```
+
+Introduce a new SchemaType `EXTERNAL` to represent the schema types that work with an external schema registry.
```java
-public interface SchemaInfoProvider {
+public enum SchemaType {
/**
- * Returns the configs of the schema registry service, such as URL, authentication params.
- */
- default Map getConfigs() {
- return Collections.emptyMap();
- }
+ * External Schema Type.
+ *
+ * This is used to indicate that the schema is managed externally, such as in a schema registry.
+ * External schema type is not compatible with any other schema type.
+ *
+ */
+ EXTERNAL(21)
}
```
-The client build supports setting the `schemaProperties`.
+Add a new class `EncodeData` to encapsulate the encoded data and schema ID.
```java
-public interface ClientBuilder extends Serializable, Cloneable {
+public class EncodeData {
- /**
- * Set the properties used for schema.
- *
- * These properties will be used to configure the schema registry client.
- *
- * @param properties schema registry properties
- */
- ClientBuilder schemaProperties(Map properties);
+ private byte[] data;
+
+ private byte[] schemaId;
}
```
-The `ClientConfigurationData` supports transfer `schemaProperties`.
+Add a new method `getSchemaId()` to the `Message` interface to retrieve the schema ID of the message.
+This method will return the schema ID if the message is produced with an external schema, otherwise it will return null.
```java
-public class ClientConfigurationData implements Serializable, Cloneable {
+public interface Message {
- private Map schemaProperties;
+ /**
+ * Get schema ID of the message.
+ * PIP-420 provides a way to produce messages with external schema,
+ * and the schema ID will be set to the message metadata.
+ *
+ * @return schema ID of the message if the message is produced with external schema.
+ */
+ byte[] getSchemaId();
}
```
-The customized external schemas can get the `SchemaInfoProvider` and retrieve the configs from it, extends the interface `AutoCloseable` to support close external schema resources.
+Add two methods to encode and decode messages, which can be used by external schemas to serialize and deserialize messages.
+The encode method returns an `EncodeData` object that contains the encoded byte array and schema ID.
+Customized external schemas can set the `SchemaInfoProvider` and retrieve the configs from it;
+the new `closeAsync()` method lets them release external schema resources.
+
```java
-public interface Schema extends Cloneable, AutoCloseable {
+import java.util.concurrent.CompletableFuture;
- @Override
- default void close() {
- // no-op
- }
+public interface Schema<T> extends Cloneable {
- /**
- * When setting schema info provider for schema, the schema can retrieve the configs and initialize itself.
- */
- default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
- }
+ /**
+ * Encodes the message into a byte array using the schema.
+ *
+ * @param topic the topic for which the message is being encoded
+ * @param message the message to encode
+ * @return the encoded byte array and schema ID
+ * @throws SchemaSerializationException if the encoding fails
+ */
+ default EncodeData encode(String topic, T message) {
+ return new EncodeData(encode(message), null);
+ }
+
+ default T decode(String topic, ByteBuffer data, byte[] schemaId) {
+ return decode(topic, getBytes(data), schemaId);
+ }
+
+ /**
+ * Decodes a byte array into a message using the schema.
+ *
+ * @param topic the topic for which the message is being decoded
+ * @param data the byte array to decode
+ * @param schemaId the schema ID associated with the data
+ * @return the decoded message
+ * @throws SchemaSerializationException if the decoding fails
+ */
+ default T decode(String topic, byte[] data, byte[] schemaId) {
+ return decode(data, schemaId);
+ }
+
+ default CompletableFuture<Void> closeAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
}
```
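
To illustrate how the `encode`/`decode` pair above is meant to be used, here is a minimal, self-contained sketch. Everything in it is a hypothetical stand-in: `ExternalSchemaSketch`, its nested `EncodeData`, and `ToyStringSchema` only mirror the shape of the proposed methods, and an in-memory map plays the role of the external registry. It does not depend on the Pulsar client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins that only mirror the shape of the proposed API.
final class ExternalSchemaSketch {

    /** Stand-in for the proposed EncodeData: payload bytes plus schema ID. */
    static final class EncodeData {
        final byte[] data;
        final byte[] schemaId;

        EncodeData(byte[] data, byte[] schemaId) {
            this.data = data;
            this.schemaId = schemaId;
        }
    }

    /** Toy schema backed by an in-memory map instead of a real registry. */
    static final class ToyStringSchema {
        private final Map<String, byte[]> idsByTopic = new HashMap<>();

        EncodeData encode(String topic, String message) {
            byte[] id = idsByTopic.get(topic);
            if (id == null) {
                // "Register" a new one-byte schema ID for this topic.
                id = new byte[] {(byte) (idsByTopic.size() + 1)};
                idsByTopic.put(topic, id);
            }
            return new EncodeData(message.getBytes(StandardCharsets.UTF_8), id);
        }

        String decode(String topic, byte[] data, byte[] schemaId) {
            byte[] known = idsByTopic.get(topic);
            if (known == null || !Arrays.equals(known, schemaId)) {
                throw new IllegalArgumentException("Unknown schema ID for topic " + topic);
            }
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        ToyStringSchema schema = new ToyStringSchema();
        // Producer side: the payload goes into the message body, the ID into schema_id.
        EncodeData encoded = schema.encode("t1", "hello");
        // Consumer side: the ID read back from the metadata selects the schema.
        String decoded = schema.decode("t1", encoded.data, encoded.schemaId);
        System.out.println(decoded);
    }
}
```

A real implementation would look the schema up in the external registry by ID rather than in a local map. The flow stays the same: the schema ID travels next to the payload instead of inside it.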
@@ -237,13 +270,8 @@ public interface Schema extends Cloneable, AutoCloseable {
# Pulsar Function
For support using third-party schema registry service in Pulsar Function,
-- Support setting the `schemaProperties` while initializing the Pulsar client
- Support the `SchemaType.EXTERNAL` schema type in Pulsar Function
-# Security Considerations
-
-Users can provide schema registry security configuration in the `schemaProperties`.
-
# Pulsar-GEO replication impact
Integrating third-party schema registry services introduces a new approach to managing schemas for geo-replicated topics.
From eed3dc896cd468fe1571643c029c1e8eee29cd71 Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Tue, 12 Aug 2025 19:00:18 +0800
Subject: [PATCH 12/13] update
---
pip/pip-420.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 0e7b8e72f42c5..c335a95494078 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -73,7 +73,7 @@ allowing external schema implementations to handle serialization and deserializa
- The method returns an `EncodeData` object that contains:
- The encoded byte array.
- The schema ID associated with the serialized data.
- - Implementations should throw `SchemaSerializationException` if the serialization fails.
+ - Implementations should throw `SchemaSerializationException` if the serialization or deserialization fails.
- T decode(String topic, byte[] data, byte[] schemaId) **(New addition)**
- Deserialize the message using the external schema.
@@ -89,7 +89,7 @@ allowing external schema implementations to handle serialization and deserializa
- During producer or consumer initialization:
The external schema info will be registered to Pulsar schema storage.
-- During message send or receive:
+- While producing or receiving messages:
The `encode` and `decode` methods handle the schema-aware serialization and deserialization using the external schema registry.
#### Schema ID & Schema Version
From 8a7e643a924812cadb4d148d8625fe9c8fda170e Mon Sep 17 00:00:00 2001
From: "gaoran_10@126.com"
Date: Fri, 15 Aug 2025 14:19:55 +0800
Subject: [PATCH 13/13] update vote mailing url
---
pip/pip-420.md | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/pip/pip-420.md b/pip/pip-420.md
index c335a95494078..b1adec06dd4fa 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -300,5 +300,4 @@ Use `bytes` schema for "external" schemas, it can't provide any compatibility ch
-* Mailing List discussion thread: https://lists.apache.org/thread/olx4xm8cdy43omp5c0jm44sj1gp0grcr
-* Mailing List voting thread: https://lists.apache.org/thread/vhq6ox4nh2rx59yoxowftqzv8f9lnm4q
+* Mailing List voting thread: https://lists.apache.org/thread/g7hypmql3gk2zog6cmmhg4h93hfw1o15