Skip to content

Commit

Permalink
fix: register schema within sandbox (#8614)
Browse files Browse the repository at this point in the history
If multiple statements are executed "at once", ie, submitted via an input file, earlier statements may create new schemas that later statements depend on. During the sandbox execution, we don't register new schema in SR and thus dependent statement fail inside the sandbox as those schema are not available to them.

This fix adds a schema-cache inside to sandbox to capture new schemas to make them available to dependent statements inside the sandbox.

Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>

Closes #1394
  • Loading branch information
mjsax committed Feb 3, 2022
1 parent d90d6e5 commit ba572e0
Show file tree
Hide file tree
Showing 5 changed files with 443 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@

package io.confluent.ksql.services;

import static io.confluent.ksql.util.LimitedProxyBuilder.anyParams;
import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams;

import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Collections;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.hc.core5.http.HttpStatus;

/**
* SchemaRegistryClient used when trying out operations.
Expand All @@ -36,21 +43,223 @@ final class SandboxedSchemaRegistryClient {

static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
Objects.requireNonNull(delegate, "delegate");

return LimitedProxyBuilder.forClass(SchemaRegistryClient.class)
.swallow("register", anyParams(), 123)
.swallow("getId", anyParams(), 123)
.forward("getAllSubjects", methodParams(), delegate)
.forward("getSchemaById", methodParams(int.class), delegate)
.forward("getLatestSchemaMetadata", methodParams(String.class), delegate)
.forward("getSchemaBySubjectAndId", methodParams(String.class, int.class), delegate)
.forward("testCompatibility",
methodParams(String.class, ParsedSchema.class), delegate)
.swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
.forward("getVersion", methodParams(String.class, ParsedSchema.class), delegate)
.build();
return new SandboxSchemaRegistryCache(delegate);
}

private SandboxedSchemaRegistryClient() {
}

static final class SandboxSchemaRegistryCache implements SchemaRegistryClient {
// we use `MockSchemaRegistryClient` as a cache inside the sandbox to store
// newly registered schemas (without polluting the actual SR)
// this allows dependent statements to execute successfully inside the sandbox
private final MockSchemaRegistryClient sandboxCacheClient = new MockSchemaRegistryClient();
// client to talk to the actual SR
private final SchemaRegistryClient srClient;

private SandboxSchemaRegistryCache(final SchemaRegistryClient delegate) {
this.srClient = delegate;
}

@Override
public Optional<ParsedSchema> parseSchema(
final String schemaType,
final String schemaString,
final List<SchemaReference> references) {
throw new UnsupportedOperationException();
}

@Override
public int register(final String subject, final ParsedSchema parsedSchema)
throws RestClientException, IOException {
return sandboxCacheClient.register(subject, parsedSchema);
}

@Override
public int register(final String subject, final ParsedSchema schema, final boolean normalize)
throws RestClientException, IOException {
return sandboxCacheClient.register(subject, schema, normalize);
}

@Override
public int register(
final String subject,
final ParsedSchema parsedSchema,
final int version,
final int id) {
return -1; // swallow
}

@Deprecated
@Override
public Schema getById(final int id) {
throw new UnsupportedOperationException();

}

@Override
public ParsedSchema getSchemaById(final int id) throws RestClientException, IOException {
try {
return srClient.getSchemaById(id);
} catch (RestClientException e) {
// if we don't find the schema in SR, we try to get it from the sandbox cache
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return sandboxCacheClient.getSchemaById(id);
}
throw e;
}
}

@Deprecated
@Override
public Schema getBySubjectAndId(final String subject, final int id) {
throw new UnsupportedOperationException();
}

@Override
public ParsedSchema getSchemaBySubjectAndId(final String subject, final int id)
throws RestClientException, IOException {

try {
return srClient.getSchemaBySubjectAndId(subject, id);
} catch (final RestClientException e) {
// if we don't find the schema in SR, we try to get it from the sandbox cache
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return sandboxCacheClient.getSchemaBySubjectAndId(subject, id);
}
throw e;
}
}

@Override
public Collection<String> getAllSubjectsById(final int id) {
throw new UnsupportedOperationException();
}

@Override
public SchemaMetadata getLatestSchemaMetadata(final String subject)
throws RestClientException, IOException {

try {
return srClient.getLatestSchemaMetadata(subject);
} catch (final RestClientException e) {
// if we don't find the schema metadata in SR, we try to get it from the sandbox cache
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return sandboxCacheClient.getLatestSchemaMetadata(subject);
}
throw e;
}
}

@Override
public SchemaMetadata getSchemaMetadata(final String subject, final int version) {
throw new UnsupportedOperationException();
}

@Deprecated
@Override
public int getVersion(final String subject, final Schema parsedSchema)
throws RestClientException, IOException {
throw new UnsupportedOperationException();
}

@Override
public int getVersion(final String subject, final ParsedSchema parsedSchema)
throws RestClientException, IOException {
try {
return srClient.getVersion(subject, parsedSchema);
} catch (final RestClientException e) {
// if we don't find the version in SR, we try to get it from the sandbox cache
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return sandboxCacheClient.getVersion(subject, parsedSchema);
}
throw e;
}
}

@Override
public List<Integer> getAllVersions(final String subject) {
throw new UnsupportedOperationException();
}

@Override
public boolean testCompatibility(final String subject, final ParsedSchema parsedSchema)
throws RestClientException, IOException {
return srClient.testCompatibility(subject, parsedSchema)
&& sandboxCacheClient.testCompatibility(subject, parsedSchema);
}

@Override
public String updateCompatibility(final String subject, final String compatibility) {
throw new UnsupportedOperationException();
}

@Override
public String getCompatibility(final String subject) {
throw new UnsupportedOperationException();
}

@Override
public String setMode(final String mode) {
throw new UnsupportedOperationException();
}

@Override
public String setMode(final String mode, final String subject) {
throw new UnsupportedOperationException();
}

@Override
public String getMode() {
throw new UnsupportedOperationException();
}

@Override
public String getMode(final String subject) {
throw new UnsupportedOperationException();
}

@Override
public List<Integer> deleteSubject(final String subject)
throws IOException, RestClientException {
return null; // swallow
}

@Override
public List<Integer> deleteSubject(final String subject, final boolean isPermanent) {
throw new UnsupportedOperationException();
}

@Override
public Collection<String> getAllSubjects() throws RestClientException, IOException {
final Collection<String> allSubjects = new HashSet<>(srClient.getAllSubjects());
allSubjects.addAll(sandboxCacheClient.getAllSubjects());
return ImmutableSet.copyOf(allSubjects);
}

@Override
public int getId(final String subject, final ParsedSchema schema, final boolean normalize)
throws IOException, RestClientException {
return getId(subject, schema);
}

@Override
public int getId(final String subject, final ParsedSchema parsedSchema)
throws RestClientException, IOException {
try {
return srClient.getId(subject, parsedSchema);
} catch (final RestClientException e) {
// if we don't find the schema in SR, we try to get it from the sandbox cache
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
return sandboxCacheClient.getId(subject, parsedSchema);
}
throw e;
}
}

@Override
public void reset() {
throw new UnsupportedOperationException();
}
}
}

0 comments on commit ba572e0

Please sign in to comment.