Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-3542 Use Apicurio 2.0 #2458

Merged
merged 1 commit into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion debezium-bom/pom.xml
Expand Up @@ -14,7 +14,7 @@
<properties>
<!-- check new release version at https://github.com/confluentinc/schema-registry/releases -->
<version.confluent.platform>6.0.2</version.confluent.platform>
<version.apicurio>1.3.2.Final</version.apicurio>
<version.apicurio>2.0.0.Final</version.apicurio>

<!-- Database drivers, should align with databases in debezium-build-parent -->
<version.postgresql.driver>42.2.14</version.postgresql.driver>
Expand Down
Expand Up @@ -21,9 +21,8 @@ public Map<String, String> start() {

Map<String, String> params = new ConcurrentHashMap<>();
params.put("debezium.format.apicurio.registry.url", getApicurioUrl());
params.put("debezium.format.apicurio.registry.converter.serializer", "io.apicurio.registry.utils.serde.AvroKafkaSerializer");
params.put("debezium.format.apicurio.registry.converter.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
params.put("debezium.format.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
params.put("debezium.format.apicurio.registry.auto-register", "true");
params.put("debezium.format.apicurio.registry.find-latest", "true");
return params;
}

Expand All @@ -40,6 +39,6 @@ public void stop() {
}

public static String getApicurioUrl() {
return "http://" + container.getHost() + ":" + container.getMappedPort(APICURIO_PORT) + "/api";
return "http://" + container.getHost() + ":" + container.getMappedPort(APICURIO_PORT) + "/apis/registry/v2";
}
}
Expand Up @@ -57,7 +57,9 @@ public DebeziumContainer(final String containerImageName) {
}

public static DebeziumContainer latestStable() {
return new DebeziumContainer("debezium/connect:" + DEBEZIUM_VERSION);
// Temporarily requires hard-coded 1.6 as there is no stable release with Apicurio 2.0 baked
// return new DebeziumContainer("debezium/connect:" + DEBEZIUM_VERSION);
return new DebeziumContainer("debezium/connect:" + "1.6-devel");
}

private void defaultConfig() {
Expand Down
Expand Up @@ -153,9 +153,7 @@ public void shouldConvertToCloudEventWithDataAsAvro() throws Exception {
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (3, 'Be Awesome')");

final String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
final int port = apicurioContainer.getExposedPorts().get(0);
final String apicurioUrl = "http://" + host + ":" + port + "/api";
final String apicurioUrl = getApicurioUrl();
String id = "3";

// host, database, user etc. are obtained from the container
Expand All @@ -166,7 +164,8 @@ public void shouldConvertToCloudEventWithDataAsAvro() throws Exception {
.with("value.converter", "io.debezium.converters.CloudEventsConverter")
.with("value.converter.data.serializer.type", "avro")
.with("value.converter.avro.apicurio.registry.url", apicurioUrl)
.with("value.converter.avro.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
.with("value.converter.avro.apicurio.registry.auto-register", "true")
.with("value.converter.avro.apicurio.registry.find-latest", "true");

debeziumContainer.registerConnector("my-connector-cloudevents-avro", config);

Expand Down Expand Up @@ -228,20 +227,20 @@ private <T> List<ConsumerRecord<T, T>> drain(KafkaConsumer<T, T> consumer, int e
}

private ConnectorConfiguration getConfiguration(int id, String converter, String... options) {
final String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
final int port = apicurioContainer.getExposedPorts().get(0);
final String apicurioUrl = "http://" + host + ":" + port + "/api";
final String apicurioUrl = getApicurioUrl();

// host, database, user etc. are obtained from the container
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("slot.name", "debezium_" + id)
.with("key.converter", converter)
.with("key.converter.apicurio.registry.url", apicurioUrl)
.with("key.converter.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy")
.with("key.converter.apicurio.registry.auto-register", "true")
.with("key.converter.apicurio.registry.find-latest", "true")
.with("value.converter.apicurio.registry.url", apicurioUrl)
.with("value.converter", converter)
.with("value.converter.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
.with("value.converter.apicurio.registry.auto-register", "true")
.with("value.converter.apicurio.registry.find-latest", "true");

if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
Expand All @@ -251,6 +250,13 @@ private ConnectorConfiguration getConfiguration(int id, String converter, String
return config;
}

private String getApicurioUrl() {
final String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
final int port = apicurioContainer.getExposedPorts().get(0);
final String apicurioUrl = "http://" + host + ":" + port + "/apis/registry/v2";
return apicurioUrl;
}

@AfterClass
public static void stopContainers() {
try {
Expand Down
2 changes: 1 addition & 1 deletion documentation/antora.yml
Expand Up @@ -16,7 +16,7 @@ asciidoc:
modules: '../../modules'
mysql-version: '8.0'
strimzi-version: '0.18.0'
apicurio-version: '1.3.2.Final'
apicurio-version: '2.0.0.Final'
db2-version: '11.5.0.0'
community: true
registry: 'Apicurio Registry'
Expand Down