Skip to content
Permalink
Browse files
Related to #423 added autogeneration of kamelets
  • Loading branch information
valdar committed Oct 25, 2021
1 parent ca4e784 commit c08884936e30698b19afe37bd216f1c6e15157b2
Showing 28 changed files with 2,087 additions and 78 deletions.
@@ -38,6 +38,7 @@
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -54,6 +55,7 @@ public class CamelSinkTask extends SinkTask {
private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);

private static final String LOCAL_URL = "direct:start";
private static final String DEFAULT_KAMELET_CKC_SINK = "kamelet:ckcSink";
private ErrantRecordReporter reporter;

private CamelKafkaConnectMain cms;
@@ -123,7 +125,7 @@ public void start(Map<String, String> props) {
}
actualProps.put(KAMELET_SINK_TEMPLATE_PARAMETERS_PREFIX + "toUrl", remoteUrl);

cms = CamelKafkaConnectMain.builder(LOCAL_URL, "kamelet:ckcSink")
cms = CamelKafkaConnectMain.builder(LOCAL_URL, getSinkKamelet())
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
@@ -144,7 +146,6 @@ public void start(Map<String, String> props) {
.withHeadersExcludePattern(headersRemovePattern)
.build(camelContext);


cms.start();

producer = cms.getProducerTemplate();
@@ -156,6 +157,11 @@ public void start(Map<String, String> props) {
}
}

@NotNull
protected String getSinkKamelet() {
return DEFAULT_KAMELET_CKC_SINK;
}

protected CamelSinkConnectorConfig getCamelSinkConnectorConfig(Map<String, String> props) {
return new CamelSinkConnectorConfig(props);
}
@@ -43,6 +43,7 @@
import org.apache.kafka.connect.source.SourceTask;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -59,6 +60,7 @@ public class CamelSourceTask extends SourceTask {
private static final String CAMEL_SOURCE_PATH_PROPERTIES_PREFIX = "camel.source.path.";

private static final String LOCAL_URL = "seda:end";
private static final String DEFAULT_KAMELET_CKC_SOURCE = "kamelet:ckcSource";

private CamelKafkaConnectMain cms;
private PollingConsumer consumer;
@@ -148,7 +150,7 @@ public Integer get() {
}
actualProps.put(KAMELET_SOURCE_TEMPLATE_PARAMETERS_PREFIX + "fromUrl", remoteUrl);

cms = CamelKafkaConnectMain.builder("kamelet:ckcSource", localUrl)
cms = CamelKafkaConnectMain.builder(getSourceKamelet(), localUrl)
.withProperties(actualProps)
.withUnmarshallDataFormat(unmarshaller)
.withMarshallDataFormat(marshaller)
@@ -180,6 +182,11 @@ public Integer get() {
}
}

@NotNull
protected String getSourceKamelet() {
return DEFAULT_KAMELET_CKC_SOURCE;
}

private long remaining(long startPollEpochMilli, long maxPollDuration) {
return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
}
@@ -29,6 +29,7 @@

<kafka.version>2.8.0</kafka.version>
<camel.version>3.11.1</camel.version>
<camel.kamelet.catalog.version>0.3.0</camel.kamelet.catalog.version>
<apicurio.registry.version>1.3.2.Final</apicurio.registry.version>
<resteasy.version>4.5.6.Final</resteasy.version>
<version.java>1.8</version.java>
@@ -116,6 +117,12 @@
<version>${version.guava}</version>
</dependency>

<dependency>
<groupId>org.apache.camel.kamelets</groupId>
<artifactId>camel-kamelets-catalog</artifactId>
<version>${camel.kamelet.catalog.version}</version>
</dependency>

<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
@@ -51,6 +51,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<version>${classgraph.version}</version>
</dependency>

<!-- Maven plugin deps -->
<dependency>
@@ -106,6 +111,18 @@
<version>2.3.1</version>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<!-- <version>${jackson.version}</version>-->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<!-- <version>${jackson.version}</version>-->
</dependency>

<!-- camel -->
<dependency>
<groupId>org.apache.camel</groupId>
@@ -120,6 +137,11 @@
<artifactId>camel-catalog</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel.kamelets</groupId>
<artifactId>camel-kamelets-catalog</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-package-maven-plugin</artifactId>
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.maven;

