Skip to content

Commit

Permalink
Refine Cluster Service API and implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Sep 25, 2017
1 parent 24cb543 commit 8ef99f5
Show file tree
Hide file tree
Showing 25 changed files with 188 additions and 107 deletions.
Expand Up @@ -16,6 +16,8 @@
*/ */
package org.apache.camel.ha; package org.apache.camel.ha;


import java.util.Optional;

/** /**
* Marker interface for cluster events * Marker interface for cluster events
*/ */
Expand All @@ -27,9 +29,9 @@ interface Leadership extends CamelClusterEventListener {
* Notify a change in the leadership for a particular cluster. * Notify a change in the leadership for a particular cluster.
* *
* @param view the cluster view * @param view the cluster view
* @param leader the new leader or null (when there are no active leaders) * @param leader the optional new leader
*/ */
void leadershipChanged(CamelClusterView view, CamelClusterMember leader); void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader);


} }


Expand Down
Expand Up @@ -23,4 +23,9 @@ public interface CamelClusterMember extends HasId {
* @return true if this member is the master. * @return true if this member is the master.
*/ */
boolean isLeader(); boolean isLeader();

/**
* @return true if this member is local.
*/
boolean isLocal();
} }
Expand Up @@ -18,6 +18,7 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer; import java.util.function.Consumer;


Expand Down Expand Up @@ -92,7 +93,7 @@ private <T extends CamelClusterEventListener> void doWithListener(Class<T> type,
); );
} }


protected void fireLeadershipChangedEvent(CamelClusterMember leader) { protected void fireLeadershipChangedEvent(Optional<CamelClusterMember> leader) {
doWithListener( doWithListener(
CamelClusterEventListener.Leadership.class, CamelClusterEventListener.Leadership.class,
listener -> listener.leadershipChanged(this, leader) listener -> listener.leadershipChanged(this, leader)
Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration; import java.time.Duration;
import java.util.EventObject; import java.util.EventObject;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -262,7 +263,7 @@ private void onCamelContextStarted() {


private class CamelClusterLeadershipListener implements CamelClusterEventListener.Leadership { private class CamelClusterLeadershipListener implements CamelClusterEventListener.Leadership {
@Override @Override
public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) { public void leadershipChanged(CamelClusterView view, Optional<CamelClusterMember> leader) {
setLeader(clusterView.getLocalMember().isLeader()); setLeader(clusterView.getLocalMember().isLeader());
} }
} }
Expand Down
Expand Up @@ -147,8 +147,8 @@ protected void doStop() throws Exception {
super.doStop(); super.doStop();


if (atomix != null) { if (atomix != null) {
LOGGER.debug("Shutdown atomix replica {}", atomix); LOGGER.debug("Leaving atomix cluster replica {}", atomix);
atomix.shutdown().join(); atomix.leave().join();
} }
} }


Expand Down
Expand Up @@ -95,21 +95,21 @@ protected void doStart() throws Exception {
LOGGER.debug("Listen election events"); LOGGER.debug("Listen election events");
group.election().onElection(term -> { group.election().onElection(term -> {
if (isRunAllowed()) { if (isRunAllowed()) {
fireLeadershipChangedEvent(new AtomixClusterMember(term.leader())); fireLeadershipChangedEvent(Optional.of(toClusterMember(term.leader())));
} }
}); });


LOGGER.debug("Listen join events"); LOGGER.debug("Listen join events");
group.onJoin(member -> { group.onJoin(member -> {
if (isRunAllowed()) { if (isRunAllowed()) {
fireMemberAddedEvent(new AtomixClusterMember(member)); fireMemberAddedEvent(toClusterMember(member));
} }
}); });


LOGGER.debug("Listen leave events"); LOGGER.debug("Listen leave events");
group.onLeave(member -> { group.onLeave(member -> {
if (isRunAllowed()) { if (isRunAllowed()) {
fireMemberRemovedEvent(new AtomixClusterMember(member)); fireMemberRemovedEvent(toClusterMember(member));
} }
}); });


Expand All @@ -123,11 +123,17 @@ protected void doStop() throws Exception {
localMember.leave(); localMember.leave();
} }


protected CamelClusterMember toClusterMember(GroupMember member) {
return localMember != null && localMember.is(member)
? localMember
: new AtomixClusterMember(member);
}

// *********************************************** // ***********************************************
// //
// *********************************************** // ***********************************************


class AtomixLocalMember implements CamelClusterMember { final class AtomixLocalMember implements CamelClusterMember {
private LocalMember member; private LocalMember member;


@Override @Override
Expand All @@ -153,6 +159,17 @@ public boolean isLeader() {
return member.equals(group.election().term().leader()); return member.equals(group.election().term().leader());
} }


@Override
public boolean isLocal() {
return true;
}

boolean is(GroupMember member) {
return this.member != null
? this.member.equals(member)
: false;
}

boolean hasJoined() { boolean hasJoined() {
return member != null; return member != null;
} }
Expand Down Expand Up @@ -183,8 +200,7 @@ AtomixLocalMember leave() {
group.remove(id).join(); group.remove(id).join();


member = null; member = null;

fireLeadershipChangedEvent(Optional.empty());
fireLeadershipChangedEvent(null);
} }


return this; return this;
Expand All @@ -199,7 +215,7 @@ public String toString() {
} }
} }


