Skip to content

Commit

Permalink
[camel-main-support] Use BaseMainSupport instead of Main #499
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 7, 2020
1 parent acd4af9 commit 75ccafa
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.utils;

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.MainListener;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CamelKafkaConnectMain extends BaseMainSupport {
public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectMain.class);

protected volatile ConsumerTemplate consumerTemplate;
protected volatile ProducerTemplate producerTemplate;

public CamelKafkaConnectMain(CamelContext context) {
this.camelContext = context;
}

@Override
protected void doInit() throws Exception {
super.doInit();
postProcessCamelContext(camelContext);
}

@Override
protected void doStart() throws Exception {
LOG.info("Starting Main");

for (MainListener listener : listeners) {
listener.beforeStart(this);
}

super.doStart();

getCamelContext().start();

for (MainListener listener : listeners) {
listener.afterStart(this);
}

LOG.info("Main started");
}

@Override
protected void doStop() throws Exception {
LOG.info("Stopping Main");

ServiceHelper.stopService(consumerTemplate);
consumerTemplate = null;

ServiceHelper.stopService(producerTemplate);
producerTemplate = null;

for (MainListener listener : listeners) {
listener.beforeStop(this);
}

super.doStart();

getCamelContext().stop();

for (MainListener listener : listeners) {
listener.afterStop(this);
}

LOG.info("Main stopped");
}

@Override
protected ProducerTemplate findOrCreateCamelTemplate() {
throw new UnsupportedOperationException("Should not happen");
}

@Override
protected CamelContext createCamelContext() {
throw new UnsupportedOperationException("Should not happen");
}

public ProducerTemplate getProducerTemplate() {
if (this.producerTemplate == null) {
synchronized (this) {
if (this.producerTemplate == null) {
this.producerTemplate = getCamelContext().createProducerTemplate();
}
}
}

return this.producerTemplate;
}

public ConsumerTemplate getConsumerTemplate() {
if (this.consumerTemplate == null) {
synchronized (this) {
if (this.consumerTemplate == null) {
this.consumerTemplate = getCamelContext().createConsumerTemplate();
}
}
}

return this.consumerTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,71 +20,38 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListener;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.util.OrderedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CamelMainSupport {
public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);

private Main camelMain;
private CamelContext camel;
private final CamelKafkaConnectMain camelMain;

private final ExecutorService exService = Executors.newSingleThreadExecutor();
private final CountDownLatch startFinishedSignal = new CountDownLatch(1);

public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout) throws Exception {
this(props, fromUrl, toUrl, dataformats, aggregationSize, aggregationTimeout, new DefaultCamelContext());
}

