diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java new file mode 100644 index 0000000000..46b082251a --- /dev/null +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelKafkaConnectMain.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.utils; + +import org.apache.camel.CamelContext; +import org.apache.camel.ConsumerTemplate; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.main.BaseMainSupport; +import org.apache.camel.main.MainListener; +import org.apache.camel.support.service.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CamelKafkaConnectMain extends BaseMainSupport { + public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat."; + private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class); + + protected volatile ConsumerTemplate consumerTemplate; + protected volatile ProducerTemplate producerTemplate; + + public CamelKafkaConnectMain(CamelContext context) { + this.camelContext = context; + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + postProcessCamelContext(camelContext); + } + + @Override + protected void doStart() throws Exception { + LOG.info("Starting Main"); + + for (MainListener listener : listeners) { + listener.beforeStart(this); + } + + super.doStart(); + + getCamelContext().start(); + + for (MainListener listener : listeners) { + listener.afterStart(this); + } + + LOG.info("Main started"); + } + + @Override + protected void doStop() throws Exception { + LOG.info("Stopping Main"); + + ServiceHelper.stopService(consumerTemplate); + consumerTemplate = null; + + ServiceHelper.stopService(producerTemplate); + producerTemplate = null; + + for (MainListener listener : listeners) { + listener.beforeStop(this); + } + + super.doStart(); + + getCamelContext().stop(); + + for (MainListener listener : listeners) { + listener.afterStop(this); + } + + LOG.info("Main stopped"); + } + + @Override + protected ProducerTemplate findOrCreateCamelTemplate() { + throw new UnsupportedOperationException("Should not happen"); + } + + @Override + protected CamelContext createCamelContext() { + throw new UnsupportedOperationException("Should not happen"); + } + + public ProducerTemplate getProducerTemplate() { + if (this.producerTemplate == null) { + synchronized (this) { + if (this.producerTemplate == null) { + this.producerTemplate = getCamelContext().createProducerTemplate(); + } + } + } + + return this.producerTemplate; + } + + public ConsumerTemplate getConsumerTemplate() { + if (this.consumerTemplate == null) { + synchronized (this) { + if (this.consumerTemplate == null) { + this.consumerTemplate = getCamelContext().createConsumerTemplate(); + } + } + } + + return this.consumerTemplate; + } +} diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java index 5a5f1fe73f..6c17ecc091 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java @@ -20,27 +20,17 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.ConsumerTemplate; import org.apache.camel.Endpoint; -import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.catalog.RuntimeCamelCatalog; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.main.BaseMainSupport; -import org.apache.camel.main.Main; -import org.apache.camel.main.MainListener; import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.DataFormat; import org.apache.camel.support.PropertyBindingSupport; -import org.apache.camel.util.OrderedProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,43 +38,20 @@ public class CamelMainSupport { public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat."; private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class); - private Main camelMain; - private CamelContext camel; + private final CamelKafkaConnectMain camelMain; - private final ExecutorService exService = Executors.newSingleThreadExecutor(); - private final CountDownLatch startFinishedSignal = new CountDownLatch(1); - - public CamelMainSupport(Map props, String fromUrl, String toUrl, List dataformats, int aggregationSize, long aggregationTimeout) throws Exception { - this(props, fromUrl, toUrl, dataformats, aggregationSize, aggregationTimeout, new DefaultCamelContext()); - } - - public CamelMainSupport(Map props, String fromUrl, String toUrl, List dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) throws Exception { - camel = camelContext; - camelMain = new Main() { - @Override - protected ProducerTemplate findOrCreateCamelTemplate() { - return camel.createProducerTemplate(); - } - - @Override - protected CamelContext createCamelContext() { - return camel; - } - }; - - camelMain.addMainListener(new CamelMainFinishedListener()); + public CamelMainSupport(Map props, String fromUrl, String toUrl, List dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) { + camelMain = new CamelKafkaConnectMain(camelContext); camelMain.configure().setAutoConfigurationLogSummary(false); - - Properties camelProperties = new OrderedProperties(); + Properties camelProperties = new Properties(); camelProperties.putAll(props); LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); - this.camel.getPropertiesComponent().setInitialProperties(camelProperties); + camelMain.setInitialProperties(camelProperties); - camelMain.init(); //creating the actual route - this.camel.addRoutes(new RouteBuilder() { + camelMain.configure().addRoutesBuilder(new RouteBuilder() { public void configure() { //from RouteDefinition rd = from(fromUrl); @@ -96,12 +63,12 @@ public void configure() { switch (dataformat.getDataformatKind()) { case MARSHALL: LOG.info(".marshal().custom({})", dataformatId); - camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); + getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); rd.marshal().custom(dataformatId); break; case UNMARSHALL: LOG.info(".unmarshal().custom({})", dataformatId); - camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); + getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId)); rd.unmarshal().custom(dataformatId); break; default: @@ -109,10 +76,9 @@ public void configure() { } } - - if (camel.getRegistry().lookupByName("aggregate") != null) { + if (getContext().getRegistry().lookupByName("aggregate") != null) { //aggregation - AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate"); + AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate"); LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout); LOG.info(".to({})", toUrl); rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl); @@ -125,16 +91,14 @@ public void configure() { }); } - public void start() throws Exception { + public void start() { LOG.info("Starting CamelContext"); - CamelContextStarter starter = new CamelContextStarter(); - exService.execute(starter); - startFinishedSignal.await(); - - if (starter.hasException()) { - LOG.info("CamelContext failed to start", starter.getException()); - throw starter.getException(); + try { + camelMain.start(); + } catch (Exception e) { + LOG.info("CamelContext failed to start", e); + throw e; } LOG.info("CamelContext started"); @@ -143,46 +107,46 @@ public void start() throws Exception { public void stop() { LOG.info("Stopping CamelContext"); - camelMain.stop(); - exService.shutdown(); + try { + camelMain.stop(); + } catch (Exception e) { + LOG.info("CamelContext failed to stop", e); + throw e; + } LOG.info("CamelContext stopped"); } public ProducerTemplate createProducerTemplate() { - return camel.createProducerTemplate(); + return camelMain.getProducerTemplate(); } public Endpoint getEndpoint(String uri) { - return camel.getEndpoint(uri); + return camelMain.getCamelContext().getEndpoint(uri); } public Collection getEndpoints() { - return camel.getEndpoints(); + return camelMain.getCamelContext().getEndpoints(); } public ConsumerTemplate createConsumerTemplate() { - return camel.createConsumerTemplate(); - } - - public RuntimeCamelCatalog getRuntimeCamelCatalog() { - return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(); + return camelMain.getConsumerTemplate(); } private DataFormat lookupAndInstantiateDataformat(String dataformatName) { - DataFormat df = camel.resolveDataFormat(dataformatName); + DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName); if (df == null) { - df = camel.createDataFormat(dataformatName); + df = camelMain.getCamelContext().createDataFormat(dataformatName); final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + "."; - final Properties props = camel.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix)); + final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix)); - CamelContextAware.trySetCamelContext(df, camel); + CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext()); if (!props.isEmpty()) { PropertyBindingSupport.build() - .withCamelContext(camel) + .withCamelContext(camelMain.getCamelContext()) .withOptionPrefix(prefix) .withRemoveParameters(false) .withProperties((Map) props) @@ -198,71 +162,4 @@ private DataFormat lookupAndInstantiateDataformat(String dataformatName) { return df; } - private class CamelMainFinishedListener implements MainListener { - @Override - public void configure(CamelContext context) { - - } - - @Override - public void beforeStart(BaseMainSupport main) { - - } - - @Override - public void afterStart(BaseMainSupport main) { - LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called"); - startFinishedSignal.countDown(); - } - - @Override - public void beforeStop(BaseMainSupport main) { - - } - - @Override - public void afterStop(BaseMainSupport main) { - - } - - @Override - public void beforeConfigure(BaseMainSupport main) { - } - - @Override - public void afterConfigure(BaseMainSupport main) { - - } - - @Override - public void beforeInitialize(BaseMainSupport main) { - - } - } - - private class CamelContextStarter implements Runnable { - private Exception startException; - - @Override - public void run() { - try { - camelMain.run(); - } catch (Exception e) { - LOG.error("An exception has occurred before CamelContext startup has finished", e); - startException = e; - if (startFinishedSignal.getCount() > 0) { - LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception"); - startFinishedSignal.countDown(); - } - } - } - - public boolean hasException() { - return startException != null; - } - - public Exception getException() { - return startException; - } - } }