Skip to content

Commit

Permalink
Merge branch '7.2.x' into 7.3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Dec 6, 2022
2 parents 99a1df4 + f5606d3 commit be9630b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.kafka.schemaregistry.storage;

import java.util.Iterator;

class DelegatingIterator<T> implements CloseableIterator<T> {

private final Iterator<T> iterator;

public DelegatingIterator(Iterator<T> iterator) {
this.iterator = iterator;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public T next() {
return iterator.next();
}

@Override
public void remove() {
iterator.remove();
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,32 +388,4 @@ private BiPredicate<String, Integer> matchDeleted(Predicate<String> match) {
return false;
};
}

static class DelegatingIterator<T> implements CloseableIterator<T> {

private final Iterator<T> iterator;

public DelegatingIterator(Iterator<T> iterator) {
this.iterator = iterator;
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public T next() {
return iterator.next();
}

@Override
public void remove() {
iterator.remove();
}

@Override
public void close() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1458,14 +1458,19 @@ private Schema getLatestVersionFromSubjectSchemas(
}

private CloseableIterator<SchemaRegistryValue> allVersions(
String subjectOrPrefix, boolean isPrefix) throws SchemaRegistryException {
String subjectOrPrefix, boolean isPrefix) throws SchemaRegistryException {
try {
String start;
String end;
int idx = subjectOrPrefix.indexOf(CONTEXT_WILDCARD);
if (idx >= 0) {
// Context wildcard match
// Context wildcard match (prefix may contain tenant)
String prefix = subjectOrPrefix.substring(0, idx);
String unqualifiedSubjectOrPrefix =
subjectOrPrefix.substring(idx + CONTEXT_WILDCARD.length());
if (!unqualifiedSubjectOrPrefix.isEmpty()) {
return allVersionsFromAllContexts(unqualifiedSubjectOrPrefix, isPrefix);
}
start = prefix + CONTEXT_PREFIX + CONTEXT_DELIMITER;
end = prefix + CONTEXT_PREFIX + Character.MAX_VALUE + CONTEXT_DELIMITER;
} else {
Expand All @@ -1481,6 +1486,28 @@ private CloseableIterator<SchemaRegistryValue> allVersions(
}
}

private CloseableIterator<SchemaRegistryValue> allVersionsFromAllContexts(
String unqualifiedSubjectOrPrefix, boolean isPrefix) throws SchemaRegistryException {
List<ContextValue> contexts = new ArrayList<>();
try (CloseableIterator<SchemaRegistryValue> iter = allContexts()) {
while (iter.hasNext()) {
contexts.add((ContextValue) iter.next());
}
}
List<SchemaRegistryValue> versions = new ArrayList<>();
for (ContextValue v : contexts) {
QualifiedSubject qualSub =
new QualifiedSubject(v.getTenant(), v.getContext(), unqualifiedSubjectOrPrefix);
try (CloseableIterator<SchemaRegistryValue> subiter =
allVersions(qualSub.toQualifiedSubject(), isPrefix)) {
while (subiter.hasNext()) {
versions.add(subiter.next());
}
}
}
return new DelegatingIterator<>(versions.iterator());
}

@Override
public void close() {
log.info("Shutting down schema registry");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ public RestApiContextTest() {
public void testQualifiedSubjects() throws Exception {
String subject1 = ":.ctx1:testTopic1";
String subject2 = ":.ctx2:testTopic2";
String subject3 = ":.ctx3:testTopic1";
int schemasInSubject1 = 10;
List<Integer> allVersionsInSubject1 = new ArrayList<Integer>();
List<String> allSchemasInSubject1 = TestUtils.getRandomCanonicalAvroString(schemasInSubject1);
int schemasInSubject2 = 5;
List<Integer> allVersionsInSubject2 = new ArrayList<Integer>();
List<String> allSchemasInSubject2 = TestUtils.getRandomCanonicalAvroString(schemasInSubject2);
int schemasInSubject3 = 2;
List<Integer> allVersionsInSubject3 = new ArrayList<Integer>();
List<String> allSchemasInSubject3 = TestUtils.getRandomCanonicalAvroString(schemasInSubject2);

// test getAllVersions with no existing data
try {
Expand Down Expand Up @@ -103,6 +107,19 @@ public void testQualifiedSubjects() throws Exception {
allVersionsInSubject2.add(expectedVersion);
}

// reset the schema id counter due to a different context
schemaIdCounter = 1;

// test registering schemas in subject2
for (int i = 0; i < schemasInSubject3; i++) {
String schema = allSchemasInSubject3.get(i);
int expectedVersion = i + 1;
registerAndVerifySchema(restApp.restClient, schema, schemaIdCounter,
subject3);
schemaIdCounter++;
allVersionsInSubject3.add(expectedVersion);
}

// test getAllVersions with existing data
assertEquals("Getting all versions from subject1 should match all registered versions",
allVersionsInSubject1,
Expand All @@ -113,7 +130,7 @@ public void testQualifiedSubjects() throws Exception {

// test getAllContexts
assertEquals("Getting all contexts should return all registered contexts",
ImmutableList.of(DEFAULT_CONTEXT, ".ctx1", ".ctx2"),
ImmutableList.of(DEFAULT_CONTEXT, ".ctx1", ".ctx2", ".ctx3"),
restApp.restClient.getAllContexts());

// test getAllSubjectsWithPrefix with existing data
Expand All @@ -133,14 +150,24 @@ public void testQualifiedSubjects() throws Exception {

// test getAllSubjectsWithPrefix with context wildcard
assertEquals("Getting all subjects should match all registered subjects",
ImmutableList.of(subject1, subject2),
ImmutableList.of(subject1, subject2, subject3),
restApp.restClient.getAllSubjects(":*:", false));

// test getSchemas with context wildcard
assertEquals("Getting all schemas should match all registered subjects",
schemasInSubject1 + schemasInSubject2,
schemasInSubject1 + schemasInSubject2 + schemasInSubject3,
restApp.restClient.getSchemas(":*:", false, false).size());

// test getSchemas with context wildcard and subject
assertEquals("Getting all schemas should match registered subjects",
schemasInSubject1 + schemasInSubject3,
restApp.restClient.getSchemas(":*:testTopic1", false, false).size());

// test getSchemas with context wildcard and subject
assertEquals("Getting all schemas should match registered subjects",
schemasInSubject2,
restApp.restClient.getSchemas(":*:testTopic2", false, false).size());

Schema schema = restApp.restClient.getVersion("testTopic1", 1);
assertEquals("Getting schema by version w/o context should succeed",
1,
Expand Down

0 comments on commit be9630b

Please sign in to comment.