Skip to content

Commit

Permalink
CAMEL-11362: create a LeaderElectionservice
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jun 16, 2017
1 parent 04191e5 commit 54052a3
Show file tree
Hide file tree
Showing 8 changed files with 564 additions and 202 deletions.
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.ha;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public abstract class AbstractCamelClusterView implements CamelClusterView {
private final CamelCluster cluster;
private final String namespace;
private final List<FilteringConsumer> consumers;
private final StampedLock lock;

protected AbstractCamelClusterView(CamelCluster cluster, String namespace) {
this.cluster = cluster;
this.namespace = namespace;
this.consumers = new ArrayList<>();
this.lock = new StampedLock();
}

@Override
public CamelCluster getCluster() {
return this.cluster;
}

@Override
public String getNamespace() {
return this.namespace;
}

@Override
public void addEventListener(BiConsumer<Event, Object> consumer) {
long stamp = lock.writeLock();

try {
consumers.add(new FilteringConsumer(e -> true, consumer));
} finally {
lock.unlockWrite(stamp);
}
}

@Override
public void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
long stamp = lock.writeLock();

try {
this.consumers.add(new FilteringConsumer(predicate, consumer));
} finally {
lock.unlockWrite(stamp);
}
}

@Override
public void removeEventListener(BiConsumer<Event, Object> consumer) {
long stamp = lock.writeLock();

try {
consumers.removeIf(c -> c.getConsumer().equals(consumer));
} finally {
lock.unlockWrite(stamp);
}
}

// **************************************
// Events
// **************************************

protected void fireEvent(CamelClusterView.Event event, Object payload) {
long stamp = lock.readLock();

try {
for (int i = 0; i < consumers.size(); i++) {
consumers.get(0).accept(event, payload);
}
} finally {
lock.unlockRead(stamp);
}
}

// **************************************
// Helpers
// **************************************

private final class FilteringConsumer implements BiConsumer<Event, Object> {
private final Predicate<Event> predicate;
private final BiConsumer<Event, Object> consumer;

FilteringConsumer(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
this.predicate = predicate;
this.consumer = consumer;
}

@Override
public void accept(CamelClusterView.Event event, Object payload) {
if (predicate.test(event)) {
consumer.accept(event, payload);
}
}

public BiConsumer<Event, Object> getConsumer() {
return this.consumer;
}
}
}
30 changes: 30 additions & 0 deletions camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.ha;

import org.apache.camel.spi.HasId;

public interface CamelCluster extends HasId {
/**
* Creates a view of the cluster bound to a namespace.
*
* @param namespace the namespace the view refer to.
* @return the cluster view.
* @throws Exception if the view can't be created.
*/
CamelClusterView createView(String namespace) throws Exception;
}
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.ha;

import org.apache.camel.spi.HasId;

public interface CamelClusterMember extends HasId {
/**
* @return true if this member is the master.
*/
boolean isMaster();
}
85 changes: 85 additions & 0 deletions camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.ha;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/**
* Represents the View of the cluster at some given period of time.
*/
public interface CamelClusterView {

enum Event {
KEEP_ALIVE,
LEADERSHIP_CHANGED;
}

/**
* @return the cluster.
*/
CamelCluster getCluster();

/**
* @return the namespace for this view.
*/
String getNamespace();

/**
* Provides the master member.
*
* @return the master member.
*/
CamelClusterMember getMaster();

/**
* Provides the local member.
*
* @return the local member.
*/
CamelClusterMember getLocalMember();

/**
* Provides the list of members of the cluster.
*
* @return the list of members.
*/
List<CamelClusterMember> getMembers();

/**
* Add an event consumer.
*
* @param consumer the event consumer.
*/
void addEventListener(BiConsumer<Event, Object> consumer);

/**
* Add an event consumer for events matching the given predicate.
*
* @param predicate the predicate to filter events.
* @param consumer the event consumer.
*/
void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object> consumer);

/**
* Remove the event consumer.
*
* @param event the event consumer.
*/
void removeEventListener(BiConsumer<Event, Object> event);
}

0 comments on commit 54052a3

Please sign in to comment.