class AtomixClusterMember implements CamelClusterMember { final class AtomixClusterMember implements CamelClusterMember {
private final GroupMember member; private final GroupMember member;


AtomixClusterMember(GroupMember member) { AtomixClusterMember(GroupMember member) {
Expand All @@ -223,6 +239,11 @@ public boolean isLeader() {
return member.equals(group.election().term().leader()); return member.equals(group.election().term().leader());
} }


@Override
public boolean isLocal() {
return localMember != null ? localMember.is(member) : false;
}

@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("AtomixClusterMember{"); final StringBuilder sb = new StringBuilder("AtomixClusterMember{");
Expand Down
Expand Up @@ -81,7 +81,8 @@ public void test() throws Exception {


private void run(String id) { private void run(String id) {
try { try {
CountDownLatch contextLatch = new CountDownLatch(1); int events = ThreadLocalRandom.current().nextInt(2, 6);
CountDownLatch contextLatch = new CountDownLatch(events);


DefaultCamelContext context = new DefaultCamelContext(); DefaultCamelContext context = new DefaultCamelContext();
context.disableJMX(); context.disableJMX();
Expand All @@ -91,15 +92,10 @@ private void run(String id) {
context.addRoutes(new RouteBuilder() { context.addRoutes(new RouteBuilder() {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
from("timer:atomix?delay=1s&period=1s&repeatCount=1") from("timer:atomix?delay=1s&period=1s")
.routeId("route-" + id) .routeId("route-" + id)
.process(e -> { .log("From ${routeId}")
LOGGER.debug("Node {} done", id); .process(e -> contextLatch.countDown());
results.add(id);
// Shutdown the context later on to give a chance to
// other members to catch-up
scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
});
} }
}); });


Expand All @@ -109,6 +105,10 @@ public void configure() throws Exception {
context.start(); context.start();


contextLatch.await(); contextLatch.await();

LOGGER.debug("Shutting down client node {}", id);
results.add(id);

context.stop(); context.stop();


latch.countDown(); latch.countDown();
Expand Down
Expand Up @@ -47,7 +47,7 @@ public final class AtomixRoutePolicyTest {
); );


private final Set<Address> results = new HashSet<>(); private final Set<Address> results = new HashSet<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size() * 2); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size());
private final CountDownLatch latch = new CountDownLatch(addresses.size()); private final CountDownLatch latch = new CountDownLatch(addresses.size());


// ************************************ // ************************************
Expand All @@ -73,7 +73,8 @@ public void test() throws Exception {


private void run(Address address) { private void run(Address address) {
try { try {
CountDownLatch contextLatch = new CountDownLatch(1); int events = ThreadLocalRandom.current().nextInt(2, 6);
CountDownLatch contextLatch = new CountDownLatch(events);


AtomixClusterService service = new AtomixClusterService(); AtomixClusterService service = new AtomixClusterService();
service.setId("node-" + address.port()); service.setId("node-" + address.port());
Expand All @@ -89,15 +90,10 @@ private void run(Address address) {
context.addRoutes(new RouteBuilder() { context.addRoutes(new RouteBuilder() {
@Override @Override
public void configure() throws Exception { public void configure() throws Exception {
from("timer:atomix?delay=1s&period=1s&repeatCount=1") from("timer:atomix?delay=1s&period=1s")
.routeId("route-" + address.port()) .routeId("route-" + address.port())
.process(e -> { .log("From ${routeId}")
LOGGER.debug("Node {} done", address); .process(e -> contextLatch.countDown());
results.add(address);
// Shutdown the context later on to give a chance to
// other members to catch-up
scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
});
} }
}); });


Expand All @@ -107,6 +103,10 @@ public void configure() throws Exception {
context.start(); context.start();


contextLatch.await(); contextLatch.await();

LOGGER.debug("Shutting down node {}", address);
results.add(address);

context.stop(); context.stop();


latch.countDown(); latch.countDown();
Expand Down
4 changes: 4 additions & 0 deletions components/camel-atomix/src/test/resources/log4j2.properties
Expand Up @@ -27,6 +27,10 @@ appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n


logger.atomix.name = io.atomix logger.atomix.name = io.atomix
logger.atomix.level = INFO logger.atomix.level = INFO
logger.atomix-copycat.name = io.atomix.copycat
logger.atomix-copycat.level = WARN
logger.atomix-catalyst.name = io.atomix.catalyst
logger.atomix-catalyst.level = WARN


logger.camel.name = org.apache.camel logger.camel.name = org.apache.camel
logger.camel.level = INFO logger.camel.level = INFO
Expand Down
2 changes: 1 addition & 1 deletion components/camel-consul/pom.xml
Expand Up @@ -163,7 +163,7 @@
<images> <images>
<image> <image>
<name>consul:latest</name> <name>consul:latest</name>
<alias>consul</alias> <alias>0.9.3</alias>
<run> <run>
<ports> <ports>
<port>consul.port:8500</port> <port>consul.port:8500</port>
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.orbitz.consul.option.QueryOptions; import com.orbitz.consul.option.QueryOptions;
import org.apache.camel.ha.CamelClusterMember; import org.apache.camel.ha.CamelClusterMember;
import org.apache.camel.impl.ha.AbstractCamelClusterView; import org.apache.camel.impl.ha.AbstractCamelClusterView;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -43,7 +44,6 @@ final class ConsulClusterView extends AbstractCamelClusterView {


private final ConsulClusterConfiguration configuration; private final ConsulClusterConfiguration configuration;
private final ConsulLocalMember localMember; private final ConsulLocalMember localMember;
private final ConsulClusterMember nullMember;
private final AtomicReference<String> sessionId; private final AtomicReference<String> sessionId;
private final Watcher watcher; private final Watcher watcher;


Expand All @@ -57,7 +57,6 @@ final class ConsulClusterView extends AbstractCamelClusterView {


this.configuration = configuration; this.configuration = configuration;
this.localMember = new ConsulLocalMember(); this.localMember = new ConsulLocalMember();
this.nullMember = new ConsulClusterMember();
this.sessionId = new AtomicReference<>(); this.sessionId = new AtomicReference<>();
this.watcher = new Watcher(); this.watcher = new Watcher();
this.path = configuration.getRootPath() + "/" + namespace; this.path = configuration.getRootPath() + "/" + namespace;
Expand Down Expand Up @@ -126,17 +125,21 @@ protected void doStop() throws Exception {
LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get()); LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId.get());
} }


sessionClient.destroySession(sessionId.getAndSet(null)); synchronized (sessionId) {
localMember.setMaster(false); sessionClient.destroySession(sessionId.getAndSet(null));
localMember.setMaster(false);
}
} }
} }


private boolean acquireLock() { private boolean acquireLock() {
String sid = sessionId.get(); synchronized (sessionId) {
String sid = sessionId.get();


return (sid != null) return (sid != null)
? keyValueClient.acquireLock(this.path, sid) ? sessionClient.getSessionInfo(sid).transform(si -> keyValueClient.acquireLock(path, sid)).or(Boolean.FALSE)
: false; : false;
}
} }


// *********************************************** // ***********************************************
Expand All @@ -149,12 +152,12 @@ private final class ConsulLocalMember implements CamelClusterMember {
void setMaster(boolean master) { void setMaster(boolean master) {
if (master && this.master.compareAndSet(false, true)) { if (master && this.master.compareAndSet(false, true)) {
LOGGER.debug("Leadership taken for session id {}", sessionId.get()); LOGGER.debug("Leadership taken for session id {}", sessionId.get());
fireLeadershipChangedEvent(this); fireLeadershipChangedEvent(Optional.of(this));
return; return;
} }
if (!master && this.master.compareAndSet(true, false)) { if (!master && this.master.compareAndSet(true, false)) {
LOGGER.debug("Leadership lost for session id {}", sessionId.get()); LOGGER.debug("Leadership lost for session id {}", sessionId.get());
fireLeadershipChangedEvent(getMaster().orElse(nullMember)); fireLeadershipChangedEvent(getMaster());
return; return;
} }
} }
Expand All @@ -164,6 +167,11 @@ public boolean isLeader() {
return master.get(); return master.get();
} }


@Override
public boolean isLocal() {
return true;
}

@Override @Override
public String getId() { public String getId() {
return sessionId.get(); return sessionId.get();
Expand Down Expand Up @@ -209,6 +217,15 @@ public boolean isLeader() {
return id.equals(keyValueClient.getSession(path)); return id.equals(keyValueClient.getSession(path));
} }


@Override
public boolean isLocal() {
if (id == null) {
return false;
}

return ObjectHelper.equal(id, localMember.getId());
}

@Override @Override
public String toString() { public String toString() {
return "ConsulClusterMember{" return "ConsulClusterMember{"
Expand Down

0 comments on commit 8ef99f5

Please sign in to comment.