import org.apache.maven.plugins.annotations.Parameter;

public abstract class AbstractCamelComponentKafkaConnectorMojo extends AbstractCamelKafkaConnectorMojo {

/**
* The initial pom template file.
*/
@Parameter(defaultValue = "camel-kafka-connector-template-pom.template")
protected String initialPomTemplate;

/**
* Properties file to configure additional dependencies.
*/
@Parameter(defaultValue = "camel-kafka-connector-fix-dependencies.properties")
protected String fixDependenciesProperties;
}
@@ -53,12 +53,6 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
@Parameter(property = "project", required = true, readonly = true)
protected MavenProject project;

/**
* The initial pom template file.
*/
@Parameter(defaultValue = "camel-kafka-connector-template-pom.template")
protected String initialPomTemplate;

/**
* NOTICE file.
*/
@@ -71,12 +65,6 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
@Parameter(defaultValue = "camel-kafka-connector-LICENSE.txt")
protected String licenseTemplate;

/**
* Properties file to configure additional dependencies.
*/
@Parameter(defaultValue = "camel-kafka-connector-fix-dependencies.properties")
protected String fixDependenciesProperties;

/**
* Package file template to be placed in src/main/assembly/package.xml.
*/
@@ -130,13 +118,13 @@ public abstract class AbstractCamelKafkaConnectorMojo extends AbstractMojo {
/**
* Execute goal.
*
* @throws MojoExecutionException execution of the main class or one of the
* threads it generated failed.
* @throws MojoExecutionException execution of the main class or one of the threads it generated failed.
* @throws MojoFailureException something bad happened...
*/
@Override
public void execute() throws MojoExecutionException, MojoFailureException {
configureResourceManager();
//execute only once for the connectors parent project which can be configured with <connectorsProjectName> option
if (!project.getArtifactId().equals(connectorsProjectName)) {
getLog().debug("Skipping project " + project.getArtifactId() + " since it is not " + connectorsProjectName + ", which can be configured with <connectorsProjectName> option.");
return;
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.maven;

import org.apache.maven.plugins.annotations.Parameter;

public abstract class AbstractCamelKameletKafkaConnectorMojo extends AbstractCamelKafkaConnectorMojo {

/**
* The initial kamelet pom template file.
*/
@Parameter(defaultValue = "camel-kafka-connector-kamelet-template-pom.template")
protected String initialKameletPomTemplate;

/**
* Properties kamelet file to configure additional dependencies.
*/
@Parameter(defaultValue = "camel-kafka-connector-kamelet-fix-dependencies.properties")
protected String fixKameletDependenciesProperties;
}
@@ -42,7 +42,7 @@

@Mojo(name = "camel-kafka-connector-create", threadSafe = true,
defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
public class CamelKafkaConnectorCreateMojo extends AbstractCamelKafkaConnectorMojo {
public class CamelKafkaConnectorCreateMojo extends AbstractCamelComponentKafkaConnectorMojo {

@Parameter(property = "name", required = true)
protected String name;
@@ -115,7 +115,7 @@ private void generateAndWritePom(String sanitizedName, File directory) throws Ex
props.put("componentDescription", name);
try {
Document pom = MavenUtils.createCrateXmlDocumentFromTemplate(pomTemplate, props);
// Write the starter pom
// Write the connector pom
File pomFile = new File(directory, "pom.xml");
writeXmlFormatted(pom, pomFile, getLog());
} catch (Exception e) {
@@ -34,7 +34,7 @@

@Mojo(name = "camel-kafka-connector-delete", threadSafe = true,
defaultPhase = LifecyclePhase.GENERATE_RESOURCES)
public class CamelKafkaConnectorDeleteMojo extends AbstractCamelKafkaConnectorMojo {
public class CamelKafkaConnectorDeleteMojo extends AbstractCamelComponentKafkaConnectorMojo {

@Parameter(property = "name", required = true)
protected String name;
@@ -58,7 +58,7 @@ public void executeAll() throws MojoFailureException {
try {
deleteConnector();
} catch (Exception e) {
throw new MojoFailureException("Fail to create connector " + name, e);
throw new MojoFailureException("Fail to delete connector " + name, e);
}
}

0 comments on commit c088849

Please sign in to comment.