Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-2958 Search contexts when retrieving schema #2168

Merged
merged 4 commits into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -118,7 +118,15 @@ public Schema getSchemaByVersion(
+ version
+ " from the schema registry";
try {
schema = schemaRegistry.validateAndGetSchema(subject, versionId, lookupDeletedSchema);
schema = schemaRegistry.getUsingContexts(
subject, versionId.getVersionId(), lookupDeletedSchema);
if (schema == null) {
if (!schemaRegistry.hasSubjects(subject, lookupDeletedSchema)) {
throw Errors.subjectNotFoundException(subject);
} else {
throw Errors.versionNotFoundException(versionId.getVersionId());
}
}
} catch (SchemaRegistryStoreException e) {
log.debug(errorMessage, e);
throw Errors.storeException(errorMessage, e);
Expand Down Expand Up @@ -357,7 +365,14 @@ public void deleteSchemaVersion(
throw Errors.schemaVersionSoftDeletedException(subject, version);
}
}
schema = schemaRegistry.validateAndGetSchema(subject, versionId, true);
schema = schemaRegistry.get(subject, versionId.getVersionId(), true);
if (schema == null) {
if (!schemaRegistry.hasSubjects(subject, true)) {
throw Errors.subjectNotFoundException(subject);
} else {
throw Errors.versionNotFoundException(versionId.getVersionId());
}
}
} catch (SchemaRegistryStoreException e) {
log.debug(errorMessage, e);
throw Errors.storeException(errorMessage, e);
Expand Down
Expand Up @@ -107,18 +107,19 @@ public void lookUpSchemaUnderSubject(
);
io.confluent.kafka.schemaregistry.client.rest.entities.Schema matchingSchema = null;
try {
if (!schemaRegistry.hasSubjects(subject, lookupDeletedSchema)) {
throw Errors.subjectNotFoundException(subject);
matchingSchema = schemaRegistry.lookUpSchemaUnderSubjectUsingContexts(
subject, schema, normalize, lookupDeletedSchema);
if (matchingSchema == null) {
if (!schemaRegistry.hasSubjects(subject, lookupDeletedSchema)) {
throw Errors.subjectNotFoundException(subject);
} else {
throw Errors.schemaNotFoundException();
}
}
matchingSchema =
schemaRegistry.lookUpSchemaUnderSubject(subject, schema, normalize, lookupDeletedSchema);
} catch (SchemaRegistryException e) {
throw Errors.schemaRegistryException("Error while looking up schema under subject " + subject,
e);
}
if (matchingSchema == null) {
throw Errors.schemaNotFoundException();
}
asyncResponse.resume(matchingSchema);
}

Expand Down
Expand Up @@ -55,7 +55,6 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
Expand Down Expand Up @@ -758,6 +757,41 @@ public List<Integer> deleteSubjectOrForward(
}
}

public Schema lookUpSchemaUnderSubjectUsingContexts(
String subject, Schema schema, boolean normalize, boolean lookupDeletedSchema)
throws SchemaRegistryException {
Schema matchingSchema =
lookUpSchemaUnderSubject(subject, schema, normalize, lookupDeletedSchema);
if (matchingSchema != null) {
return matchingSchema;
}
QualifiedSubject qs = QualifiedSubject.create(tenant(), subject);
boolean isQualifiedSubject = qs != null && !DEFAULT_CONTEXT.equals(qs.getContext());
if (isQualifiedSubject) {
return null;
}
// Try qualifying the subject with each known context
try (CloseableIterator<SchemaRegistryValue> iter = allContexts()) {
while (iter.hasNext()) {
ContextValue v = (ContextValue) iter.next();
QualifiedSubject qualSub = new QualifiedSubject(v.getTenant(), v.getContext(), subject);
Schema qualSchema = new Schema(
qualSub.toQualifiedSubject(),
schema.getVersion(),
schema.getId(),
schema.getSchemaType(),
schema.getReferences(),
schema.getSchema()
);
matchingSchema = lookUpSchemaUnderSubject(
qualSub.toQualifiedSubject(), qualSchema, normalize, lookupDeletedSchema);
if (matchingSchema != null) {
return matchingSchema;
}
}
}
return null;
}

