chore: bring-back lost Schema Registry fixes (#10005)

Co-authored-by: Victoria Xia <victoria.xia@confluent.io>
Co-authored-by: asaeedi <asaeedi@confluent.io>
Co-authored-by: Alieh <107070585+aliehsaeedii@users.noreply.github.com>
Co-authored-by: Hao Li <1127478+lihaosky@users.noreply.github.com>
fix tests for updated Schema Registry client internals (#9982) (#10003)
fix: implement default method for schema registry client (#10000)
5 people committed Jul 12, 2023
1 parent 731d485 commit 4d74062
Showing 6 changed files with 83 additions and 54 deletions.
@@ -22,6 +22,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collection;
@@ -67,6 +68,11 @@ public Ticker ticker() {
throw new UnsupportedOperationException();
}

@Override
public String tenant() {
throw new UnsupportedOperationException();
}

@Override
public Optional<ParsedSchema> parseSchema(
final String schemaType,
@@ -96,6 +102,13 @@ public int register(
return -1; // swallow
}

@Override
public RegisterSchemaResponse registerWithResponse(
final String subject, final ParsedSchema schema, final boolean normalize)
throws IOException, RestClientException {
return sandboxCacheClient.registerWithResponse(subject, schema, normalize);
}

@Deprecated
@Override
public Schema getById(final int id) {
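
The two methods added in this first file, tenant() and registerWithResponse(...), follow the sandboxed client's existing convention: Schema Registry operations that are safe inside the sandbox are forwarded to the wrapped client, while everything else throws UnsupportedOperationException. A minimal sketch of that convention, assuming the same method signatures as the hunk above; the wrapper class and delegate field names here are illustrative, not the actual ksqlDB class:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;

// Illustrative wrapper, not the class from this diff.
abstract class ForwardingSchemaRegistryClient implements SchemaRegistryClient {

  private final SchemaRegistryClient delegate;

  ForwardingSchemaRegistryClient(final SchemaRegistryClient delegate) {
    this.delegate = delegate;
  }

  @Override
  public String tenant() {
    // Not meaningful inside the sandbox: fail loudly rather than guess.
    throw new UnsupportedOperationException();
  }

  @Override
  public RegisterSchemaResponse registerWithResponse(
      final String subject, final ParsedSchema schema, final boolean normalize)
      throws IOException, RestClientException {
    // Supported operation: forward to the wrapped client.
    return delegate.registerWithResponse(subject, schema, normalize);
  }
}
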
@@ -17,6 +17,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -29,6 +30,7 @@
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.test.util.TestMethods;
import io.confluent.ksql.test.util.TestMethods.TestCase;
@@ -64,6 +66,7 @@ public static Collection<TestCase<SchemaRegistryClient>> getMethodsToTest() {
.ignore("register", String.class, Schema.class, int.class, int.class)
.ignore("register", String.class, ParsedSchema.class, int.class, int.class)
.ignore("getLatestSchemaMetadata", String.class)
.ignore("registerWithResponse", String.class, ParsedSchema.class, boolean.class)
.ignore("getSchemaBySubjectAndId", String.class, int.class)
.ignore("testCompatibility", String.class, Schema.class)
.ignore("testCompatibility", String.class, ParsedSchema.class)
@@ -186,6 +189,17 @@ public void shouldGetVersion() throws Exception {
assertThat(version, is(6));
}

@Test
public void shouldRegisterWithResponse() throws Exception {
// When:
final RegisterSchemaResponse response = sandboxedClient
.registerWithResponse("some subject", schema, false);

// Then:
assertThat(response, is(notNullValue()));
assertThat(response.getId(), is(1));
}

@Test
public void shouldGetSchemaById() throws Exception {
// Given:
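
The new shouldRegisterWithResponse test above pins down the forwarding behaviour: the sandboxed client delegates the call and the caller gets back the id the registry assigned. A sketch of that call shape only; the helper class, subject, and schema below are placeholders, not part of the diff:

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;

final class RegisterWithResponseSketch {
  // Registers the schema and returns the id assigned by the registry,
  // mirroring the assertion in the test above.
  static int registerAndGetId(
      final SchemaRegistryClient client,
      final String subject,
      final ParsedSchema schema) throws Exception {
    final RegisterSchemaResponse response =
        client.registerWithResponse(subject, schema, false);
    return response.getId();
  }
}
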
@@ -1,28 +1,43 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.confluent.ksql.test.serde.json;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import java.io.IOException;
import java.math.BigDecimal;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class ValueSpecJsonSchemaSerdeSupplierTest {
@Mock

private SchemaRegistryClient srClient;

@Before
public void setUp() {
srClient = new MockSchemaRegistryClient();
}

@Test
public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros() throws RestClientException, IOException {
// Given:
@@ -31,32 +46,29 @@ public void shouldSerializeAndDeserializeDecimalsWithOutStrippingTrailingZeros()
final Serializer<Object> serializer = srSerde.getSerializer(srClient, false);
final Deserializer<Object> deserializer = srSerde.getDeserializer(srClient, false);

when(srClient.getLatestSchemaMetadata("t-value"))
.thenReturn(new SchemaMetadata(0, 1, ""));
when(srClient.getSchemaBySubjectAndId("t-value", 0))
.thenReturn(new JsonSchema("{\n" +
" \"properties\": {\n" +
" \"B\": {\n" +
" \"connect.index\": 0,\n" +
" \"oneOf\": [\n" +
" {\n" +
" \"type\": \"null\"\n" +
" },\n" +
" {\n" +
" \"connect.parameters\": {\n" +
" \"connect.decimal.precision\": \"3\",\n" +
" \"scale\": \"1\"\n" +
" },\n" +
" \"connect.type\": \"bytes\",\n" +
" \"connect.version\": 1,\n" +
" \"title\": \"org.apache.kafka.connect.data.Decimal\",\n" +
" \"type\": \"number\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"type\": \"object\"\n" +
"}"));
srClient.register("t-value", new JsonSchema("{\n" +
" \"properties\": {\n" +
" \"B\": {\n" +
" \"connect.index\": 0,\n" +
" \"oneOf\": [\n" +
" {\n" +
" \"type\": \"null\"\n" +
" },\n" +
" {\n" +
" \"connect.parameters\": {\n" +
" \"connect.decimal.precision\": \"3\",\n" +
" \"scale\": \"1\"\n" +
" },\n" +
" \"connect.type\": \"bytes\",\n" +
" \"connect.version\": 1,\n" +
" \"title\": \"org.apache.kafka.connect.data.Decimal\",\n" +
" \"type\": \"number\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"type\": \"object\"\n" +
"}"));

// When:
final byte[] bytes = serializer.serialize("t",
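
The rewrite above drops the Mockito stubs for getLatestSchemaMetadata and getSchemaBySubjectAndId and instead registers the schema with a MockSchemaRegistryClient, so the serde resolves it through the normal lookup path under the "<topic>-value" subject. A minimal sketch of that setup pattern; the class name and the trivial schema string are illustrative:

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;

final class MockRegistrySetupSketch {
  // Registers a value schema under the "<topic>-value" subject and returns
  // the id the in-memory registry assigned to it.
  static int registerValueSchema(final String topic) throws Exception {
    final SchemaRegistryClient srClient = new MockSchemaRegistryClient();
    return srClient.register(topic + "-value", new JsonSchema("{\"type\": \"object\"}"));
  }
}
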
@@ -28,17 +28,16 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
@@ -1228,9 +1227,7 @@ private static Serializer<Object> avroSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaAvroSerializer(schemaRegistryClient, props);
return new KafkaAvroSerializer(new MockSchemaRegistryClient(), props);
}

private static Message protobufRecord() {
@@ -1250,18 +1247,14 @@ private static Serializer<Message> protobufSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaProtobufSerializer<>(schemaRegistryClient, props);
return new KafkaProtobufSerializer<>(new MockSchemaRegistryClient(), props);
}

private static Serializer<Object> jsonSrSerializer() {
final Map<String, String> props = new HashMap<>();
props.put("schema.registry.url", "localhost:9092");

final SchemaRegistryClient schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.ticker()).thenReturn(Ticker.systemTicker());
return new KafkaJsonSchemaSerializer<>(schemaRegistryClient, props);
return new KafkaJsonSchemaSerializer<>(new MockSchemaRegistryClient(), props);
}
}

@@ -3,14 +3,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.serde.connect.ConnectKsqlSchemaTranslator;
import io.confluent.ksql.util.KsqlConfig;
@@ -85,8 +82,8 @@ public class KsqlJsonSchemaDeserializerTest {
public void before() throws Exception {
schema = (new JsonSchemaTranslator()).fromConnectSchema(ORDER_SCHEMA.schema());

schemaRegistryClient = mock(SchemaRegistryClient.class);
when(schemaRegistryClient.getSchemaBySubjectAndId(anyString(), anyInt())).thenReturn(schema);
schemaRegistryClient = new MockSchemaRegistryClient();
schemaRegistryClient.register(SOME_TOPIC, schema);

final KsqlJsonSerdeFactory jsonSerdeFactory =
new KsqlJsonSerdeFactory(new JsonSchemaProperties(ImmutableMap.of()));
pom.xml: 8 changes (4 additions & 4 deletions)
@@ -143,17 +143,17 @@
<git-commit-id-plugin.version>4.9.10</git-commit-id-plugin.version>
<apache.io.version>2.7</apache.io.version>
<io.confluent.ksql.version>7.5.0-0</io.confluent.ksql.version>
<io.confluent.schema-registry.version>7.5.0-811</io.confluent.schema-registry.version>
<io.confluent.schema-registry.version>7.5.0-847</io.confluent.schema-registry.version>
<netty-tcnative-version>2.0.54.Final</netty-tcnative-version>
<!-- We normally get this from common, but Vertx is built against this -->
<!-- Note: `netty` depends on `tcnative` and if we bump `netty`
we might need to bump `tcnative`, too.
Please check top level `pom.xml` at https://github.com/netty/netty
for the netty version we bump to (ie, corresponding git tag),
to find the correct `tcnative` version. -->
<netty.version>4.1.87.Final</netty.version>
<netty-codec-http2-version>4.1.87.Final</netty-codec-http2-version>
<jersey-common>2.34</jersey-common>
<netty.version>4.1.89.Final</netty.version>
<netty-codec-http2-version>4.1.89.Final</netty-codec-http2-version>
<jersey-common>2.39.1</jersey-common>
<multi-threaded-testing.forkCount>3</multi-threaded-testing.forkCount>
<kafka.version>${ce.kafka.version}</kafka.version>
</properties>