Skip to content

Commit

Permalink
Debezium PostgresSQL Connector native support apache#1191
Browse files Browse the repository at this point in the history
  • Loading branch information
JiriOndrusek committed May 15, 2020
1 parent 4994632 commit 9691ae9
Show file tree
Hide file tree
Showing 24 changed files with 664 additions and 101 deletions.
1 change: 1 addition & 0 deletions .github/test-categories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ core:
- core-main-xml-io
- core-main-xml-jaxb
database:
- debezium-postgres
- couchdb
- influxdb
- jdbc
Expand Down
4 changes: 2 additions & 2 deletions docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ Level | Since | Description
Preview | 1.0.0-M6 | Represents a Debezium MySQL endpoint which is used to capture changes in MySQL database so that that applications can see those changes and respond to them.

| link:https://camel.apache.org/components/latest/debezium-postgres-component.html[Debezium PostgresSQL Connector] (camel-quarkus-debezium-postgres) +
`debezium-postgres:name` | JVM +
Preview | 1.0.0-M6 | Represents a Debezium PostgresSQL endpoint which is used to capture changes in PostgresSQL database so that that applications can see those changes and respond to them.
`debezium-postgres:name` | Native +
Stable | 1.0.0-M8 | Represents a Debezium PostgresSQL endpoint which is used to capture changes in PostgresSQL database so that that applications can see those changes and respond to them.

| link:https://camel.apache.org/components/latest/debezium-sqlserver-component.html[Debezium SQL Server Connector] (camel-quarkus-debezium-sqlserver) +
`debezium-sqlserver:name` | JVM +
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion extensions-jvm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
<module>couchbase</module>
<module>debezium-mongodb</module>
<module>debezium-mysql</module>
<module>debezium-postgres</module>
<module>debezium-sqlserver</module>
<module>google-bigquery</module>
<module>google-pubsub</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
Expand Down Expand Up @@ -48,16 +50,12 @@
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-support-debezium-deployment</artifactId>
<artifactId>camel-quarkus-debezium-postgres</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-debezium-postgres</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.debezium.postgres.deployment;

import io.quarkus.arc.deployment.BeanContainerBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import org.apache.camel.component.debezium.DebeziumPostgresComponent;
import org.apache.camel.quarkus.component.debezium.postgres.graal.DebeziumPostgresRecorder;
import org.apache.camel.quarkus.core.deployment.CamelRuntimeBeanBuildItem;
import org.jboss.jandex.IndexView;

class DebeziumPostgresProcessor {

private static final String FEATURE = "camel-debezium-postgres";

@BuildStep
FeatureBuildItem feature() {
return new FeatureBuildItem(FEATURE);
}

@BuildStep
ReflectiveClassBuildItem registerForReflection(CombinedIndexBuildItem combinedIndex) {
IndexView index = combinedIndex.getIndex();

String[] dtos = index.getKnownClasses().stream()
.map(ci -> ci.name().toString())
//todo validate which are necessary
.filter(n -> n.startsWith("org.apache.kafka.connect.json") || n.startsWith("io.debezium.connector")
|| n.startsWith("io.debezium.embedded"))
.sorted()
.toArray(String[]::new);

return new ReflectiveClassBuildItem(false, true, dtos);
}

@BuildStep
void addDependencies(BuildProducer<IndexDependencyBuildItem> indexDependency) {
indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "connect-json"));
indexDependency.produce(new IndexDependencyBuildItem("io.debezium", "debezium-connector-postgres"));
indexDependency.produce(new IndexDependencyBuildItem("io.debezium", "debezium-embedded"));
}

@Record(ExecutionTime.STATIC_INIT)
@BuildStep
CamelRuntimeBeanBuildItem debeziumPostgresComponent(BeanContainerBuildItem beanContainer,
DebeziumPostgresRecorder recorder) {
return new CamelRuntimeBeanBuildItem(
"debezium-postgres",
DebeziumPostgresComponent.class.getName(),
recorder.createDebeziumPostgresComponent(beanContainer.getValue()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-build-parent-it</artifactId>
<artifactId>camel-quarkus-build-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../poms/build-parent-it/pom.xml</relativePath>
<relativePath>../../poms/build-parent/pom.xml</relativePath>
</parent>

<artifactId>camel-quarkus-debezium-postgres-parent</artifactId>
Expand All @@ -33,6 +35,5 @@
<modules>
<module>deployment</module>
<module>runtime</module>
<module>integration-test</module>
</modules>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
Expand All @@ -31,7 +33,7 @@
<description>Represents a Debezium PostgresSQL endpoint which is used to capture changes in PostgresSQL database so that that applications can see those changes and respond to them.</description>

<properties>
<firstVersion>1.0.0-M6</firstVersion>
<firstVersion>1.0.0-M8</firstVersion>
</properties>

<dependencyManagement>
Expand All @@ -47,22 +49,28 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>svm</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-support-debezium</artifactId>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-postgres</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-postgres</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.debezium.postgres.graal;

import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import org.apache.camel.component.debezium.DebeziumPostgresComponent;
import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
import org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration;
import org.apache.camel.quarkus.component.debezium.postgres.graal.storage.NativeFileOffsetBackingStore;

/**
* Recorder initializes configuration with different default method for offsetStore
*/
@Recorder
public class DebeziumPostgresRecorder {

public RuntimeValue<DebeziumPostgresComponent> createDebeziumPostgresComponent(BeanContainer container) {
DebeziumPostgresComponent comp = container.instance(DebeziumPostgresComponent.class);
comp.setConfiguration(new QuarkusPostgresConnectorEmbeddedDebeziumConfiguration());
return new RuntimeValue<>(comp);
}

static class QuarkusPostgresConnectorEmbeddedDebeziumConfiguration extends PostgresConnectorEmbeddedDebeziumConfiguration {

public QuarkusPostgresConnectorEmbeddedDebeziumConfiguration() {
//change default value
setOffsetStorage(NativeFileOffsetBackingStore.class.getName());
}

@Override
protected ConfigurationValidation validateConnectorConfiguration() {
ConfigurationValidation validation = super.validateConnectorConfiguration();
// check for offsetStorageFileName
if (validation.isValid() && getOffsetStorage().equals(NativeFileOffsetBackingStore.class.getName())
&& isFieldValueNotSet(getOffsetStorageFileName())) {
return ConfigurationValidation.notValid(String
.format("Required field 'offsetStorageFileName' must be set since 'offsetStorage' is set to '%s'",
NativeFileOffsetBackingStore.class.getName()));
}
return validation;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.debezium.postgres.graal;

import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.TargetClass;
import org.apache.camel.quarkus.component.debezium.postgres.graal.storage.NativeFileOffsetBackingStore;

@TargetClass(org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration.class)
final class SubstituteEmbeddedDebeziumConfiguration {

@Alias
private String offsetStorage = NativeFileOffsetBackingStore.class.getName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.debezium.postgres.graal;

import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;
import org.slf4j.Logger;

@TargetClass(io.debezium.metrics.Metrics.class)
final class SubstituteMetrics {

@Substitute
public synchronized void register(Logger logger) {
logger.debug("Metrics are not registered in native mode.");
}

@Substitute
public final void unregister(Logger logger) {
logger.debug("Metrics are not unregistered in native mode.");
}
}

0 comments on commit 9691ae9

Please sign in to comment.