/**
* Checks if given schema was ever registered under a subject. If found, it returns the version of
Expand Down Expand Up @@ -787,14 +821,14 @@ public Schema lookUpSchemaUnderSubject(
Collections.reverse(allVersions);

for (SchemaValue schemaValue : allVersions) {
if (!schemaValue.isDeleted()
if ((lookupDeletedSchema || !schemaValue.isDeleted())
&& parsedSchema.references().isEmpty()
&& !schemaValue.getReferences().isEmpty()) {
Schema undeleted = getSchemaEntityFromSchemaValue(schemaValue);
ParsedSchema undeletedSchema = parseSchema(undeleted);
if (parsedSchema.deepEquals(undeletedSchema)) {
Schema prev = getSchemaEntityFromSchemaValue(schemaValue);
ParsedSchema prevSchema = parseSchema(prev);
if (parsedSchema.deepEquals(prevSchema)) {
// This handles the case where a schema is sent with all references resolved
return undeleted;
return prev;
}
}
}
Expand Down Expand Up @@ -1056,18 +1090,29 @@ private ParsedSchema loadSchema(
return parsedSchema;
}

public Schema validateAndGetSchema(String subject, VersionId versionId, boolean
public Schema getUsingContexts(String subject, int version, boolean
returnDeletedSchema) throws SchemaRegistryException {
final int version = versionId.getVersionId();
Schema schema = this.get(subject, version, returnDeletedSchema);
if (schema == null) {
if (!this.hasSubjects(subject, returnDeletedSchema)) {
throw Errors.subjectNotFoundException(subject);
} else {
throw Errors.versionNotFoundException(version);
Schema schema = get(subject, version, returnDeletedSchema);
if (schema != null) {
return schema;
}
QualifiedSubject qs = QualifiedSubject.create(tenant(), subject);
boolean isQualifiedSubject = qs != null && !DEFAULT_CONTEXT.equals(qs.getContext());
if (isQualifiedSubject) {
return null;
}
// Try qualifying the subject with each known context
try (CloseableIterator<SchemaRegistryValue> iter = allContexts()) {
while (iter.hasNext()) {
ContextValue v = (ContextValue) iter.next();
QualifiedSubject qualSub = new QualifiedSubject(v.getTenant(), v.getContext(), subject);
schema = get(qualSub.toQualifiedSubject(), version, returnDeletedSchema);
if (schema != null) {
return schema;
}
}
}
return schema;
return null;
}

public boolean schemaVersionExists(String subject, VersionId versionId, boolean
Expand Down Expand Up @@ -1374,9 +1419,27 @@ private List<SchemaValue> getAllSchemaValues(String subject)
@Override
public Schema getLatestVersion(String subject) throws SchemaRegistryException {
try (CloseableIterator<SchemaRegistryValue> allVersions = allVersions(subject, false)) {
List<Schema> sortedVersions = sortSchemasByVersion(allVersions, false);
return sortedVersions.size() > 0 ? sortedVersions.get(sortedVersions.size() - 1) : null;
return getLatestVersionFromSubjectSchemas(allVersions);
}
}

private Schema getLatestVersionFromSubjectSchemas(
CloseableIterator<SchemaRegistryValue> schemas) {
int latestVersionId = -1;
SchemaValue latestSchemaValue = null;

while (schemas.hasNext()) {
SchemaValue schemaValue = (SchemaValue) schemas.next();
if (schemaValue.isDeleted()) {
continue;
}
if (schemaValue.getVersion() > latestVersionId) {
latestVersionId = schemaValue.getVersion();
latestSchemaValue = schemaValue;
}
}

return latestSchemaValue != null ? getSchemaEntityFromSchemaValue(latestSchemaValue) : null;
}

private CloseableIterator<SchemaRegistryValue> allVersions(
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.utils.TestUtils;
Expand Down Expand Up @@ -139,6 +140,16 @@ public void testQualifiedSubjects() throws Exception {
assertEquals("Getting all schemas should match all registered subjects",
schemasInSubject1 + schemasInSubject2,
restApp.restClient.getSchemas(":*:", false, false).size());

Schema schema = restApp.restClient.getVersion("testTopic1", 1);
assertEquals("Getting schema by version w/o context should succeed",
1,
schema.getVersion().intValue());

schema = restApp.restClient.lookUpSubjectVersion(schema.getSchema(), "testTopic1");
assertEquals("Getting schema by schema w/o context should succeed",
1,
schema.getVersion().intValue());
}

@Test
Expand Down
Expand Up @@ -1280,7 +1280,7 @@ public void testDeleteSchemaVersionInvalidSubject() throws Exception {

@Test
public void testDeleteLatestVersion() throws Exception {
List<String> schemas = TestUtils.getRandomCanonicalAvroString(2);
List<String> schemas = TestUtils.getRandomCanonicalAvroString(3);
String subject = "test";

TestUtils.registerAndVerifySchema(restApp.restClient, schemas.get(0), 1, subject);
Expand All @@ -1306,6 +1306,45 @@ public void testDeleteLatestVersion() throws Exception {
Errors.SUBJECT_NOT_FOUND_ERROR_CODE,
rce.getErrorCode());
}

TestUtils.registerAndVerifySchema(restApp.restClient, schemas.get(2), 3, subject);
assertEquals("Latest version available after subject re-registration",
schemas.get(2),
restApp.restClient.getLatestVersion(subject).getSchema());
}

@Test
public void testGetLatestVersionNonExistentSubject() throws Exception {
String subject = "non_existent_subject";

try {
restApp.restClient.getLatestVersion(subject);
fail("Getting latest versions from non-existing subject should fail with "
+ Errors.SUBJECT_NOT_FOUND_ERROR_CODE
+ " (subject not found).");
} catch (RestClientException rce) {
assertEquals("Should get a 404 status for non-existing subject",
Errors.SUBJECT_NOT_FOUND_ERROR_CODE,
rce.getErrorCode());
}
}

@Test
public void testGetLatestVersionDeleteOlder() throws Exception {
List<String> schemas = TestUtils.getRandomCanonicalAvroString(2);
String subject = "test";

TestUtils.registerAndVerifySchema(restApp.restClient, schemas.get(0), 1, subject);
TestUtils.registerAndVerifySchema(restApp.restClient, schemas.get(1), 2, subject);

assertEquals("Latest Version Schema", schemas.get(1), restApp.restClient.getLatestVersion(subject).getSchema());

assertEquals("Deleting Schema Older Version Success", (Integer) 1, restApp.restClient
.deleteSchemaVersion
(RestService.DEFAULT_REQUEST_PROPERTIES, subject, "1"));
assertEquals("Latest Version Schema Still Same",
schemas.get(1),
restApp.restClient.getLatestVersion(subject).getSchema());
}

@Test
Expand Down