Permalink
Browse files

ReplyStore

  • Loading branch information...
1 parent e961fcf commit 7cf7ff02f8eb9951b79db7bb83b80987e142701f @dgomezferro committed Sep 6, 2012
View
29 pom.xml
@@ -68,28 +68,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.3.3</version>
- <type>jar</type>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.jms</groupId>
- <artifactId>jms</artifactId>
- </exclusion>
- </exclusions>
-
- </dependency>
- <dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.5.Final</version>
@@ -130,5 +108,12 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.3</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
@@ -32,15 +32,17 @@
long timestamp = -1;
Request pendingRequest;
BitSet acks = new BitSet();
- byte[] value;
+// byte[] value;
private int from;
+ ReplyStore replyStore;
public ClientState(int clientId, int servers, int quorum, int inlineThreshold, int asyncMessages) {
this.clientId = clientId;
this.servers = servers;
this.quorum = quorum;
this.inlineThreshold = inlineThreshold;
this.asyncMessages = new TimestampMessage[asyncMessages];
+ this.replyStore = new ReplyStore(servers);
}
public int getClientId() {
@@ -106,17 +108,25 @@ public int getInlineThreshold() {
public void setInlineThreshold(int inlineThreshold) {
this.inlineThreshold = inlineThreshold;
}
+//
+// public byte[] getValue() {
+// return value;
+// }
+//
+// public void setValue(byte[] value) {
+// this.value = value;
+// }
- public byte[] getValue() {
- return value;
+ public void setFrom(int serverId) {
+ this.from = serverId;
}
- public void setValue(byte[] value) {
- this.value = value;
+ public ReplyStore getReplyStore() {
+ return replyStore;
}
- public void setFrom(int serverId) {
- this.from = serverId;
+ public void setReplyStore(ReplyStore replyStore) {
+ this.replyStore = replyStore;
}
public int getFrom() {
@@ -17,7 +17,6 @@
package com.yahoo.pasc.paxos.client.handlers;
import java.util.Arrays;
-import java.util.BitSet;
import java.util.List;
import org.slf4j.Logger;
@@ -26,6 +25,7 @@
import com.yahoo.pasc.Message;
import com.yahoo.pasc.MessageHandler;
import com.yahoo.pasc.paxos.client.ClientState;
+import com.yahoo.pasc.paxos.client.ReplyStore;
import com.yahoo.pasc.paxos.client.messages.Received;
import com.yahoo.pasc.paxos.messages.Reply;
@@ -43,22 +43,17 @@ public boolean guardPredicate(Reply receivedMessage) {
public List<Received.Descriptor> processMessage(Reply reply, ClientState state) {
List<Received.Descriptor> descriptors = null;
if (matches(reply, state)) {
-// LOG.debug("Reply {} matches with state", reply, state.getPendingRequest());
- BitSet acks = state.getAcks();
- if (acks.cardinality() == 0) {
- state.setValue(reply.getValue());
- state.setFrom(reply.getServerId());
-// } else if (!Arrays.equals(state.getValue(), reply.getValue())) {
-// LOG.warn("State divergence. Conflicting response. \n Stored: {} \n From: {} \n Existing quorum: {} \n Received: {} \n From: {}",
-// new Object[] { state.getValue(), state.getFrom(), acks.cardinality(), reply.getValue(), reply.getServerId() });
+ ReplyStore store = state.getReplyStore();
+ int quorum = state.getQuorum();
+ if (store.getMatch(quorum) != -1) {
+ // Already delivered message
+ return null;
}
- acks.set(reply.getServerId());
- if (acks.cardinality() >= state.getQuorum()) {
-// LOG.debug("Reached quorum {}",state.getQuorum());
- acks.clear();
- descriptors = Arrays.asList(new Received.Descriptor(reply.getValue()));
- } else {
-// LOG.debug("Still no quorum {} (we are at {})", state.getQuorum(), acks.cardinality());
+ store.addRemote(reply.getServerId(), reply.getValue());
+ byte[] value = store.getStableReply(quorum);
+ if (value != null) {
+ descriptors = Arrays.asList(new Received.Descriptor(value));
+ state.setReplyStore(new ReplyStore(state.getServers()));
}
}
return descriptors;
@@ -59,11 +59,7 @@
long repTs = state.getReplyCacheTimestampElement(clientId);
if (repTs >= timestamp) {
LOG.trace("We hit the reply cache for client {} with timestamp {}.", clientId, timestamp);
- if (state.getIsLeader()) {
- return null;
- } else {
- return Arrays.<PaxosDescriptor> asList(new Reply.Descriptor(clientId));
- }
+ return Arrays.<PaxosDescriptor> asList(new Reply.Descriptor(clientId));
}
IidRequest request;

0 comments on commit 7cf7ff0

Please sign in to comment.