public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) throws Exception {
camel = camelContext;
camelMain = new Main() {
@Override
protected ProducerTemplate findOrCreateCamelTemplate() {
return camel.createProducerTemplate();
}

@Override
protected CamelContext createCamelContext() {
return camel;
}
};

camelMain.addMainListener(new CamelMainFinishedListener());
public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, List<CamelKafkaConnectDataformat> dataformats, int aggregationSize, long aggregationTimeout, CamelContext camelContext) {
camelMain = new CamelKafkaConnectMain(camelContext);
camelMain.configure().setAutoConfigurationLogSummary(false);


Properties camelProperties = new OrderedProperties();
Properties camelProperties = new Properties();
camelProperties.putAll(props);

LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
camelMain.setInitialProperties(camelProperties);

camelMain.init();
//creating the actual route
this.camel.addRoutes(new RouteBuilder() {
camelMain.configure().addRoutesBuilder(new RouteBuilder() {
public void configure() {
//from
RouteDefinition rd = from(fromUrl);
Expand All @@ -96,23 +63,22 @@ public void configure() {
switch (dataformat.getDataformatKind()) {
case MARSHALL:
LOG.info(".marshal().custom({})", dataformatId);
camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
rd.marshal().custom(dataformatId);
break;
case UNMARSHALL:
LOG.info(".unmarshal().custom({})", dataformatId);
camel.getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
getContext().getRegistry().bind(dataformatId, lookupAndInstantiateDataformat(dataformatId));
rd.unmarshal().custom(dataformatId);
break;
default:
throw new UnsupportedOperationException("Unsupported dataformat: " + dataformat);
}
}


if (camel.getRegistry().lookupByName("aggregate") != null) {
if (getContext().getRegistry().lookupByName("aggregate") != null) {
//aggregation
AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate");
AggregationStrategy s = (AggregationStrategy) getContext().getRegistry().lookupByName("aggregate");
LOG.info(".aggregate({}).constant(true).completionSize({}).completionTimeout({})", s, aggregationSize, aggregationTimeout);
LOG.info(".to({})", toUrl);
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
Expand All @@ -125,16 +91,14 @@ public void configure() {
});
}

public void start() throws Exception {
public void start() {
LOG.info("Starting CamelContext");

CamelContextStarter starter = new CamelContextStarter();
exService.execute(starter);
startFinishedSignal.await();

if (starter.hasException()) {
LOG.info("CamelContext failed to start", starter.getException());
throw starter.getException();
try {
camelMain.start();
} catch (Exception e) {
LOG.info("CamelContext failed to start", e);
throw e;
}

LOG.info("CamelContext started");
Expand All @@ -143,46 +107,46 @@ public void start() throws Exception {
public void stop() {
LOG.info("Stopping CamelContext");

camelMain.stop();
exService.shutdown();
try {
camelMain.stop();
} catch (Exception e) {
LOG.info("CamelContext failed to stop", e);
throw e;
}

LOG.info("CamelContext stopped");
}

public ProducerTemplate createProducerTemplate() {
return camel.createProducerTemplate();
return camelMain.getProducerTemplate();
}

public Endpoint getEndpoint(String uri) {
return camel.getEndpoint(uri);
return camelMain.getCamelContext().getEndpoint(uri);
}

public Collection<Endpoint> getEndpoints() {
return camel.getEndpoints();
return camelMain.getCamelContext().getEndpoints();
}

public ConsumerTemplate createConsumerTemplate() {
return camel.createConsumerTemplate();
}

public RuntimeCamelCatalog getRuntimeCamelCatalog() {
return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
return camelMain.getConsumerTemplate();
}

private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
DataFormat df = camel.resolveDataFormat(dataformatName);
DataFormat df = camelMain.getCamelContext().resolveDataFormat(dataformatName);

if (df == null) {
df = camel.createDataFormat(dataformatName);
df = camelMain.getCamelContext().createDataFormat(dataformatName);

final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
final Properties props = camel.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
final Properties props = camelMain.getCamelContext().getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));

CamelContextAware.trySetCamelContext(df, camel);
CamelContextAware.trySetCamelContext(df, camelMain.getCamelContext());

if (!props.isEmpty()) {
PropertyBindingSupport.build()
.withCamelContext(camel)
.withCamelContext(camelMain.getCamelContext())
.withOptionPrefix(prefix)
.withRemoveParameters(false)
.withProperties((Map) props)
Expand All @@ -198,71 +162,4 @@ private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
return df;
}

private class CamelMainFinishedListener implements MainListener {
@Override
public void configure(CamelContext context) {

}

@Override
public void beforeStart(BaseMainSupport main) {

}

@Override
public void afterStart(BaseMainSupport main) {
LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
startFinishedSignal.countDown();
}

@Override
public void beforeStop(BaseMainSupport main) {

}

@Override
public void afterStop(BaseMainSupport main) {

}

@Override
public void beforeConfigure(BaseMainSupport main) {
}

@Override
public void afterConfigure(BaseMainSupport main) {

}

@Override
public void beforeInitialize(BaseMainSupport main) {

}
}

private class CamelContextStarter implements Runnable {
private Exception startException;

@Override
public void run() {
try {
camelMain.run();
} catch (Exception e) {
LOG.error("An exception has occurred before CamelContext startup has finished", e);
startException = e;
if (startFinishedSignal.getCount() > 0) {
LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
startFinishedSignal.countDown();
}
}
}

public boolean hasException() {
return startException != null;
}

public Exception getException() {
return startException;
}
}
}

0 comments on commit 75ccafa

Please sign in to comment.