From ba572e0e5bd8873e538d40a58153ba345eb1573d Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Wed, 2 Feb 2022 19:00:40 -0800
Subject: [PATCH] fix: register schema within sandbox (#8614)

If multiple statements are executed "at once", i.e., submitted via an input
file, earlier statements may create new schemas that later statements depend
on. During sandbox execution we don't register new schemas in Schema Registry,
so dependent statements fail inside the sandbox because those schemas are not
available to them.

This fix adds a schema cache to the sandbox that captures newly registered
schemas and makes them available to dependent statements inside the sandbox.
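In short, writes go only to a sandbox-local MockSchemaRegistryClient, while
reads consult the real Schema Registry first and fall back to that local cache
on a 404. A condensed sketch of the idea (the class name below is illustrative
only; field and method names follow SandboxSchemaRegistryCache in the diff):

    import io.confluent.kafka.schemaregistry.ParsedSchema;
    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
    import java.io.IOException;
    import org.apache.hc.core5.http.HttpStatus;

    class SandboxSchemaRegistryCacheSketch {
      private final MockSchemaRegistryClient sandboxCacheClient = new MockSchemaRegistryClient();
      private final SchemaRegistryClient srClient;

      SandboxSchemaRegistryCacheSketch(final SchemaRegistryClient srClient) {
        this.srClient = srClient;
      }

      // writes never touch the real SR; they only populate the sandbox-local cache
      int register(final String subject, final ParsedSchema schema)
          throws IOException, RestClientException {
        return sandboxCacheClient.register(subject, schema);
      }

      // reads prefer the real SR and fall back to the cache for schemas that only
      // exist because an earlier statement in the same batch registered them
      ParsedSchema getSchemaById(final int id) throws IOException, RestClientException {
        try {
          return srClient.getSchemaById(id);
        } catch (final RestClientException e) {
          if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
            return sandboxCacheClient.getSchemaById(id);
          }
          throw e;
        }
      }
    }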
Co-authored-by: Victoria Xia

Closes #1394
---
 .../SandboxedSchemaRegistryClient.java        | 245 ++++++++++++++++--
 .../DependentStatementsIntegrationTest.java   | 182 +++++++++++++
 .../integration/EndToEndIntegrationTest.java  |   5 +-
 .../SandboxedSchemaRegistryClientTest.java    |  33 ++-
 .../services/SandboxedServiceContextTest.java |   3 +-
 5 files changed, 443 insertions(+), 25 deletions(-)
 create mode 100644 ksqldb-engine/src/test/java/io/confluent/ksql/integration/DependentStatementsIntegrationTest.java

diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java
index a9a7d5c8574a..7a49ecef2df0 100644
--- a/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java
+++ b/ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java
@@ -15,14 +15,21 @@
 package io.confluent.ksql.services;
 
-import static io.confluent.ksql.util.LimitedProxyBuilder.anyParams;
-import static io.confluent.ksql.util.LimitedProxyBuilder.methodParams;
-
+import com.google.common.collect.ImmutableSet;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.ksql.util.LimitedProxyBuilder;
-import java.util.Collections;
+import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import org.apache.avro.Schema;
+import org.apache.hc.core5.http.HttpStatus;
 
 /**
  * SchemaRegistryClient used when trying out operations.
@@ -36,21 +43,223 @@ final class SandboxedSchemaRegistryClient {
 
   static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
     Objects.requireNonNull(delegate, "delegate");
-
-    return LimitedProxyBuilder.forClass(SchemaRegistryClient.class)
-        .swallow("register", anyParams(), 123)
-        .swallow("getId", anyParams(), 123)
-        .forward("getAllSubjects", methodParams(), delegate)
-        .forward("getSchemaById", methodParams(int.class), delegate)
-        .forward("getLatestSchemaMetadata", methodParams(String.class), delegate)
-        .forward("getSchemaBySubjectAndId", methodParams(String.class, int.class), delegate)
-        .forward("testCompatibility",
-            methodParams(String.class, ParsedSchema.class), delegate)
-        .swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
-        .forward("getVersion", methodParams(String.class, ParsedSchema.class), delegate)
-        .build();
+    return new SandboxSchemaRegistryCache(delegate);
   }
 
   private SandboxedSchemaRegistryClient() {
   }
+
+  static final class SandboxSchemaRegistryCache implements SchemaRegistryClient {
+    // we use `MockSchemaRegistryClient` as a cache inside the sandbox to store
+    // newly registered schemas (without polluting the actual SR)
+    // this allows dependent statements to execute successfully inside the sandbox
+    private final MockSchemaRegistryClient sandboxCacheClient = new MockSchemaRegistryClient();
+    // client to talk to the actual SR
+    private final SchemaRegistryClient srClient;
+
+    private SandboxSchemaRegistryCache(final SchemaRegistryClient delegate) {
+      this.srClient = delegate;
+    }
+
+    @Override
+    public Optional<ParsedSchema> parseSchema(
+        final String schemaType,
+        final String schemaString,
+        final List<SchemaReference> references) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int register(final String subject, final ParsedSchema parsedSchema)
+        throws RestClientException, IOException {
+      return sandboxCacheClient.register(subject, parsedSchema);
+    }
+
+    @Override
+    public int register(final String subject, final ParsedSchema schema, final boolean normalize)
+        throws RestClientException, IOException {
+      return sandboxCacheClient.register(subject, schema, normalize);
+    }
+
+    @Override
+    public int register(
+        final String subject,
+        final ParsedSchema parsedSchema,
+        final int version,
+        final int id) {
+      return -1; // swallow
+    }
+
+    @Deprecated
+    @Override
+    public Schema getById(final int id) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ParsedSchema getSchemaById(final int id) throws RestClientException, IOException {
+      try {
+        return srClient.getSchemaById(id);
+      } catch (final RestClientException e) {
+        // if we don't find the schema in SR, we try to get it from the sandbox cache
+        if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
+          return sandboxCacheClient.getSchemaById(id);
+        }
+        throw e;
+      }
+    }
+
+    @Deprecated
+    @Override
+    public Schema getBySubjectAndId(final String subject, final int id) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ParsedSchema getSchemaBySubjectAndId(final String subject, final int id)
+        throws RestClientException, IOException {
+      try {
+        return srClient.getSchemaBySubjectAndId(subject, id);
+      } catch (final RestClientException e) {
+        // if we don't find the schema in SR, we try to get it from the sandbox cache
+        if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
+          return sandboxCacheClient.getSchemaBySubjectAndId(subject, id);
+        }
+        throw e;
+      }
+    }
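+
+    // the methods below either follow the same lookup pattern (ask the real SR first, fall back
+    // to the sandbox cache on 404) or are not expected to be called during sandbox execution
+    // and simply throw UnsupportedOperationException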
+    @Override
+    public Collection<String> getAllSubjectsById(final int id) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public SchemaMetadata getLatestSchemaMetadata(final String subject)
+        throws RestClientException, IOException {
+      try {
+        return srClient.getLatestSchemaMetadata(subject);
+      } catch (final RestClientException e) {
+        // if we don't find the schema metadata in SR, we try to get it from the sandbox cache
+        if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
+          return sandboxCacheClient.getLatestSchemaMetadata(subject);
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public SchemaMetadata getSchemaMetadata(final String subject, final int version) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Deprecated
+    @Override
+    public int getVersion(final String subject, final Schema parsedSchema)
+        throws RestClientException, IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getVersion(final String subject, final ParsedSchema parsedSchema)
+        throws RestClientException, IOException {
+      try {
+        return srClient.getVersion(subject, parsedSchema);
+      } catch (final RestClientException e) {
+        // if we don't find the version in SR, we try to get it from the sandbox cache
+        if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
+          return sandboxCacheClient.getVersion(subject, parsedSchema);
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public List<Integer> getAllVersions(final String subject) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean testCompatibility(final String subject, final ParsedSchema parsedSchema)
+        throws RestClientException, IOException {
+      return srClient.testCompatibility(subject, parsedSchema)
+          && sandboxCacheClient.testCompatibility(subject, parsedSchema);
+    }
+
+    @Override
+    public String updateCompatibility(final String subject, final String compatibility) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getCompatibility(final String subject) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String setMode(final String mode) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String setMode(final String mode, final String subject) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getMode() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getMode(final String subject) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Integer> deleteSubject(final String subject)
+        throws IOException, RestClientException {
+      return null; // swallow
+    }
+
+    @Override
+    public List<Integer> deleteSubject(final String subject, final boolean isPermanent) {
+      throw new UnsupportedOperationException();
+    }
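+
+    // merge the subjects known to the real SR with the subjects registered locally in this sandbox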
+    @Override
+    public Collection<String> getAllSubjects() throws RestClientException, IOException {
+      final Collection<String> allSubjects = new HashSet<>(srClient.getAllSubjects());
+      allSubjects.addAll(sandboxCacheClient.getAllSubjects());
+      return ImmutableSet.copyOf(allSubjects);
+    }
+
+    @Override
+    public int getId(final String subject, final ParsedSchema schema, final boolean normalize)
+        throws IOException, RestClientException {
+      return getId(subject, schema);
+    }
+
+    @Override
+    public int getId(final String subject, final ParsedSchema parsedSchema)
+        throws RestClientException, IOException {
+      try {
+        return srClient.getId(subject, parsedSchema);
+      } catch (final RestClientException e) {
+        // if we don't find the schema in SR, we try to get it from the sandbox cache
+        if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
+          return sandboxCacheClient.getId(subject, parsedSchema);
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public void reset() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/DependentStatementsIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/DependentStatementsIntegrationTest.java
new file mode 100644
index 000000000000..e3280b085d43
--- /dev/null
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/DependentStatementsIntegrationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2022 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.confluent.ksql.integration;
+
+import static java.lang.String.format;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.PersistentQueryMetadata;
+import io.confluent.ksql.util.QueryMetadata;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import kafka.zookeeper.ZooKeeperClientException;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.RuleChain;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class DependentStatementsIntegrationTest {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(DependentStatementsIntegrationTest.class);
+
+  private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
+
+  @ClassRule
+  public static final RuleChain CLUSTER_WITH_RETRY = RuleChain
+      .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
+      .around(TEST_HARNESS);
+
+  public TestKsqlContext ksqlContext;
+
+  @Rule
+  public final Timeout timeout = Timeout.seconds(120);
+
+  private final List<QueryMetadata> toClose = new ArrayList<>();
+
+  @Before
+  public void before() throws Exception {
+    TEST_HARNESS.before();
+    ksqlContext = TEST_HARNESS.ksqlContextBuilder()
+        .withAdditionalConfig(
+            KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY,
+            "http://foo:8080")
+        .build();
+
+    ksqlContext.before();
+
+    toClose.clear();
+  }
+
+  @After
+  public void after() {
+    toClose.forEach(QueryMetadata::close);
+    ksqlContext.after();
+    TEST_HARNESS.after();
+  }
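+
+  // Each test submits all of its statements in a single sql() call, so later statements are
+  // validated in the sandbox against schemas and topics created by earlier statements in the batch.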
+  @Test
+  public void shouldCreateDependentTopicWithDefaultReplicationInSandbox() {
+    executeStatement(
+        // When:
+        "CREATE STREAM createNewTopic (a INT, b INT)"
+            + " WITH (KAFKA_TOPIC='foo', PARTITIONS=1, FORMAT='JSON');"
+        // Then: dependent statement also executes successfully
+            + "CREATE STREAM pickupDefaultReplicationFactor AS SELECT * FROM createNewTopic;"
+    );
+  }
+
+  @Test
+  public void shouldUseTopicFromDependentStatementInSandbox() {
+    executeStatement(
+        // When:
+        "CREATE STREAM createNewTopic (a INT, b INT)"
+            + " WITH (KAFKA_TOPIC='bar', PARTITIONS=1, FORMAT='JSON');"
+        // Then: dependent statement also executes successfully
+            + "CREATE STREAM reuseTopic WITH (KAFKA_TOPIC='bar') AS SELECT * FROM createNewTopic;"
+    );
+  }
+
+  @Test
+  public void shouldRegisterAvroSchemaInSandboxViaCS() {
+    executeStatement(
+        // When:
+        "CREATE STREAM avro_input (a INT KEY, b INT KEY, c VARCHAR, d VARCHAR)"
+            + " WITH (KAFKA_TOPIC='t2', PARTITIONS=1, FORMAT='AVRO');"
+        // Then: dependent statement also executes successfully
+            + "CREATE STREAM should_infer_schema WITH (KAFKA_TOPIC='t2', FORMAT='AVRO');"
+    );
+  }
+
+  @Test
+  public void shouldRegisterAvroSchemaInSandboxViaCSAS() {
+    // Given:
+    executeStatement(
+        "CREATE STREAM avro_input (a INT KEY, b INT KEY, c VARCHAR, d VARCHAR)"
+            + " WITH (KAFKA_TOPIC='t5', PARTITIONS=1, FORMAT='AVRO');"
+    );
+
+    executeStatement(
+        // When:
+        "CREATE STREAM should_register_schema WITH (KAFKA_TOPIC='t6', FORMAT='AVRO') AS"
+            + " SELECT * FROM avro_input;"
+        // Then: dependent statement also executes successfully
+            + "CREATE STREAM should_infer_schema WITH (KAFKA_TOPIC='t6', FORMAT='AVRO');"
+    );
+  }
+
+  @Test
+  public void shouldRegisterAvroSchemaInSandboxViaCSandSchemaId() throws Exception {
+    // Given:
+    executeStatement(
+        "CREATE STREAM avro_input (a INT KEY, b INT KEY, c VARCHAR, d VARCHAR)"
+            + " WITH (KAFKA_TOPIC='t9', PARTITIONS=1, FORMAT='AVRO');"
+    );
+
+    final SchemaRegistryClient srClient = TEST_HARNESS.getSchemaRegistryClient();
+    final int keySchemaId = srClient.getLatestSchemaMetadata("t9-key").getId();
+    final int valueSchemaId = srClient.getLatestSchemaMetadata("t9-value").getId();
+
+    executeStatement(
+        // When:
+        "CREATE STREAM should_register_schema WITH ("
+            + "KAFKA_TOPIC='t10',"
+            + "FORMAT='AVRO',"
+            + "KEY_SCHEMA_ID=%s,"
+            + "VALUE_SCHEMA_ID=%s"
+            + ") AS "
+        // because ksqlDB always assumes an unwrapped key schema, both columns `a` and `b`
+        // are inferred as `struct` (cf. https://github.com/confluentinc/ksql/issues/8489);
+        // thus, we need to create a `struct` key with default name `rowkey`
+        // to get a logical schema that is compatible with the physical schema
+            + "SELECT Struct(a := a, b := b) AS rowkey, c, d FROM avro_input PARTITION BY Struct(a := a, b := b);"
+        // Then: dependent statement also executes successfully
+            + "CREATE STREAM should_infer_schema WITH (KAFKA_TOPIC='t10', FORMAT='AVRO');",
+        String.valueOf(keySchemaId),
+        String.valueOf(valueSchemaId)
+    );
+  }
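+
+  // formats and runs the given statements in a single sql() call; queries that are not
+  // auto-started are started here and closed again in after()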
+  private void executeStatement(
+      final String statement,
+      final String... args
+  ) {
+    final String formatted = format(statement, (Object[])args);
+
+    final List<QueryMetadata> queries = ksqlContext.sql(formatted);
+
+    final List<QueryMetadata> newQueries = queries.stream()
+        .filter(q -> !(q instanceof PersistentQueryMetadata))
+        .collect(Collectors.toList());
+
+    newQueries.forEach(QueryMetadata::start);
+
+    toClose.addAll(newQueries);
+  }
+
+}
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
index 8b88dae428a0..93d171ed7cf1 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java
@@ -351,7 +351,6 @@ private T executeStatement(
       final String... args
   ) {
     final String formatted = format(statement, (Object[])args);
-    log.debug("Sending statement: {}", formatted);
 
     final List queries = ksqlContext.sql(formatted);
 
@@ -368,14 +367,14 @@ private T executeStatement(
 
   private static List waitForFirstRow(
       final TransientQueryMetadata queryMetadata
-  ) throws Exception {
+  ) {
     return verifyAvailableRows(queryMetadata, 1).get(0).values();
   }
 
   private static List verifyAvailableRows(
       final TransientQueryMetadata queryMetadata,
       final int expectedRows
-  ) throws Exception {
+  ) {
     final BlockingRowQueue rowQueue = queryMetadata.getRowQueue();
 
     assertThatEventually(
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java
index 7b099bc98c22..7d6bcbf70ff7 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java
@@ -19,6 +19,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
@@ -28,11 +29,13 @@
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.ksql.test.util.TestMethods;
 import io.confluent.ksql.test.util.TestMethods.TestCase;
 import java.util.Collection;
 import java.util.Objects;
 import org.apache.avro.Schema;
+import org.apache.hc.core5.http.HttpStatus;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
@@ -83,7 +86,8 @@ public UnsupportedMethods(final TestCase testCase) {
 
     @Before
     public void setUp() {
-      sandboxedSchemaRegistryClient = SandboxedSchemaRegistryClient.createProxy(mock(SchemaRegistryClient.class));
+      sandboxedSchemaRegistryClient = SandboxedSchemaRegistryClient.createProxy(
+          mock(SchemaRegistryClient.class));
     }
 
     @Test(expected = UnsupportedOperationException.class)
@@ -196,6 +200,12 @@ public void shouldGetSchemaById() throws Exception {
 
     @Test
     public void shouldGetId() throws Exception {
+      // Given:
+      when(delegate.getId(anyString(), any(ParsedSchema.class)))
+          .thenReturn(123)
+          .thenReturn(124)
+          .thenReturn(125); // we swallow the third boolean argument in the implementation
+
       // When:
       final int id = sandboxedClient.getId("some subject", schema);
       final int id1 = sandboxedClient.getId("some subject", parsedSchema);
@@ -203,8 +213,25 @@ public void shouldGetId() throws Exception {
 
       // Then:
       assertThat(id, is(123));
-      assertThat(id1, is(123));
-      assertThat(id2, is(123));
+      assertThat(id1, is(124));
+      assertThat(id2, is(125));
+    }
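+
+    // covers the new sandbox cache: the id of a schema registered only inside the sandbox
+    // (never in the real SR) must still be resolvable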
+    @Test
+    public void shouldGetIdFromCache() throws Exception {
+      // Given:
+      final RestClientException exception = mock(RestClientException.class);
+      when(exception.getStatus()).thenReturn(HttpStatus.SC_NOT_FOUND);
+      when(delegate.getId(anyString(), any(ParsedSchema.class))).thenThrow(exception);
+
+      final int newId = sandboxedClient.register("newSubject", parsedSchema);
+
+      // When:
+      final int id = sandboxedClient.getId("newSubject", parsedSchema);
+
+      // Then:
+      assertThat(id, is(newId));
+    }
   }
 }
\ No newline at end of file
diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
index c979eb9998d0..932f341fba0c 100644
--- a/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
+++ b/ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedServiceContextTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.when;
 
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.ksql.services.SandboxedSchemaRegistryClient.SandboxSchemaRegistryCache;
 import io.confluent.ksql.test.util.TestMethods;
 import io.confluent.ksql.test.util.TestMethods.TestCase;
 import java.lang.reflect.Proxy;
@@ -151,7 +152,7 @@ public void shouldGetSandboxedSchemaRegistryClient() throws Exception {
     final SchemaRegistryClient actual = sandboxedServiceContext.getSchemaRegistryClient();
 
     // Then:
-    assertThat(Proxy.isProxyClass(actual.getClass()), is(true));
+    assertThat(actual, instanceOf(SandboxSchemaRegistryCache.class));
 
     // When:
     actual.getLatestSchemaMetadata("some subject");