From 17462b69c3dcd86914106b18d033adc8d795b6ae Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 13 Sep 2023 15:38:54 -0700 Subject: [PATCH] DGS-8323 Always populate version metadata prop for tags API (#2758) * Always populate newVersion for tags API * Fix test --- .../storage/KafkaSchemaRegistry.java | 35 ++++++++++----- .../rest/RestApiRegisterSchemaTagsTest.java | 44 +++++++++++++++++++ .../rest/protobuf/RestApiTest.java | 2 +- 3 files changed, 68 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java index c27188b9723..a06e18d36f4 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java @@ -119,7 +119,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg */ public static final int MIN_VERSION = 1; public static final int MAX_VERSION = Integer.MAX_VALUE; - private static final String CONFLUENT_VERSION = "confluent:version"; + public static final String CONFLUENT_VERSION = "confluent:version"; private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); private final SchemaRegistryConfig config; @@ -625,7 +625,7 @@ public Schema register(String subject, boolean modifiedSchema = false; if (mode != Mode.IMPORT) { - modifiedSchema = maybePopulateFromPrevious(config, schema, undeletedVersions); + modifiedSchema = maybePopulateFromPrevious(config, schema, undeletedVersions, newVersion); } int schemaId = schema.getId(); @@ -773,7 +773,7 @@ private boolean isReadOnlyMode(String subject) throws SchemaRegistryStoreExcepti } private boolean maybePopulateFromPrevious( - Config config, Schema schema, List undeletedVersions) + Config config, Schema schema, List undeletedVersions, int newVersion) throws SchemaRegistryException { boolean populatedSchema = false; SchemaValue previousSchemaValue = undeletedVersions.size() > 0 @@ -794,11 +794,13 @@ private boolean maybePopulateFromPrevious( throw new InvalidSchemaException("Empty schema"); } } - boolean populatedMetadataRuleSet = maybeSetMetadataRuleSet(config, schema, previousSchema); + boolean populatedMetadataRuleSet = maybeSetMetadataRuleSet( + config, schema, previousSchema, newVersion); return populatedSchema || populatedMetadataRuleSet; } - private boolean maybeSetMetadataRuleSet(Config config, Schema schema, Schema previousSchema) { + private boolean maybeSetMetadataRuleSet( + Config config, Schema schema, Schema previousSchema, int newVersion) { io.confluent.kafka.schemaregistry.client.rest.entities.Metadata specificMetadata = null; if (schema.getMetadata() != null) { specificMetadata = schema.getMetadata(); @@ -825,6 +827,17 @@ private boolean maybeSetMetadataRuleSet(Config config, Schema schema, Schema pre overrideRuleSet = config.getOverrideRuleSet(); mergedRuleSet = mergeRuleSets(mergeRuleSets(defaultRuleSet, specificRuleSet), overrideRuleSet); if (mergedMetadata != null || mergedRuleSet != null) { + if (mergedMetadata != null && mergedMetadata.getProperties() != null) { + Map props = mergedMetadata.getProperties(); + String versionStr = props.get(CONFLUENT_VERSION); + if ("0".equals(versionStr)) { + Map newProps = + Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion)); + mergedMetadata = mergeMetadata(mergedMetadata, + new io.confluent.kafka.schemaregistry.client.rest.entities.Metadata( + null, newProps, null)); + } + } schema.setMetadata(mergedMetadata); schema.setRuleSet(mergedRuleSet); return true; @@ -876,13 +889,11 @@ public Schema modifySchemaTags(String subject, Schema schema, TagSchemaRequest r Metadata mergedMetadata = request.getMetadata() != null ? request.getMetadata() : parsedSchema.metadata(); - if (request.getNewVersion() != null) { - Metadata newMetadata = new Metadata( - Collections.emptyMap(), - Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion)), - Collections.emptySet()); - mergedMetadata = Metadata.mergeMetadata(mergedMetadata, newMetadata); - } + Metadata newMetadata = new Metadata( + Collections.emptyMap(), + Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion)), + Collections.emptySet()); + mergedMetadata = Metadata.mergeMetadata(mergedMetadata, newMetadata); try { ParsedSchema newSchema = parsedSchema diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java index 363b0e73c14..545bfe84bcd 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiRegisterSchemaTagsTest.java @@ -90,6 +90,50 @@ public void testRegisterSchemaTagsBasic() throws Exception { assertEquals("3", result.getMetadata().getProperties().get("confluent:version")); } + @Test + public void testRegisterSchemaWithoutNewVersionInput() throws Exception { + String subject = "test"; + TestUtils.registerAndVerifySchema(restApp.restClient, schemaString, 1, subject); + + TagSchemaRequest tagSchemaRequest = new TagSchemaRequest(); + tagSchemaRequest.setTagsToAdd(Collections.singletonList( + new SchemaTags(new SchemaEntity("myrecord", SchemaEntity.EntityType.SR_RECORD), + Arrays.asList("TAG1", "TAG2")))); + + String expectedSchema = "{" + + "\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]," + + "\"confluent:tags\":[\"TAG1\",\"TAG2\"]}"; + RegisterSchemaResponse responses = restApp.restClient + .modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest"); + assertEquals(2, responses.getId()); + + Schema result = restApp.restClient.getLatestVersion(subject); + assertEquals(expectedSchema, result.getSchema()); + assertEquals((Integer) 2, result.getVersion()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); + + tagSchemaRequest = new TagSchemaRequest(); + tagSchemaRequest.setTagsToRemove(Collections.singletonList( + new SchemaTags(new SchemaEntity("myrecord", SchemaEntity.EntityType.SR_RECORD), + Arrays.asList("TAG2")))); + + expectedSchema = "{" + + "\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]," + + "\"confluent:tags\":[\"TAG1\"]}"; + responses = restApp.restClient + .modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest"); + assertEquals(3, responses.getId()); + + result = restApp.restClient.getLatestVersion(subject); + assertEquals(expectedSchema, result.getSchema()); + assertEquals((Integer) 3, result.getVersion()); + assertEquals("3", result.getMetadata().getProperties().get("confluent:version")); + } + @Test public void testRegisterSchemaTagsInDiffContext() throws Exception { String subject = ":.ctx:testSubject"; diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/protobuf/RestApiTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/protobuf/RestApiTest.java index b9fb321bd80..06f19e51543 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/protobuf/RestApiTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/protobuf/RestApiTest.java @@ -486,7 +486,7 @@ public void testRegisterSchemaTagsBasic() throws Exception { Schema result = restApp.restClient.getLatestVersion(subject); assertEquals(expectedSchema, result.getSchema()); assertEquals((Integer) 2, result.getVersion()); - assertNull(result.getMetadata()); + assertEquals("2", result.getMetadata().getProperties().get("confluent:version")); tagSchemaRequest = new TagSchemaRequest(); tagSchemaRequest.setNewVersion(3);