Skip to content

Commit

Permalink
For tags API, only modify ruleset on leader node (#2808)
Browse files Browse the repository at this point in the history
* For tags API, only modify ruleset on leader node

* Fix checkstyle

* Fix checkstyle
  • Loading branch information
rayokota committed Oct 26, 2023
1 parent 5d57f8d commit 92be717
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 36 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>

<suppress checks="JavaNCSS"
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>
files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader|SubjectVersionsResource).java"/>

<suppress checks="MethodLength"
files="(AvroData|ProtobufSchema|ProtobufSchemaUtils).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@

package io.confluent.kafka.schemaregistry.rest.resources;

import static io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet.mergeRuleSets;

import io.confluent.kafka.schemaregistry.client.rest.Versions;
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
Expand Down Expand Up @@ -54,7 +50,6 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.tags.Tags;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -638,7 +633,14 @@ public void modifyTags(
throw new RestInvalidRuleSetException(
"ruleSet should be omitted if specifying rulesToMerge or rulesToRemove");
}
modifyPreviousRuleSet(subject, request);
}

if (request.getRulesToMerge() != null) {
try {
request.getRulesToMerge().validate();
} catch (RuleException e) {
throw new RestInvalidRuleSetException(e.getMessage());
}
}

if (request.getRuleSet() != null) {
Expand Down Expand Up @@ -677,32 +679,4 @@ public void modifyTags(
}
asyncResponse.resume(registerSchemaResponse);
}

private void modifyPreviousRuleSet(String subject, TagSchemaRequest request)
throws SchemaRegistryException {
int oldVersion = request.getNewVersion() != null ? request.getNewVersion() - 1 : -1;
Schema oldSchema = schemaRegistry.get(subject, oldVersion, false);
// Use the previous ruleSet instead of the passed in one
RuleSet ruleSet = oldSchema != null ? oldSchema.getRuleSet() : null;
if (request.getRulesToMerge() != null) {
ruleSet = mergeRuleSets(ruleSet, request.getRulesToMerge());
}
if (ruleSet != null && request.getRulesToRemove() != null) {
List<String> rulesToRemove = request.getRulesToRemove();
List<Rule> migrationRules = ruleSet.getMigrationRules();
if (migrationRules != null) {
migrationRules = migrationRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
List<Rule> domainRules = ruleSet.getDomainRules();
if (domainRules != null) {
domainRules = domainRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
ruleSet = new RuleSet(migrationRules, domainRules);
}
request.setRuleSet(ruleSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
Expand Down Expand Up @@ -896,18 +898,51 @@ public Schema modifySchemaTags(String subject, Schema schema, TagSchemaRequest r
Collections.emptySet());
mergedMetadata = Metadata.mergeMetadata(mergedMetadata, newMetadata);

RuleSet ruleSet = maybeModifyPreviousRuleSet(subject, request);

try {
ParsedSchema newSchema = parsedSchema
.copy(TagSchemaRequest.schemaTagsListToMap(request.getTagsToAdd()),
TagSchemaRequest.schemaTagsListToMap(request.getTagsToRemove()))
.copy(mergedMetadata, request.getRuleSet())
.copy(mergedMetadata, ruleSet)
.copy(newVersion);
return register(subject, new Schema(subject, newVersion, -1, newSchema), false);
} catch (IllegalArgumentException e) {
throw new InvalidSchemaException(e);
}
}

private RuleSet maybeModifyPreviousRuleSet(String subject, TagSchemaRequest request)
throws SchemaRegistryException {
if (request.getRulesToMerge() == null && request.getRulesToRemove() == null) {
return request.getRuleSet();
}
int oldVersion = request.getNewVersion() != null ? request.getNewVersion() - 1 : -1;
Schema oldSchema = get(subject, oldVersion, false);
// Use the previous ruleSet instead of the passed in one
RuleSet ruleSet = oldSchema != null ? oldSchema.getRuleSet() : null;
if (request.getRulesToMerge() != null) {
ruleSet = mergeRuleSets(ruleSet, request.getRulesToMerge());
}
if (ruleSet != null && request.getRulesToRemove() != null) {
List<String> rulesToRemove = request.getRulesToRemove();
List<Rule> migrationRules = ruleSet.getMigrationRules();
if (migrationRules != null) {
migrationRules = migrationRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
List<Rule> domainRules = ruleSet.getDomainRules();
if (domainRules != null) {
domainRules = domainRules.stream()
.filter(r -> !rulesToRemove.contains(r.getName()))
.collect(Collectors.toList());
}
ruleSet = new RuleSet(migrationRules, domainRules);
}
return ruleSet;
}

public Schema modifySchemaTagsOrForward(String subject,
Schema schema,
TagSchemaRequest request,
Expand Down

0 comments on commit 92be717

Please sign in to comment.