Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-8783 Add rulesToMerge/rulesToRemove to tags API #2789

Merged
merged 3 commits into from Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -121,8 +121,8 @@ public void updateHash(MessageDigest md) {
}

public void validate() throws RuleException {
Set<String> names = new HashSet<>();
if (migrationRules != null) {
Set<String> names = new HashSet<>();
for (Rule rule : migrationRules) {
String name = rule.getName();
if (names.contains(name)) {
Expand All @@ -136,7 +136,6 @@ public void validate() throws RuleException {
}
}
if (domainRules != null) {
Set<String> names = new HashSet<>();
for (Rule rule : domainRules) {
String name = rule.getName();
if (names.contains(name)) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class TagSchemaRequest {
private List<SchemaTags> tagsToRemove;
private Metadata metadata;
private RuleSet ruleSet;
private RuleSet rulesToMerge;
private List<String> rulesToRemove;

public TagSchemaRequest() {
}
Expand Down Expand Up @@ -106,6 +109,26 @@ public void setRuleSet(RuleSet ruleSet) {
this.ruleSet = ruleSet;
}

@JsonProperty("rulesToMerge")
public RuleSet getRulesToMerge() {
return rulesToMerge;
}

@JsonProperty("rulesToMerge")
public void setRulesToMerge(RuleSet rulesToMerge) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not take List as a parameter? Or should we call it setRuleSetToMerge?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member Author

@rayokota rayokota Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rulesToMerge takes a RuleSet, and rulesToRemove takes a list of rule names. By "rules" I'm referring to the object semantically, not to the data structure that is passed in. I'd rather not call it ruleSetToMerge and ruleNamesToRemove

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO I think rulesToRemove is better than ruleNamesToRemove. But I think ruleSetToMerge might better than rulesToMerge. However, it seems like the field that the customer sets is rulesToMerge. What do you think about changing that json field to ruleSetToMerge since it has the same attributes as the existing ruleSet field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather keep it rulesToMerge. In any case, this API may be temporary.

this.rulesToMerge = rulesToMerge;
}

@JsonProperty("rulesToRemove")
public List<String> getRulesToRemove() {
return rulesToRemove;
}

@JsonProperty("rulesToRemove")
public void setRulesToRemove(List<String> rulesToRemove) {
this.rulesToRemove = rulesToRemove;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -122,12 +145,15 @@ public boolean equals(Object o) {
&& Objects.equals(tagsToAdd, tagSchemaRequest.tagsToAdd)
&& Objects.equals(tagsToRemove, tagSchemaRequest.tagsToRemove)
&& Objects.equals(metadata, tagSchemaRequest.metadata)
&& Objects.equals(ruleSet, tagSchemaRequest.ruleSet);
&& Objects.equals(ruleSet, tagSchemaRequest.ruleSet)
&& Objects.equals(rulesToMerge, tagSchemaRequest.rulesToMerge)
&& Objects.equals(rulesToRemove, tagSchemaRequest.rulesToRemove);
}

@Override
public int hashCode() {
return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata, ruleSet);
return Objects.hash(newVersion, tagsToAdd, tagsToRemove, metadata,
ruleSet, rulesToMerge, rulesToRemove);
}

public String toJson() throws IOException {
Expand Down
Expand Up @@ -15,8 +15,12 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet.mergeRuleSets;

import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand Down Expand Up @@ -50,6 +54,7 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -624,18 +629,46 @@ public void modifyTags(
}
schemaRegistry.getCompositeUpdateRequestHandler().handle(schema, request);

if (request.getRuleSet() != null) {
try {
request.getRuleSet().validate();
} catch (RuleException e) {
throw new RestInvalidRuleSetException(e.getMessage());
}
}

Map<String, String> headerProperties = requestHeaderBuilder.buildRequestHeaders(
headers, schemaRegistry.config().whitelistHeaders());
RegisterSchemaResponse registerSchemaResponse;
try {
RuleSet ruleSet = request.getRuleSet();
if (request.getRulesToMerge() != null || request.getRulesToRemove() != null) {
int oldVersion = request.getNewVersion() != null ? request.getNewVersion() - 1 : -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does -1 mean latest version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Schema oldSchema = schemaRegistry.get(subject, oldVersion, false);
// Use the previous ruleSet instead of the passed in one
ruleSet = oldSchema != null ? oldSchema.getRuleSet() : null;
if (request.getRulesToMerge() != null) {
ruleSet = mergeRuleSets(ruleSet, request.getRulesToMerge());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change the ordering of the rules? If so does that matter? I remember you mentioning that the order of the ruse matter.

It seems like merging the rules would remove any existing role that has the same name as any rule in rulesToMerge, then add rulesToMerge at the end of the rules list.

Copy link
Member Author

@rayokota rayokota Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this code will prefer the order of the rulesToAdd ruleSet over the existing ruleSet. I think this is fine for encryption rules, which is what this API is meant to be used for currently.

}
if (ruleSet != null && request.getRulesToRemove() != null) {
List<String> rulesToRemove = request.getRulesToRemove();
List<Rule> migrationRules = ruleSet.getMigrationRules();
if (migrationRules != null) {
migrationRules = migrationRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
List<Rule> domainRules = ruleSet.getDomainRules();
if (domainRules != null) {
domainRules = domainRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
ruleSet = new RuleSet(migrationRules, domainRules);
}
request.setRuleSet(ruleSet);
}

if (request.getRuleSet() != null) {
try {
request.getRuleSet().validate();
} catch (RuleException e) {
throw new RestInvalidRuleSetException(e.getMessage());
}
}

rayokota marked this conversation as resolved.
Show resolved Hide resolved
Schema result =
schemaRegistry.modifySchemaTagsOrForward(subject, schema, request, headerProperties);
registerSchemaResponse = new RegisterSchemaResponse(result);
Expand Down
Expand Up @@ -14,23 +14,31 @@
*/
package io.confluent.kafka.schemaregistry.rest;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedSet;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.TagSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

public class RestApiRegisterSchemaTagsTest extends ClusterTestHarness {

Expand All @@ -44,6 +52,27 @@ public RestApiRegisterSchemaTagsTest() {
super(1, true);
}

@Before
public void setUp() throws Exception {
super.setUp();
((KafkaSchemaRegistry) restApp.schemaRegistry()).setRuleSetHandler(new RuleSetHandler() {
public void handle(String subject, ConfigUpdateRequest request) {
}

public void handle(String subject, boolean normalize, RegisterSchemaRequest request) {
}

public void handle(String subject, TagSchemaRequest request) {
}

public io.confluent.kafka.schemaregistry.storage.RuleSet transform(RuleSet ruleSet) {
return ruleSet != null
? new io.confluent.kafka.schemaregistry.storage.RuleSet(ruleSet)
: null;
}
});
}

@Test
public void testRegisterSchemaTagsBasic() throws Exception {
String subject = "test";
Expand Down Expand Up @@ -222,4 +251,54 @@ public void testRegisterSchemaTagsWithInvalidTags() throws Exception {
assertEquals(Errors.INVALID_SCHEMA_ERROR_CODE, e.getErrorCode());
}
}

@Test
public void testRegisterSchemaTagsIncrementalRuleSet() throws Exception {
String subject = "test";
TestUtils.registerAndVerifySchema(restApp.restClient, schemaString, 1, subject);

TagSchemaRequest tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setNewVersion(2);
Rule rule = new Rule("myRule", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule rule2 = new Rule("myRule2", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
Rule rule3 = new Rule("myRule3", null, null, null,
"fooType", ImmutableSortedSet.of("PII"), null, null, null, "NONE,NONE", false);
RuleSet ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule, rule2, rule3));
tagSchemaRequest.setRulesToMerge(ruleSet);
tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule4"));

RegisterSchemaResponse responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(2, responses.getId());
assertEquals(ruleSet, responses.getRuleSet());

Schema result = restApp.restClient.getLatestVersion(subject);
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));
assertEquals(ruleSet, responses.getRuleSet());

tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setNewVersion(3);
Rule rule5 = new Rule("myRule5", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
Rule rule4 = new Rule("myRule4", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
rule2 = new Rule("myRule2", null, null, null,
"fooType", ImmutableSortedSet.of("PII2"), null, null, null, "NONE,NONE", false);
ruleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule5, rule4, rule2));
rayokota marked this conversation as resolved.
Show resolved Hide resolved
tagSchemaRequest.setRulesToMerge(ruleSet);
tagSchemaRequest.setRulesToRemove(ImmutableList.of("myRule"));

RuleSet expectedRuleSet = new RuleSet(Collections.emptyList(), ImmutableList.of(rule3, rule5, rule4, rule2));
responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(3, responses.getId());
assertEquals(expectedRuleSet, responses.getRuleSet());

result = restApp.restClient.getLatestVersion(subject);
assertEquals((Integer) 3, result.getVersion());
assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));
assertEquals(expectedRuleSet, responses.getRuleSet());
}
}