Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-8783 Add rulesToMerge/rulesToRemove to tags API #2789

Merged
merged 3 commits into from Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -22,7 +22,7 @@
files="(Errors|AvroMessageReader).java"/>

<suppress checks="CyclomaticComplexity"
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor).java"/>
files="(AbstractKafkaAvroDeserializer|AbstractKafkaAvroSerializer|AbstractKafkaSchemaSerDe|AvroSchema|AvroSchemaUtils|CompatibilityResource|Config|ConfigResource|ConfigUpdateRequest|ConfigValue|Context|ContextKey|KafkaSchemaRegistry|KafkaStore|KafkaStoreMessageHandler|KafkaStoreReaderThread|AvroData|DownloadSchemaRegistryMojo|MockSchemaRegistryClient|SchemaRegistrySerializer|SchemaValue|SubjectVersionsResource|ProtobufSchema|SchemaDiff|FieldSchemaDiff|MessageSchemaDiff|DynamicSchema|SchemaMessageFormatter|ProtobufData|JsonSchema|JSON.*|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaJsonSchemaSerializer|JsonSchemaData|JsonSchemaUtils|MessageDefinition|ProtobufSchemaUtils|SchemaMessageReader|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|SubjectKeyComparator|ContextFilter|QualifiedSubject|Schema|AvroTypeDescription|CelExecutor|DataEncryptionKeyId|EncryptionKeyId|EncryptionUpdateRequestHandler|FieldEncryptionExecutor|FieldRuleExecutor|Rule|WildcardMatcher|JsonSchemaComparator|DlqAction|LocalSchemaRegistryClient|RetryExecutor|TagSchemaRequest).java"/>

<suppress checks="NPathComplexity"
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>
Expand Down
Expand Up @@ -121,8 +121,8 @@ public void updateHash(MessageDigest md) {
}

