-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: register schema within sandbox #8614
Conversation
|
||
return LimitedProxyBuilder.forClass(SchemaRegistryClient.class) | ||
.swallow("register", anyParams(), 123) | ||
.forward("register", methodParams(String.class, ParsedSchema.class), sandboxSchemaRegistryCache) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the "main" fix -- instead of swallowing the registration, we capture it within the sandbox.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how are we planning to test this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Working on tests... The PR is still WIP. Just opened if for early feedback.
final ParsedSchema parsedSchema, | ||
final int version, | ||
final int id) { | ||
return -1; // swallow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this swallowed whereas previous register
does actual work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Old code swallows both: .swallow("register", anyParams(), 123)
I don't think ksqlDB actually uses this method? I just added it to make the existing unit tests pass.
@Override | ||
public int register(final String subject, final ParsedSchema parsedSchema) { | ||
if (subjectCache.containsKey(subject)) { | ||
throw new IllegalStateException("Subject '" + subject + "' already in use."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this throw instead of returning existing id for the subject which is the behavior of schema registry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it not depend on the compatibility rules if updating the schema actually works? Not sure to what extend we need to fully mimic SR logic? -- I tired to keep it simple in the hope it would be sufficient. Not totally sure to be fair. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will make the following fail because they share same subject?
CREATE test_stream_1 (id int key, name varchar) with (kafka_topic='some_topic', partitions=1, format='avro')
CREATE test_stream_2 (id int key, name varchar, age int) with (kafka_topic='some_topic', partitions=1, format='avro')
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should err on the side of false positives rather than false negatives. It'd be bad to have statements that actually work fail validation because then we might be introducing a regression (if a customer uses such statements in their workflows). In contrast, a false positive means we let some statements through validation that then fail in execution. Not great since multi-statements requests might have been partially executed, but also doesn't prevent any existing workflows.
final int schemaId = nextSchemaId--; | ||
subjectCache.put(subject, parsedSchema); | ||
subjectToId.put(subject, schemaId); | ||
idCache.put(schemaId, parsedSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you use MockSchemaRegistryClient
? https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe. What would we gain?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Make maintaining the 3 caches here easier and less error prone?
MockSchemaRegistryClient
seems to have similar logic inside. - Make behavior same as prod schema registry. For example,
MockSchemaRegistryClient
could have register same subject differently instead of throwing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 if we can use the MockSchemaRegistryClient rather than maintaining our own caches for the newly registered subjects, that'd be nice. OTOH if it's a lot of effort I think the current approach is fine too. Let's just be careful about not throwing errors where we shouldn't be (see above).
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) { | ||
final ParsedSchema schema = idCache.get(id); | ||
if (schema != null) { | ||
return schema; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe put some comment why we do this?
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) { | ||
final ParsedSchema schemaByName = subjectCache.get(subject); | ||
final ParsedSchema schemaById = idCache.get(id); | ||
if (schemaByName != null && schemaByName == schemaById) { | ||
return schemaByName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
if (e.getStatus() == HttpStatus.SC_NOT_FOUND && subjectToId.containsKey(subject)) { | ||
return subjectToId.get(subject); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax -- great test coverage! Comments inline.
@Override | ||
public int register(final String subject, final ParsedSchema parsedSchema) { | ||
if (subjectCache.containsKey(subject)) { | ||
throw new IllegalStateException("Subject '" + subject + "' already in use."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should err on the side of false positives rather than false negatives. It'd be bad to have statements that actually work fail validation because then we might be introducing a regression (if a customer uses such statements in their workflows). In contrast, a false positive means we let some statements through validation that then fail in execution. Not great since multi-statements requests might have been partially executed, but also doesn't prevent any existing workflows.
final int schemaId = nextSchemaId--; | ||
subjectCache.put(subject, parsedSchema); | ||
subjectToId.put(subject, schemaId); | ||
idCache.put(schemaId, parsedSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 if we can use the MockSchemaRegistryClient rather than maintaining our own caches for the newly registered subjects, that'd be nice. OTOH if it's a lot of effort I think the current approach is fine too. Let's just be careful about not throwing errors where we shouldn't be (see above).
try { | ||
return delegate.getSchemaById(id); | ||
} catch (RestClientException e) { | ||
// if we don't find the schema is SR, we try to get it from the sandbox cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo. Also three more occurrences of the same typo in this file.
// if we don't find the schema is SR, we try to get it from the sandbox cache | |
// if we don't find the schema in SR, we try to get it from the sandbox cache |
try { | ||
return delegate.getLatestSchemaMetadata(subject); | ||
} catch (final RestClientException e) { | ||
// if we don't find the schema metadata is SR, but there subject is registered inside |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typos
// if we don't find the schema metadata is SR, but there subject is registered inside | |
// if we don't find the schema metadata in SR, but the subject is registered inside |
@@ -0,0 +1,238 @@ | |||
/* | |||
* Copyright 2018 Confluent Inc. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: year
"http://foo:8080") | ||
.withAdditionalConfig( | ||
KsqlConfig.KSQL_SHARED_RUNTIME_ENABLED, | ||
sharedRuntimes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we believe that shared runtimes might affect the behavior tested in this integration test? (Do we really need to test both versions of this?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just blindly copied from EndToEndIntegrationTest
-- not even sure that the difference is. Happy to remove it.
"CREATE STREAM avro_input (a INT KEY, b VARCHAR)" | ||
+ " WITH (KAFKA_TOPIC='t1', PARTITIONS=1, FORMAT='AVRO', WRAP_SINGLE_VALUE=false);" | ||
|
||
// Then: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This usage of "When"/"Then" is different from how the pattern is used in other tests in the repo. "Then" is typically used for validations, but here the only validation is that the combination of statements successfully executes. Maybe we remove the "When"/"Then" and just add a comment like:
// Then: | |
// dependent statement also executes successfully |
?
} | ||
|
||
@Test | ||
public void shouldRegisterAvroSchemaInSandboxViaCS() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we think primitive vs non-primitive schemas might be handled differently? I think we can cut the number of tests in half by only testing one of them.
|
||
@RunWith(Parameterized.class) | ||
@Category({IntegrationTest.class}) | ||
public class DependentStatementsIntegrationTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great test coverage! Out of curiosity, did we ever add the analogous tests for statements with topic dependencies? Would be nice to do that if not (can be separate from this PR).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be covered (cf #4800)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR that fixed that issue only added unit test coverage, and no integration test coverage. I meant to say that it'd be nice to add an integration test into this new DependentStatementsIntegrationTest.java file for the topic dependency case too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall! Some nits and please take a look at test failure.
final String... args | ||
) { | ||
final String formatted = format(statement, (Object[])args); | ||
log.debug("Sending statement: {}", formatted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove logging in test?
ksqldb-engine/src/test/java/io/confluent/ksql/services/SandboxedSchemaRegistryClientTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax ! LGTM with some minor comments/questions inline.
} | ||
|
||
private SandboxedSchemaRegistryClient() { | ||
} | ||
|
||
static final class SandboxSchemaRegistryCache implements SchemaRegistryClient { | ||
private final MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add comments explaining the difference between delegate
and mockedClient
here, i.e., what each is used for/represents? Without this context, this code is very difficult to understand.
We might also want to rename mockedClient
to sandboxCacheClient
or similar, in order to clarify within the code itself as well.
} catch (RestClientException e) { | ||
// if we don't find the schema in SR, we try to get it from the sandbox cache | ||
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) { | ||
return mockedClient.getSchemaById(id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The earlier version of this PR only returned from this line if the schema was present in the sandbox cache, and otherwise threw on the line below. This latest version of the PR now always returns from the mocked client representing the sandbox cache. Does the mocked client throw an error if the schema is not present? Otherwise there is a behavior change here (which might be fine, wondering if it's intentional).
Same comment/question for the other methods below too (getSchemaBySubjectAndId
, getLatestSchemaMetadata
, getVersion
, and getId
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
@RunWith(Parameterized.class) | ||
@Category({IntegrationTest.class}) | ||
public class DependentStatementsIntegrationTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR that fixed that issue only added unit test coverage, and no integration test coverage. I meant to say that it'd be nice to add an integration test into this new DependentStatementsIntegrationTest.java file for the topic dependency case too.
Test failed a second time, but it passes locally:
Any idea? Let's see if it fails again in the next build. |
Is
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/services/SandboxedSchemaRegistryClient.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void shouldCreateDependentTopicWithDefaultReplicationInSandbox() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't actually test a topic dependency, does it? I thought the dependency issue fixed in the previous PR was if the first CREATE STREAM creates a topic and then the second one uses the newly created topic. (Second statement should reference the topic, not the stream.)
This dependency case (CS followed by CSAS) should already be tested in other integration tests throughout the codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the fix was about "inherit the replication factor". The second statement creates an output topic, and was not able to create it, because it did neither inherit the replication factor from upstream topic, nor did if fetch default replication factor from Kafka.
Happy to remove this test again -- I thought you asked to add it. I can also add a different one that use two CS statements, but it also seems redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, huh! I totally misunderstood the original ticket for that issue then, haha. If this integration test covers the fix, then that sounds great to me. Thanks for adding it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mjsax ! LGTM.
Another (different) test failure:
Failed locally, too. Is this related to broken |
…edSchemaRegistryClient.java Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>
…edSchemaRegistryClient.java Co-authored-by: Victoria Xia <victoria.f.xia281@gmail.com>
Closes #1394