diff --git a/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java new file mode 100644 index 0000000000000..64cee5886a2ae --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.StampedLock; +import java.util.function.BiConsumer; +import java.util.function.Predicate; + +public abstract class AbstractCamelClusterView implements CamelClusterView { + private final CamelCluster cluster; + private final String namespace; + private final List consumers; + private final StampedLock lock; + + protected AbstractCamelClusterView(CamelCluster cluster, String namespace) { + this.cluster = cluster; + this.namespace = namespace; + this.consumers = new ArrayList<>(); + this.lock = new StampedLock(); + } + + @Override + public CamelCluster getCluster() { + return this.cluster; + } + + @Override + public String getNamespace() { + return this.namespace; + } + + @Override + public void addEventListener(BiConsumer consumer) { + long stamp = lock.writeLock(); + + try { + consumers.add(new FilteringConsumer(e -> true, consumer)); + } finally { + lock.unlockWrite(stamp); + } + } + + @Override + public void addEventListener(Predicate predicate, BiConsumer consumer) { + long stamp = lock.writeLock(); + + try { + this.consumers.add(new FilteringConsumer(predicate, consumer)); + } finally { + lock.unlockWrite(stamp); + } + } + + @Override + public void removeEventListener(BiConsumer consumer) { + long stamp = lock.writeLock(); + + try { + consumers.removeIf(c -> c.getConsumer().equals(consumer)); + } finally { + lock.unlockWrite(stamp); + } + } + + // ************************************** + // Events + // ************************************** + + protected void fireEvent(CamelClusterView.Event event, Object payload) { + long stamp = lock.readLock(); + + try { + for (int i = 0; i < consumers.size(); i++) { + consumers.get(0).accept(event, payload); + } + } finally { + lock.unlockRead(stamp); + } + } + + // ************************************** + // Helpers + // ************************************** + + private final class FilteringConsumer implements BiConsumer { + private final Predicate predicate; + private final BiConsumer consumer; + + FilteringConsumer(Predicate predicate, BiConsumer consumer) { + this.predicate = predicate; + this.consumer = consumer; + } + + @Override + public void accept(CamelClusterView.Event event, Object payload) { + if (predicate.test(event)) { + consumer.accept(event, payload); + } + } + + public BiConsumer getConsumer() { + return this.consumer; + } + } +} diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java new file mode 100644 index 0000000000000..5da1e8c234b2c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import org.apache.camel.spi.HasId; + +public interface CamelCluster extends HasId { + /** + * Creates a view of the cluster bound to a namespace. + * + * @param namespace the namespace the view refer to. + * @return the cluster view. + * @throws Exception if the view can't be created. + */ + CamelClusterView createView(String namespace) throws Exception; +} diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java new file mode 100644 index 0000000000000..cfe0a793d7bc1 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterMember.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import org.apache.camel.spi.HasId; + +public interface CamelClusterMember extends HasId { + /** + * @return true if this member is the master. + */ + boolean isMaster(); +} diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java new file mode 100644 index 0000000000000..7af5a210602dc --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Predicate; + +/** + * Represents the View of the cluster at some given period of time. + */ +public interface CamelClusterView { + + enum Event { + KEEP_ALIVE, + LEADERSHIP_CHANGED; + } + + /** + * @return the cluster. + */ + CamelCluster getCluster(); + + /** + * @return the namespace for this view. + */ + String getNamespace(); + + /** + * Provides the master member. + * + * @return the master member. + */ + CamelClusterMember getMaster(); + + /** + * Provides the local member. + * + * @return the local member. + */ + CamelClusterMember getLocalMember(); + + /** + * Provides the list of members of the cluster. + * + * @return the list of members. + */ + List getMembers(); + + /** + * Add an event consumer. + * + * @param consumer the event consumer. + */ + void addEventListener(BiConsumer consumer); + + /** + * Add an event consumer for events matching the given predicate. + * + * @param predicate the predicate to filter events. + * @param consumer the event consumer. + */ + void addEventListener(Predicate predicate, BiConsumer consumer); + + /** + * Remove the event consumer. + * + * @param event the event consumer. + */ + void removeEventListener(BiConsumer event); +} diff --git a/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java new file mode 100644 index 0000000000000..40ae1e84e59a5 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.ha; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Route; +import org.apache.camel.api.management.ManagedAttribute; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.support.RoutePolicySupport; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ReferenceCount; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ManagedResource(description = "Route policy using ...") +public class LeaderRoutePolicy extends RoutePolicySupport implements CamelContextAware { + private static final Logger LOGGER = LoggerFactory.getLogger(LeaderRoutePolicy.class); + + private final AtomicBoolean leader; + private final Set startedRoutes; + private final Set stoppedRoutes; + private final ReferenceCount refCount; + private final CamelClusterView clusterView; + private final CamelClusterMember clusterMember; + private final BiConsumer clusterEventConsumer; + private CamelContext camelContext; + + public LeaderRoutePolicy(CamelClusterView clusterView, CamelClusterMember clusterMember) { + this.clusterMember = clusterMember; + this.clusterView = clusterView; + this.clusterEventConsumer = this::onClusterEvent; + this.stoppedRoutes = new HashSet<>(); + this.startedRoutes = new HashSet<>(); + this.leader = new AtomicBoolean(false); + + this.refCount = ReferenceCount.on( + () -> clusterView.addEventListener(clusterEventConsumer), + () -> clusterView.removeEventListener(clusterEventConsumer) + ); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public synchronized void onInit(Route route) { + super.onInit(route); + + LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId()); + route.getRouteContext().getRoute().setAutoStartup("false"); + + stoppedRoutes.add(route); + + this.refCount.retain(); + + startManagedRoutes(); + } + + @Override + public synchronized void doShutdown() { + this.refCount.release(); + } + + // **************************************************** + // Management + // **************************************************** + + @ManagedAttribute(description = "Is this route the master or a slave") + public boolean isLeader() { + return leader.get(); + } + + // **************************************************** + // Route managements + // **************************************************** + + private void onClusterEvent(CamelClusterView.Event event, Object payload) { + if (event == CamelClusterView.Event.KEEP_ALIVE) { + LOGGER.debug("Got KEEP_ALIVE from cluster '{}' with payload '{}'", clusterView.getCluster().getId(), Objects.toString(payload)); + } + if (event == CamelClusterView.Event.LEADERSHIP_CHANGED) { + boolean isLeader = ObjectHelper.equal(clusterMember.getId(), clusterView.getMaster().getId()); + + if (isLeader && leader.compareAndSet(false, isLeader)) { + LOGGER.info("Leadership taken"); + startManagedRoutes(); + } else if (!isLeader && leader.getAndSet(isLeader)) { + LOGGER.info("Leadership lost"); + stopManagedRoutes(); + } + } + } + + private synchronized void startManagedRoutes() { + if (isLeader()) { + doStartManagedRoutes(); + } else { + // If the leadership has been lost in the meanwhile, stop any + // eventually started route + doStopManagedRoutes(); + } + } + + private synchronized void doStartManagedRoutes() { + try { + for (Route route : stoppedRoutes) { + LOGGER.debug("Starting route {}", route.getId()); + startRoute(route); + startedRoutes.add(route); + } + + stoppedRoutes.removeAll(startedRoutes); + } catch (Exception e) { + handleException(e); + } + } + + private synchronized void stopManagedRoutes() { + if (isLeader()) { + // If became a leader in the meanwhile, start any eventually stopped + // route + doStartManagedRoutes(); + } else { + doStopManagedRoutes(); + } + } + + private synchronized void doStopManagedRoutes() { + try { + for (Route route : startedRoutes) { + LOGGER.debug("Stopping route {}", route.getId()); + stopRoute(route); + stoppedRoutes.add(route); + } + + startedRoutes.removeAll(stoppedRoutes); + } catch (Exception e) { + handleException(e); + } + } +} diff --git a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java index 5196da86620d8..28df5ac863783 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RouteDefinition.java @@ -1303,4 +1303,17 @@ protected RouteContext addRoutes(CamelContext camelContext, Collection ro return routeContext; } + + // **************************** + // Static helpers + // **************************** + + public static RouteDefinition fromUri(String uri) { + return new RouteDefinition().from(uri); + } + + public static RouteDefinition fromEndpoint(Endpoint endpoint) { + return new RouteDefinition().from(endpoint); + } + } diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java index 7afc26a0616c7..3f91f14b061f4 100644 --- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java +++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java @@ -36,6 +36,7 @@ import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ReferenceCount; import org.apache.camel.util.StringHelper; import org.infinispan.Cache; import org.infinispan.client.hotrod.RemoteCache; @@ -58,13 +59,14 @@ public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelCo private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class); private final AtomicBoolean leader; - private final Set suspendedRoutes; + private final Set startedRoutes; + private final Set stoppeddRoutes; private final InfinispanManager manager; + private final ReferenceCount refCount; - private Route route; private CamelContext camelContext; private ScheduledExecutorService executorService; - private boolean shouldStopConsumer; + private boolean shouldStopRoute; private String lockMapName; private String lockKey; private String lockValue; @@ -83,14 +85,16 @@ public InfinispanRoutePolicy(InfinispanManager manager) { public InfinispanRoutePolicy(InfinispanManager manager, String lockKey, String lockValue) { this.manager = manager; - this.suspendedRoutes = new HashSet<>(); + this.stoppeddRoutes = new HashSet<>(); + this.startedRoutes = new HashSet<>(); this.leader = new AtomicBoolean(false); - this.shouldStopConsumer = true; + this.shouldStopRoute = true; this.lockKey = lockKey; this.lockValue = lockValue; this.lifespan = 30; this.lifespanTimeUnit = TimeUnit.SECONDS; this.service = null; + this.refCount = ReferenceCount.on(this::startService, this::stopService); } @Override @@ -104,48 +108,29 @@ public void setCamelContext(CamelContext camelContext) { } @Override - public void onInit(Route route) { + public synchronized void onInit(Route route) { super.onInit(route); - this.route = route; - } - @Override - public void onStart(Route route) { - try { - startService(); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } + LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId()); + route.getRouteContext().getRoute().setAutoStartup("false"); - if (!leader.get() && shouldStopConsumer) { - stopConsumer(route); - } - } + stoppeddRoutes.add(route); - @Override - public synchronized void onStop(Route route) { - try { - stopService(); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } + this.refCount.retain(); - suspendedRoutes.remove(route); + startManagedRoutes(); } @Override - public synchronized void onSuspend(Route route) { - try { - stopService(); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } - - suspendedRoutes.remove(route); + public synchronized void doShutdown() { + this.refCount.release(); } - @Override - protected void doStart() throws Exception { + // **************************************** + // Helpers + // **************************************** + + private void startService() { // validate StringHelper.notEmpty(lockMapName, "lockMapName", this); StringHelper.notEmpty(lockKey, "lockKey", this); @@ -156,107 +141,89 @@ protected void doStart() throws Exception { this.lockValue = camelContext.getUuidGenerator().generateUuid(); } - this.manager.start(); - this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy"); - - if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) { - throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds"); - } - - BasicCache cache = manager.getCache(lockMapName); - if (manager.isCacheContainerEmbedded()) { - this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache)); - } else { - this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache)); - } + try { + this.manager.start(); + this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy"); - super.doStart(); - } + if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) { + throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds"); + } - @Override - protected void doStop() throws Exception { - if (future != null) { - future.cancel(true); - future = null; - } + BasicCache cache = manager.getCache(lockMapName); + if (manager.isCacheContainerEmbedded()) { + this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache)); + } else { + this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache)); + } - if (this.service != null) { - this.service.stop(); + service.start(); + } catch (Exception e) { + throw new RuntimeCamelException(e); } + } - getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); - + private void stopService() { leader.set(false); - manager.stop(); - super.doStop(); - } + try { + if (future != null) { + future.cancel(true); + future = null; + } - private void startService() throws Exception { - if (service == null) { - throw new IllegalStateException("An Infinispan CacheService should be configured"); - } + manager.stop(); - service.start(); - } - - private void stopService() throws Exception { - leader.set(false); + if (this.service != null) { + this.service.stop(); + } - if (this.service != null) { - this.service.stop(); + getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService); + } catch (Exception e) { + throw new RuntimeCamelException(e); } } - // ************************************************************************* - // - // ************************************************************************* - - protected void setLeader(boolean isLeader) { + private void setLeader(boolean isLeader) { if (isLeader && leader.compareAndSet(false, isLeader)) { LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue); - - startAllStoppedConsumers(); + startManagedRoutes(); } else if (!isLeader && leader.getAndSet(isLeader)) { LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue); + stopManagedRoutes(); } + } - if (!isLeader && this.route != null) { - stopConsumer(route); + private synchronized void startManagedRoutes() { + if (!isLeader()) { + return; } - } - private synchronized void startConsumer(Route route) { try { - if (suspendedRoutes.contains(route)) { - startConsumer(route.getConsumer()); - suspendedRoutes.remove(route); + for (Route route : stoppeddRoutes) { + LOGGER.debug("Starting route {}", route.getId()); + startRoute(route); + startedRoutes.add(route); } + + stoppeddRoutes.removeAll(startedRoutes); } catch (Exception e) { handleException(e); } } - private synchronized void stopConsumer(Route route) { - try { - if (!suspendedRoutes.contains(route)) { - LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); - stopConsumer(route.getConsumer()); - suspendedRoutes.add(route); - } - } catch (Exception e) { - handleException(e); + private synchronized void stopManagedRoutes() { + if (isLeader()) { + return; } - } - private synchronized void startAllStoppedConsumers() { try { - for (Route route : suspendedRoutes) { - LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer()); - startConsumer(route.getConsumer()); + for (Route route : startedRoutes) { + LOGGER.debug("Stopping route {}", route.getId()); + stopRoute(route); + stoppeddRoutes.add(route); } - suspendedRoutes.clear(); + startedRoutes.removeAll(stoppeddRoutes); } catch (Exception e) { handleException(e); } @@ -266,29 +233,13 @@ private synchronized void startAllStoppedConsumers() { // Getter/Setters // ************************************************************************* - @ManagedAttribute(description = "The route id") - public String getRouteId() { - if (route != null) { - return route.getId(); - } - return null; - } - - @ManagedAttribute(description = "The consumer endpoint", mask = true) - public String getEndpointUrl() { - if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) { - return route.getConsumer().getEndpoint().toString(); - } - return null; - } - - @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master") - public boolean isShouldStopConsumer() { - return shouldStopConsumer; + @ManagedAttribute(description = "Whether to stop route when starting up and failed to become master") + public boolean isShouldStopRoute() { + return shouldStopRoute; } - public void setShouldStopConsumer(boolean shouldStopConsumer) { - this.shouldStopConsumer = shouldStopConsumer; + public void setShouldStopRoute(boolean shouldStopRoute) { + this.shouldStopRoute = shouldStopRoute; } @ManagedAttribute(description = "The lock map name") @@ -380,7 +331,7 @@ protected void doStop() throws Exception { @Override public void run() { - if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) { + if (!isRunAllowed()) { return; } @@ -455,7 +406,7 @@ protected void doStop() throws Exception { @Override public void run() { - if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) { + if (!isRunAllowed()) { return; } diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java index cad44c1fb92c8..43abf8f1ee314 100644 --- a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java +++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java @@ -16,40 +16,17 @@ */ package org.apache.camel.component.infinispan.policy; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.util.ServiceHelper; import org.infinispan.commons.api.BasicCacheContainer; import org.junit.Assert; import org.junit.Test; -abstract class InfinispanRoutePolicyTestBase extends CamelTestSupport { - protected BasicCacheContainer cacheManager; - protected InfinispanRoutePolicy policy1; - protected InfinispanRoutePolicy policy2; - - @Override - protected void doPreSetup() throws Exception { - this.cacheManager = createCacheManager(); - - this.policy1 = InfinispanRoutePolicy.withManager(cacheManager); - this.policy1.setLockMapName("camel-route-policy"); - this.policy1.setLockKey("route-policy"); - this.policy1.setLockValue("route1"); - - this.policy2 = InfinispanRoutePolicy.withManager(cacheManager); - this.policy2.setLockMapName("camel-route-policy"); - this.policy2.setLockKey("route-policy"); - this.policy2.setLockValue("route2"); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - - if (this.cacheManager != null) { - this.cacheManager.stop(); - } - } +abstract class InfinispanRoutePolicyTestBase { + private final static String CACHE_NAME = "camel-route-policy"; + private final static String CACHE_KEY = "route-policy"; protected abstract BasicCacheContainer createCacheManager() throws Exception; @@ -59,60 +36,50 @@ public void tearDown() throws Exception { @Test public void testLeadership()throws Exception { - context.startRoute("route1"); - while (!policy1.isLeader()) { - Thread.sleep(250); - } + BasicCacheContainer cacheManager = createCacheManager(); - context.startRoute("route2"); - Thread.sleep(500); + InfinispanRoutePolicy policy1 = InfinispanRoutePolicy.withManager(cacheManager); + policy1.setLockMapName(CACHE_NAME); + policy1.setLockKey(CACHE_KEY); + policy1.setLockValue("route1"); - Assert.assertTrue(policy1.isLeader()); - Assert.assertFalse(policy2.isLeader()); + InfinispanRoutePolicy policy2 = InfinispanRoutePolicy.withManager(cacheManager); + policy2.setLockMapName(CACHE_NAME); + policy2.setLockKey(CACHE_KEY); + policy2.setLockValue("route2"); - context.stopRoute("route1"); - while (!policy2.isLeader()) { - Thread.sleep(250); - } + CamelContext context = new DefaultCamelContext(); - Assert.assertFalse(policy1.isLeader()); - Assert.assertTrue(policy2.isLeader()); + try { + context = new DefaultCamelContext(); + context.start(); - context.startRoute("route1"); - Thread.sleep(500); + context.addRouteDefinition(RouteDefinition.fromUri("direct:r1").routePolicy(policy1).to("mock:p1")); - Assert.assertFalse(policy1.isLeader()); - Assert.assertTrue(policy2.isLeader()); + for (int i=0; i < 10 && !policy1.isLeader(); i++) { + Thread.sleep(250); + } - context.stopRoute("route2"); - while (!policy1.isLeader()) { - Thread.sleep(250); - } + context.addRouteDefinition(RouteDefinition.fromUri("direct:r2").routePolicy(policy2).to("mock:p2")); - Assert.assertTrue(policy1.isLeader()); - Assert.assertFalse(policy2.isLeader()); - } + Assert.assertTrue(policy1.isLeader()); + Assert.assertFalse(policy2.isLeader()); - // ******************************************* - // - // ******************************************* + policy1.shutdown(); - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() { - from("direct:route1") - .routeId("route1") - .autoStartup(false) - .routePolicy(policy1) - .to("log:org.apache.camel.component.infinispan.policy.1?level=INFO&showAll=true"); - from("direct:route2") - .routeId("route2") - .autoStartup(false) - .routePolicy(policy2) - .to("log:org.apache.camel.component.infinispan.policy.2?level=INFO&showAll=true"); + for (int i = 0; i < 10 && !policy2.isLeader(); i++) { + Thread.sleep(250); } - }; + + Assert.assertFalse(policy1.isLeader()); + Assert.assertTrue(policy2.isLeader()); + + } finally { + ServiceHelper.stopService(context); + + if (cacheManager != null) { + cacheManager.stop(); + } + } } }