public void validate() throws RuleException {
Set<String> names = new HashSet<>();
if (migrationRules != null) {
Set<String> names = new HashSet<>();
for (Rule rule : migrationRules) {
String name = rule.getName();
if (names.contains(name)) {
Expand All @@ -136,7 +136,6 @@ public void validate() throws RuleException {
}
}
if (domainRules != null) {
Set<String> names = new HashSet<>();
for (Rule rule : domainRules) {
String name = rule.getName();
if (names.contains(name)) {
Expand Down
Expand Up @@ -49,6 +49,8 @@ public class TagSchemaRequest {
private List<SchemaTags> tagsToRemove;
private Metadata metadata;
private RuleSet ruleSet;
private RuleSet rulesToMerge;
private List<String> rulesToRemove;

public TagSchemaRequest() {
}
Expand Down Expand Up @@ -106,6 +108,26 @@ public void setRuleSet(RuleSet ruleSet) {
this.ruleSet = ruleSet;
}

@JsonProperty("rulesToMerge")
public RuleSet getRulesToMerge() {
return rulesToMerge;
}

@JsonProperty("rulesToMerge")
public void setRulesToMerge(RuleSet rulesToMerge) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not take List as a parameter? Or should we call it setRuleSetToMerge?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member Author

@rayokota rayokota Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rulesToMerge takes a RuleSet, and rulesToRemove takes a list of rule names. By "rules" I'm referring to the object semantically, not to the data structure that is passed in. I'd rather not call it ruleSetToMerge and ruleNamesToRemove

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO I think rulesToRemove is better than ruleNamesToRemove. But I think ruleSetToMerge might better than rulesToMerge. However, it seems like the field that the customer sets is rulesToMerge. What do you think about changing that json field to ruleSetToMerge since it has the same attributes as the existing ruleSet field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep it rulesToMerge. In any case, this API may be temporary.

this.rulesToMerge = rulesToMerge;
}

@JsonProperty("rulesToRemove")
public List<String> getRulesToRemove() {
return rulesToRemove;
}

@JsonProperty("rulesToRemove")
public void setRulesToRemove(List<String> rulesToRemove) {
this.rulesToRemove = rulesToRemove;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -122,12 +144,15 @@ public boolean equals(Object o) {
&& Objects.equals(tagsToAdd, tagSchemaRequest.tagsToAdd)
&& Objects.equals(tagsToRemove, tagSchemaRequest.tagsToRemove)
&& Objects.equals(metadata, tagSchemaRequest.metadata)
&& Objects.equals(ruleSet, tagSchemaRequest.ruleSet);
&& Objects.equals(ruleSet, tagSchemaRequest.ruleSet)
&& Objects.equals(rulesToMerge, tagSchemaRequest.rulesToMerge)
&& Objects.equals(rulesToRemove, tagSchemaRequest.rulesToRemove);
}

@Override
public int hashCode() {
return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata, ruleSet);
return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata,
ruleSet, rulesToMerge, rulesToRemove);
}

public String toJson() throws IOException {
Expand Down
Expand Up @@ -15,8 +15,12 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet.mergeRuleSets;

import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand Down Expand Up @@ -50,6 +54,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -624,18 +629,26 @@ public void modifyTags(
}
schemaRegistry.getCompositeUpdateRequestHandler().handle(schema, request);

if (request.getRuleSet() != null) {
try {
request.getRuleSet().validate();
} catch (RuleException e) {
throw new RestInvalidRuleSetException(e.getMessage());
}
}

Map<String, String> headerProperties = requestHeaderBuilder.buildRequestHeaders(
headers, schemaRegistry.config().whitelistHeaders());
RegisterSchemaResponse registerSchemaResponse;
try {
if (request.getRulesToMerge() != null || request.getRulesToRemove() != null) {
if (request.getRuleSet() != null) {
throw new RestInvalidRuleSetException(
"ruleSet should be omitted if specifying rulesToMerge or rulesToRemove");
}
modifyPreviousRuleSet(subject, request);
}

if (request.getRuleSet() != null) {
try {
request.getRuleSet().validate();
} catch (RuleException e) {
throw new RestInvalidRuleSetException(e.getMessage());
}
}

Schema result =
schemaRegistry.modifySchemaTagsOrForward(subject, schema, request, headerProperties);
registerSchemaResponse = new RegisterSchemaResponse(result);
Expand Down Expand Up @@ -664,4 +677,32 @@ public void modifyTags(
}
asyncResponse.resume(registerSchemaResponse);
}

private void modifyPreviousRuleSet(String subject, TagSchemaRequest request)
throws SchemaRegistryException {
int oldVersion = request.getNewVersion() != null ? request.getNewVersion() - 1 : -1;
Schema oldSchema = schemaRegistry.get(subject, oldVersion, false);
// Use the previous ruleSet instead of the passed in one
RuleSet ruleSet = oldSchema != null ? oldSchema.getRuleSet() : null;
if (request.getRulesToMerge() != null) {
ruleSet = mergeRuleSets(ruleSet, request.getRulesToMerge());
}
if (ruleSet != null && request.getRulesToRemove() != null) {
List<String> rulesToRemove = request.getRulesToRemove();
List<Rule> migrationRules = ruleSet.getMigrationRules();
if (migrationRules != null) {
migrationRules = migrationRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
List<Rule> domainRules = ruleSet.getDomainRules();
if (domainRules != null) {
domainRules = domainRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
ruleSet = new RuleSet(migrationRules, domainRules);
}
request.setRuleSet(ruleSet);
rayokota marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Up @@ -14,23 +14,32 @@
*/
package io.confluent.kafka.schemaregistry.rest;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.TagSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class RestApiRegisterSchemaTagsTest extends ClusterTestHarness {

Expand All @@ -44,6 +53,27 @@ public RestApiRegisterSchemaTagsTest() {
super(1, true);
}

@Before
public void setUp() throws Exception {
super.setUp();
((KafkaSchemaRegistry) restApp.schemaRegistry()).setRuleSetHandler(new RuleSetHandler() {
public void handle(String subject, ConfigUpdateRequest request) {
}

public void handle(String subject, boolean normalize, RegisterSchemaRequest request) {
}

public void handle(String subject, TagSchemaRequest request) {
}

public io.confluent.kafka.schemaregistry.storage.RuleSet transform(RuleSet ruleSet) {
return ruleSet != null
? new io.confluent.kafka.schemaregistry.storage.RuleSet(ruleSet)
: null;
}
});
}

@Test
public void testRegisterSchemaTagsBasic() throws Exception {
String subject = "test";
Expand Down Expand Up @@ -222,4 +252,63 @@ public void testRegisterSchemaTagsWithInvalidTags() throws Exception {
assertEquals(Errors.INVALID_SCHEMA_ERROR_CODE, e.getErrorCode());
}
}

@Test
public void testRegisterSchemaTagsIncrementalRuleSet() throws Exception {
String subject = "test";
TestUtils.registerAndVerifySchema(restApp.restClient, schemaString, 1, subject);

TagSchemaRequest tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setNewVersion(2);
Rule migrationRule = new Rule("myMigrationRule", null, null, RuleMode.UPGRADE,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule migrationRule2 = new Rule("myMigrationRule2", null, null, RuleMode.UPGRADE,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule domainRule = new Rule("myRule", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule domainRule2 = new Rule("myRule2", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule domainRule3 = new Rule("myRule3", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
RuleSet ruleSet = new RuleSet(ImmutableList.of(migrationRule, migrationRule2),
ImmutableList.of(domainRule, domainRule2, domainRule3));
tagSchemaRequest.setRulesToMerge(ruleSet);
tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule4"));

RegisterSchemaResponse responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(2, responses.getId());
assertEquals(ruleSet, responses.getRuleSet());

Schema result = restApp.restClient.getLatestVersion(subject);
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));
assertEquals(ruleSet, responses.getRuleSet());

tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setNewVersion(3);
Rule migrationRule3 = new Rule("myMigrationRule3", null, null, RuleMode.UPGRADE,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
Rule domainRule5 = new Rule("myRule5", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
Rule domainRule4 = new Rule("myRule4", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
domainRule2 = new Rule("myRule2", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
ruleSet = new RuleSet(ImmutableList.of(migrationRule3),
ImmutableList.of(domainRule5, domainRule4, domainRule2));
tagSchemaRequest.setRulesToMerge(ruleSet);
tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule", "myMigrationRule2"));

RuleSet expectedRuleSet = new RuleSet(ImmutableList.of(migrationRule, migrationRule3),
ImmutableList.of(domainRule3, domainRule5, domainRule4, domainRule2));
responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(3, responses.getId());
assertEquals(expectedRuleSet, responses.getRuleSet());

result = restApp.restClient.getLatestVersion(subject);
assertEquals((Integer) 3, result.getVersion());
assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));
assertEquals(expectedRuleSet, responses.getRuleSet());
}
}