Skip to content

Commit

Permalink
Merge branch '7.4.x' into 7.5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Sep 13, 2023
2 parents 3ebda26 + 17462b6 commit a5cdae6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
*/
public static final int MIN_VERSION = 1;
public static final int MAX_VERSION = Integer.MAX_VALUE;
private static final String CONFLUENT_VERSION = "confluent:version";
public static final String CONFLUENT_VERSION = "confluent:version";
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);

private final SchemaRegistryConfig config;
Expand Down Expand Up @@ -625,7 +625,7 @@ public Schema register(String subject,

boolean modifiedSchema = false;
if (mode != Mode.IMPORT) {
modifiedSchema = maybePopulateFromPrevious(config, schema, undeletedVersions);
modifiedSchema = maybePopulateFromPrevious(config, schema, undeletedVersions, newVersion);
}

int schemaId = schema.getId();
Expand Down Expand Up @@ -773,7 +773,7 @@ private boolean isReadOnlyMode(String subject) throws SchemaRegistryStoreExcepti
}

private boolean maybePopulateFromPrevious(
Config config, Schema schema, List<ParsedSchemaHolder> undeletedVersions)
Config config, Schema schema, List<ParsedSchemaHolder> undeletedVersions, int newVersion)
throws SchemaRegistryException {
boolean populatedSchema = false;
SchemaValue previousSchemaValue = undeletedVersions.size() > 0
Expand All @@ -794,11 +794,13 @@ private boolean maybePopulateFromPrevious(
throw new InvalidSchemaException("Empty schema");
}
}
boolean populatedMetadataRuleSet = maybeSetMetadataRuleSet(config, schema, previousSchema);
boolean populatedMetadataRuleSet = maybeSetMetadataRuleSet(
config, schema, previousSchema, newVersion);
return populatedSchema || populatedMetadataRuleSet;
}

private boolean maybeSetMetadataRuleSet(Config config, Schema schema, Schema previousSchema) {
private boolean maybeSetMetadataRuleSet(
Config config, Schema schema, Schema previousSchema, int newVersion) {
io.confluent.kafka.schemaregistry.client.rest.entities.Metadata specificMetadata = null;
if (schema.getMetadata() != null) {
specificMetadata = schema.getMetadata();
Expand All @@ -825,6 +827,17 @@ private boolean maybeSetMetadataRuleSet(Config config, Schema schema, Schema pre
overrideRuleSet = config.getOverrideRuleSet();
mergedRuleSet = mergeRuleSets(mergeRuleSets(defaultRuleSet, specificRuleSet), overrideRuleSet);
if (mergedMetadata != null || mergedRuleSet != null) {
if (mergedMetadata != null && mergedMetadata.getProperties() != null) {
Map<String, String> props = mergedMetadata.getProperties();
String versionStr = props.get(CONFLUENT_VERSION);
if ("0".equals(versionStr)) {
Map<String, String> newProps =
Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion));
mergedMetadata = mergeMetadata(mergedMetadata,
new io.confluent.kafka.schemaregistry.client.rest.entities.Metadata(
null, newProps, null));
}
}
schema.setMetadata(mergedMetadata);
schema.setRuleSet(mergedRuleSet);
return true;
Expand Down Expand Up @@ -876,13 +889,11 @@ public Schema modifySchemaTags(String subject, Schema schema, TagSchemaRequest r
Metadata mergedMetadata = request.getMetadata() != null
? request.getMetadata()
: parsedSchema.metadata();
if (request.getNewVersion() != null) {
Metadata newMetadata = new Metadata(
Collections.emptyMap(),
Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion)),
Collections.emptySet());
mergedMetadata = Metadata.mergeMetadata(mergedMetadata, newMetadata);
}
Metadata newMetadata = new Metadata(
Collections.emptyMap(),
Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(newVersion)),
Collections.emptySet());
mergedMetadata = Metadata.mergeMetadata(mergedMetadata, newMetadata);

try {
ParsedSchema newSchema = parsedSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,50 @@ public void testRegisterSchemaTagsBasic() throws Exception {
assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));
}

@Test
public void testRegisterSchemaWithoutNewVersionInput() throws Exception {
String subject = "test";
TestUtils.registerAndVerifySchema(restApp.restClient, schemaString, 1, subject);

TagSchemaRequest tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setTagsToAdd(Collections.singletonList(
new SchemaTags(new SchemaEntity("myrecord", SchemaEntity.EntityType.SR_RECORD),
Arrays.asList("TAG1", "TAG2"))));

String expectedSchema = "{" +
"\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]," +
"\"confluent:tags\":[\"TAG1\",\"TAG2\"]}";
RegisterSchemaResponse responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(2, responses.getId());

Schema result = restApp.restClient.getLatestVersion(subject);
assertEquals(expectedSchema, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setTagsToRemove(Collections.singletonList(
new SchemaTags(new SchemaEntity("myrecord", SchemaEntity.EntityType.SR_RECORD),
Arrays.asList("TAG2"))));

expectedSchema = "{" +
"\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]," +
"\"confluent:tags\":[\"TAG1\"]}";
responses = restApp.restClient
.modifySchemaTags(RestService.DEFAULT_REQUEST_PROPERTIES, tagSchemaRequest, subject, "latest");
assertEquals(3, responses.getId());

result = restApp.restClient.getLatestVersion(subject);
assertEquals(expectedSchema, result.getSchema());
assertEquals((Integer) 3, result.getVersion());
assertEquals("3", result.getMetadata().getProperties().get("confluent:version"));
}

@Test
public void testRegisterSchemaTagsInDiffContext() throws Exception {
String subject = ":.ctx:testSubject";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public void testRegisterSchemaTagsBasic() throws Exception {
Schema result = restApp.restClient.getLatestVersion(subject);
assertEquals(expectedSchema, result.getSchema());
assertEquals((Integer) 2, result.getVersion());
assertNull(result.getMetadata());
assertEquals("2", result.getMetadata().getProperties().get("confluent:version"));

tagSchemaRequest = new TagSchemaRequest();
tagSchemaRequest.setNewVersion(3);
Expand Down

0 comments on commit a5cdae6

Please sign in to comment.