Skip to content
Browse files

Merge pull request #74 from pruivo/t_toa_discard_leavers

TOA: discard leavers' messages
  • Loading branch information...
2 parents 9921795 + a25bd1b commit 7cb117b231b11c11e3883089d0c51f4001089647 @belaban committed
View
22 src/org/jgroups/protocols/tom/DeliveryManagerImpl.java
@@ -1,5 +1,6 @@
package org.jgroups.protocols.tom;
+import org.jgroups.Address;
import org.jgroups.Message;
import java.util.*;
@@ -101,6 +102,27 @@ private void markReadyToDeliverV2(MessageID messageID, long finalSequenceNumber)
}
}
+ public final void removeLeavers(Collection<Address> leavers) {
+ if (leavers == null) {
+ return;
+ }
+ List<MessageInfo> toRemove = new LinkedList<MessageInfo>();
+ synchronized (deliverySet) {
+ for (MessageInfo messageInfo : deliverySet) {
+ if (leavers.contains(messageInfo.getMessage().getSrc()) && !messageInfo.isReadyToDeliver()) {
+ toRemove.add(messageInfo);
+ }
+ }
+ deliverySet.removeAll(toRemove);
+ if (deliverySet.first().isReadyToDeliver()) {
+ deliverySet.notify();
+ }
+ }
+ for (MessageInfo removed : toRemove) {
+ messageCache.remove(removed.messageID);
+ }
+ }
+
//see the interface javadoc
@Override
public List<Message> getNextMessagesToDeliver() throws InterruptedException {
View
23 src/org/jgroups/protocols/tom/SenderManager.java
@@ -82,6 +82,18 @@ public void clear() {
sentMessages.clear();
}
+ public Collection<MessageID> getPendingMessageIDs() {
+ return sentMessages.keySet();
+ }
+
+ public long removeLeavers(MessageID messageID, Collection<Address> leavers) {
+ MessageInfo messageInfo = sentMessages.get(messageID);
+ if (messageInfo != null && messageInfo.removeLeavers(leavers)) {
+ return messageInfo.getAndMarkFinalSent();
+ }
+ return NOT_READY;
+ }
+
/**
* The state of a message (destination, proposes missing, the highest sequence number proposed, etc...)
*/
@@ -131,6 +143,17 @@ private void setProposeReceived(Address address) {
private boolean checkAllProposesReceived() {
return receivedPropose.isEmpty();
}
+
+ public synchronized boolean removeLeavers(Collection<Address> leavers) {
+ for (Address address : leavers) {
+ int idx = destinations.indexOf(address);
+ if (idx == -1) {
+ continue;
+ }
+ receivedPropose.set(idx, false);
+ }
+ return checkAllProposesReceived();
+ }
}
}
View
49 src/org/jgroups/protocols/tom/TOA.java
@@ -6,12 +6,14 @@
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.stack.Protocol;
+import org.jgroups.util.Util;
import java.util.Collection;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-/**
+/**
* Total Order Anycast with three communication steps (based on Skeen's Algorithm). Establishes total order for a
* message sent to a subset of the cluster members (an anycast). Example: send a totally ordered message to {D,E}
* out of a membership of {A,B,C,D,E,F}.<p/>
@@ -40,6 +42,7 @@
//stats: profiling information
private final StatsCollector statsCollector = new StatsCollector();
+ private volatile View currentView;
public TOA() {
}
@@ -135,6 +138,46 @@ private void handleViewChange(View view) {
if (log.isTraceEnabled()) {
log.trace("Handle view " + view);
}
+ View oldView = currentView;
+ currentView = view;
+
+ //basis behavior: drop leavers message (as senders)
+ List<Address> leavers = Util.leftMembers(oldView, view);
+ deliverManager.removeLeavers(leavers);
+
+ //basis behavior: avoid waiting for the acks
+ Collection<MessageID> pendingSentMessages = senderManager.getPendingMessageIDs();
+ for (MessageID messageID : pendingSentMessages) {
+ long finalSequenceNumber = senderManager.removeLeavers(messageID, leavers);
+ if (finalSequenceNumber != SenderManager.NOT_READY) {
+ Message finalMessage = new Message();
+ finalMessage.setSrc(localAddress);
+
+ ToaHeader finalHeader = ToaHeader.createNewHeader(
+ ToaHeader.FINAL_MESSAGE,messageID);
+
+ finalHeader.setSequencerNumber(finalSequenceNumber);
+ finalMessage.putHeader(this.id, finalHeader);
+ finalMessage.setFlag(Message.Flag.OOB);
+ finalMessage.setFlag(Message.Flag.DONT_BUNDLE);
+
+ Set<Address> destinations = senderManager.getDestination(messageID);
+ if (destinations.contains(localAddress)) {
+ destinations.remove(localAddress);
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Message " + messageID + " is ready to be deliver. Final sequencer number is " +
+ finalSequenceNumber);
+ }
+
+ send(destinations,finalMessage, false);
+ //returns true if we are in destination set
+ if (senderManager.markSent(messageID)) {
+ deliverManager.markReadyToDeliver(messageID, finalSequenceNumber);
+ }
+ }
+ }
// TODO: Future work: How to add fault tolerance? (simple and efficient)
}
@@ -174,8 +217,8 @@ private void sendTotalOrderAnycastMessage(Collection<Address> destinations, Mess
}
if (destinations.size() == 1) {
- if (warn) {
- log.warn("sending an AnycastAddress with 1 element");
+ if (log.isDebugEnabled()) {
+ log.debug("sending an AnycastAddress with 1 element");
}
message.putHeader(id, ToaHeader.createSingleDestinationHeader());
message.setDest(destinations.iterator().next());

0 comments on commit 7cb117b

Please sign in to comment.
Something went wrong with that request. Please try again.