diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 32ce64c59ef..5c4006f6785 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -22,7 +22,7 @@
files="(Errors|AvroMessageReader).java"/>
+ files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor|TagSchemaRequest).java"/>
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/RuleSet.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/RuleSet.java
index 7155090fff5..dd41f1a3bbd 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/RuleSet.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/RuleSet.java
@@ -121,8 +121,8 @@ public void updateHash(MessageDigest md) {
}
public void validate() throws RuleException {
+ Set names = new HashSet<>();
if (migrationRules != null) {
- Set names = new HashSet<>();
for (Rule rule : migrationRules) {
String name = rule.getName();
if (names.contains(name)) {
@@ -136,7 +136,6 @@ public void validate() throws RuleException {
}
}
if (domainRules != null) {
- Set names = new HashSet<>();
for (Rule rule : domainRules) {
String name = rule.getName();
if (names.contains(name)) {
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/TagSchemaRequest.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/TagSchemaRequest.java
index c2a6dbb650b..ac71db91521 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/TagSchemaRequest.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/TagSchemaRequest.java
@@ -49,6 +49,8 @@ public class TagSchemaRequest {
private List tagsToRemove;
private Metadata metadata;
private RuleSet ruleSet;
+ private RuleSet rulesToMerge;
+ private List rulesToRemove;
public TagSchemaRequest() {
}
@@ -106,6 +108,26 @@ public void setRuleSet(RuleSet ruleSet) {
this.ruleSet = ruleSet;
}
+ @JsonProperty("rulesToMerge")
+ public RuleSet getRulesToMerge() {
+ return rulesToMerge;
+ }
+
+ @JsonProperty("rulesToMerge")
+ public void setRulesToMerge(RuleSet rulesToMerge) {
+ this.rulesToMerge = rulesToMerge;
+ }
+
+ @JsonProperty("rulesToRemove")
+ public List getRulesToRemove() {
+ return rulesToRemove;
+ }
+
+ @JsonProperty("rulesToRemove")
+ public void setRulesToRemove(List rulesToRemove) {
+ this.rulesToRemove = rulesToRemove;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -122,12 +144,15 @@ public boolean equals(Object o) {
&& Objects.equals(tagsToAdd, tagSchemaRequest.tagsToAdd)
&& Objects.equals(tagsToRemove, tagSchemaRequest.tagsToRemove)
&& Objects.equals(metadata, tagSchemaRequest.metadata)
- && Objects.equals(ruleSet, tagSchemaRequest.ruleSet);
+ && Objects.equals(ruleSet, tagSchemaRequest.ruleSet)
+ && Objects.equals(rulesToMerge, tagSchemaRequest.rulesToMerge)
+ && Objects.equals(rulesToRemove, tagSchemaRequest.rulesToRemove);
}
@Override
public int hashCode() {
- return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata, ruleSet);
+ return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata,
+ ruleSet, rulesToMerge, rulesToRemove);
}
public String toJson() throws IOException {
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java
index e5518c582e7..fa815196112 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/resources/SubjectVersionsResource.java
@@ -15,8 +15,12 @@
package io.confluent.kafka.schemaregistry.rest.resources;
+import static io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet.mergeRuleSets;
+
import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
+import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
@@ -50,6 +54,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -624,18 +629,26 @@ public void modifyTags(
}
schemaRegistry.getCompositeUpdateRequestHandler().handle(schema, request);
- if (request.getRuleSet() != null) {
- try {
- request.getRuleSet().validate();
- } catch (RuleException e) {
- throw new RestInvalidRuleSetException(e.getMessage());
- }
- }
-
Map headerProperties = requestHeaderBuilder.buildRequestHeaders(
headers, schemaRegistry.config().whitelistHeaders());
RegisterSchemaResponse registerSchemaResponse;
try {
+ if (request.getRulesToMerge() != null || request.getRulesToRemove() != null) {
+ if (request.getRuleSet() != null) {
+ throw new RestInvalidRuleSetException(
+ "ruleSet should be omitted if specifying rulesToMerge or rulesToRemove");
+ }
+ modifyPreviousRuleSet(subject, request);
+ }
+
+ if (request.getRuleSet() != null) {
+ try {
+ request.getRuleSet().validate();
+ } catch (RuleException e) {
+ throw new RestInvalidRuleSetException(e.getMessage());
+ }
+ }
+
Schema result =
schemaRegistry.modifySchemaTagsOrForward(subject, schema, request, headerProperties);
registerSchemaResponse = new RegisterSchemaResponse(result);
@@ -664,4 +677,32 @@ public void modifyTags(
}
asyncResponse.resume(registerSchemaResponse);
}
+
+ private void modifyPreviousRuleSet(String subject, TagSchemaRequest request)
+ throws SchemaRegistryException {
+ int oldVersion = request.getNewVersion() != null ? request.getNewVersion() - 1 : -1;
+ Schema oldSchema = schemaRegistry.get(subject, oldVersion, false);
+ // Use the previous ruleSet instead of the passed in one
+ RuleSet ruleSet = oldSchema != null ? oldSchema.getRuleSet() : null;
+ if (request.getRulesToMerge() != null) {
+ ruleSet = mergeRuleSets(ruleSet, request.getRulesToMerge());
+ }
+ if (ruleSet != null && request.getRulesToRemove() != null) {
+ List rulesToRemove = request.getRulesToRemove();
+ List migrationRules = ruleSet.getMigrationRules();
+ if (migrationRules != null) {
+ migrationRules = migrationRules.stream()
+ .filter(r -> !rulesToRemove.contains(r.getName()))
+ .collect(Collectors.toList());
+ }
+ List domainRules = ruleSet.getDomainRules();
+ if (domainRules != null) {
+ domainRules = domainRules.stream()
+ .filter(r -> !rulesToRemove.contains(r.getName()))
+ .collect(Collectors.toList());
+ }
+ ruleSet = new RuleSet(migrationRules, domainRules);
+ }
+ request.setRuleSet(ruleSet);
+ }
}
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java
index 545bfe84bcd..0de01d14e3c 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java
@@ -14,23 +14,32 @@
*/
package io.confluent.kafka.schemaregistry.rest;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSortedSet;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
+import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
+import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags;
+import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
+import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.TagSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
+import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
+import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
+import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class RestApiRegisterSchemaTagsTest extends ClusterTestHarness {
@@ -44,6 +53,27 @@ public RestApiRegisterSchemaTagsTest() {
super(1, true);
}
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ ((KafkaSchemaRegistry) restApp.schemaRegistry()).setRuleSetHandler(new RuleSetHandler() {
+ public void handle(String subject, ConfigUpdateRequest request) {
+ }
+
+ public void handle(String subject, boolean normalize, RegisterSchemaRequest request) {
+ }
+
+ public void handle(String subject, TagSchemaRequest request) {
+ }
+
+ public io.confluent.kafka.schemaregistry.storage.RuleSet transform(RuleSet ruleSet) {
+ return ruleSet != null
+ ? new io.confluent.kafka.schemaregistry.storage.RuleSet(ruleSet)
+ : null;
+ }
+ });
+ }
+
@Test
public void testRegisterSchemaTagsBasic() throws Exception {
String subject = "test";
@@ -222,4 +252,63 @@ public void testRegisterSchemaTagsWithInvalidTags() throws Exception {
assertEquals(Errors.INVALID_SCHEMA_ERROR_CODE, e.getErrorCode());
}
}
+
+ @Test
+ public void testRegisterSchemaTagsIncrementalRuleSet() throws Exception {
+ String subject = "test";
+ TestUtils.registerAndVerifySchema(restApp.restClient, schemaString, 1, subject);
+
+ TagSchemaRequest tagSchemaRequest = new TagSchemaRequest();
+ tagSchemaRequest.setNewVersion(2);
+ Rule migrationRule = new Rule("myMigrationRule", null, null, RuleMode.UPGRADE,
+ "fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
+ Rule migrationRule2 = new Rule("myMigrationRule2", null, null, RuleMode.UPGRADE,
+ "fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
+ Rule domainRule = new Rule("myRule", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
+ Rule domainRule2 = new Rule("myRule2", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
+ Rule domainRule3 = new Rule("myRule3", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
+ RuleSet ruleSet = new RuleSet(ImmutableList.of(migrationRule, migrationRule2),
+ ImmutableList.of(domainRule, domainRule2, domainRule3));
+ tagSchemaRequest.setRulesToMerge(ruleSet);
+ tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule4"));
+
+ RegisterSchemaResponse responses = restApp.restClient
+ .modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
+ assertEquals(2, responses.getId());
+ assertEquals(ruleSet, responses.getRuleSet());
+
+ Schema result = restApp.restClient.getLatestVersion(subject);
+ assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));
+ assertEquals(ruleSet, responses.getRuleSet());
+
+ tagSchemaRequest = new TagSchemaRequest();
+ tagSchemaRequest.setNewVersion(3);
+ Rule migrationRule3 = new Rule("myMigrationRule3", null, null, RuleMode.UPGRADE,
+ "fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
+ Rule domainRule5 = new Rule("myRule5", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
+ Rule domainRule4 = new Rule("myRule4", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
+ domainRule2 = new Rule("myRule2", null, null, null,
+ "fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
+ ruleSet = new RuleSet(ImmutableList.of(migrationRule3),
+ ImmutableList.of(domainRule5, domainRule4, domainRule2));
+ tagSchemaRequest.setRulesToMerge(ruleSet);
+ tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule", "myMigrationRule2"));
+
+ RuleSet expectedRuleSet = new RuleSet(ImmutableList.of(migrationRule, migrationRule3),
+ ImmutableList.of(domainRule3, domainRule5, domainRule4, domainRule2));
+ responses = restApp.restClient
+ .modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
+ assertEquals(3, responses.getId());
+ assertEquals(expectedRuleSet, responses.getRuleSet());
+
+ result = restApp.restClient.getLatestVersion(subject);
+ assertEquals((Integer) 3, result.getVersion());
+ assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));
+ assertEquals(expectedRuleSet, responses.getRuleSet());
+ }
}