Permalink
Browse files

Merge pull request #1 from dgomezferro/master

Recovery / phase 1
  • Loading branch information...
2 parents 1573afa + d90032d commit b630cf2612e360ddf71d41791738b1c5862a91ee @dgomezferro committed May 7, 2012
Showing with 1,956 additions and 771 deletions.
  1. +0 −1 pom.xml
  2. +9 −48 src/main/java/com/yahoo/pasc/paxos/client/Barrier.java
  3. +19 −1 src/main/java/com/yahoo/pasc/paxos/client/ClientState.java
  4. +5 −77 src/main/java/com/yahoo/pasc/paxos/client/PaxosClient.java
  5. +224 −57 src/main/java/com/yahoo/pasc/paxos/client/PaxosClientHandler.java
  6. +37 −0 src/main/java/com/yahoo/pasc/paxos/client/SimpleClient.java
  7. +8 −1 src/main/java/com/yahoo/pasc/paxos/client/handlers/ReplyHandler.java
  8. +15 −16 src/main/java/com/yahoo/pasc/paxos/handlers/DigestHandler.java
  9. +60 −0 src/main/java/com/yahoo/pasc/paxos/handlers/LeadershipHandler.java
  10. +21 −25 src/main/java/com/yahoo/pasc/paxos/handlers/acceptor/AcceptorAccept.java
  11. +68 −2 src/main/java/com/yahoo/pasc/paxos/handlers/acceptor/AcceptorPrepare.java
  12. +8 −3 src/main/java/com/yahoo/pasc/paxos/handlers/learner/Learner.java
  13. +2 −2 src/main/java/com/yahoo/pasc/paxos/handlers/learner/LearnerPreReply.java
  14. +202 −9 src/main/java/com/yahoo/pasc/paxos/handlers/proposer/ProposerPrepared.java
  15. +55 −69 src/main/java/com/yahoo/pasc/paxos/handlers/proposer/ProposerRequest.java
  16. +5 −1 src/main/java/com/yahoo/pasc/paxos/messages/Accept.java
  17. +0 −16 src/main/java/com/yahoo/pasc/paxos/messages/Accepted.java
  18. +83 −0 src/main/java/com/yahoo/pasc/paxos/messages/DigestToSM.java
  19. +5 −1 src/main/java/com/yahoo/pasc/paxos/messages/Execute.java
  20. +46 −0 src/main/java/com/yahoo/pasc/paxos/messages/LeadershipChange.java
  21. +7 −25 src/main/java/com/yahoo/pasc/paxos/messages/MessageType.java
  22. +20 −15 src/main/java/com/yahoo/pasc/paxos/messages/Prepare.java
  23. +239 −2 src/main/java/com/yahoo/pasc/paxos/messages/Prepared.java
  24. +5 −1 src/main/java/com/yahoo/pasc/paxos/messages/Request.java
  25. +160 −119 src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualDecoder.java
  26. +56 −14 src/main/java/com/yahoo/pasc/paxos/messages/serialization/ManualEncoder.java
  27. +83 −0 src/main/java/com/yahoo/pasc/paxos/server/LeaderElection.java
  28. +7 −0 src/main/java/com/yahoo/pasc/paxos/server/LeadershipObserver.java
  29. +11 −6 src/main/java/com/yahoo/pasc/paxos/server/PaxosServer.java
  30. +45 −83 src/main/java/com/yahoo/pasc/paxos/server/PipelineFactory.java
  31. +32 −11 src/main/java/com/yahoo/pasc/paxos/server/ServerHandler.java
  32. +57 −22 src/main/java/com/yahoo/pasc/paxos/server/tcp/TcpServer.java
  33. +1 −5 src/main/java/com/yahoo/pasc/paxos/server/udp/UdpServer.java
  34. +70 −30 src/main/java/com/yahoo/pasc/paxos/state/DigestStore.java
  35. +36 −0 src/main/java/com/yahoo/pasc/paxos/state/DigestidDigest.java
  36. +19 −12 src/main/java/com/yahoo/pasc/paxos/state/IidAcceptorsCounts.java
  37. +6 −2 src/main/java/com/yahoo/pasc/paxos/state/InstanceRecord.java
  38. +50 −41 src/main/java/com/yahoo/pasc/paxos/state/PaxosState.java
  39. +85 −0 src/main/java/com/yahoo/pasc/paxos/state/PreparedMessages.java
  40. +35 −9 src/main/java/com/yahoo/pasc/paxos/statemachine/EmptyStateMachine.java
  41. +5 −2 src/main/java/com/yahoo/pasc/paxos/statemachine/StateMachine.java
  42. +0 −25 src/main/java/com/yahoo/pasc/paxos/util/Util.java
  43. +55 −18 src/test/java/com/yahoo/pasc/paxos/PaxosEnsemble.java
View
@@ -27,7 +27,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.11</version>
</plugin>
-
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -17,12 +17,12 @@
package com.yahoo.pasc.paxos.client;
import java.io.IOException;
-import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
@@ -43,8 +43,8 @@
* @throws IOException
* @throws KeeperException
*/
- Barrier(String address, String root, String name, int size) throws KeeperException, IOException {
- zk = new ZooKeeper(address, 5000, this);
+ Barrier(ZooKeeper zk, String root, String name, int size) throws KeeperException, IOException {
+ this.zk = zk;
this.mutex = new Object();
this.root = root;
this.size = size;
@@ -124,55 +124,16 @@ boolean leave() throws KeeperException, InterruptedException {
}
@Override
- public void process(org.apache.zookeeper.WatchedEvent arg0) {
- synchronized (mutex) {
- mutex.notify();
+ public void process(org.apache.zookeeper.WatchedEvent event) {
+ if (event.getType().equals(EventType.NodeChildrenChanged) ||
+ event.getType().equals(EventType.NodeCreated)) {
+ synchronized (mutex) {
+ mutex.notify();
+ }
}
}
void close() throws InterruptedException {
zk.close();
}
}
-
-class Worker implements Runnable {
- private static Random rand = new Random();
- private String name;
- private int size;
-
- public Worker(String name, int size) {
- this.name = name;
- this.size = size;
- }
-
- @Override
- public void run() {
- try {
- Thread.currentThread().setName("Worker " + name);
- doWork();
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- private void doWork() throws KeeperException, IOException, InterruptedException {
- Barrier b = new Barrier("localhost:3000", "/test", name, size);
- Thread.sleep(rand.nextInt(10000));
- b.enter();
- System.out.println("Worker " + name + " starting work.");
- Thread.sleep(rand.nextInt(11000));
- b.leave();
- System.out.println("Worker " + name + " finished work.");
- }
-}
-
-enum State {
- ENTERING, CLOSED, LEAVING
-}
@@ -28,9 +28,11 @@
int connected;
int inlineThreshold;
- long timestamp = 0;
+ long timestamp = -1;
Request pendingRequest;
BitSet acks = new BitSet();
+ byte[] value;
+ private int from;
public ClientState(int clientId, int servers, int quorum, int inlineThreshold) {
this.clientId = clientId;
@@ -103,4 +105,20 @@ public void setInlineThreshold(int inlineThreshold) {
this.inlineThreshold = inlineThreshold;
}
+ public byte[] getValue() {
+ return value;
+ }
+
+ public void setValue(byte[] value) {
+ this.value = value;
+ }
+
+ public void setFrom(int serverId) {
+ this.from = serverId;
+ }
+
+ public int getFrom() {
+ return from;
+ }
+
}
@@ -16,7 +16,6 @@
package com.yahoo.pasc.paxos.client;
-import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -27,14 +26,6 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.socket.DatagramChannel;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
@@ -47,8 +38,6 @@
import com.yahoo.pasc.paxos.client.messages.Timeout;
import com.yahoo.pasc.paxos.messages.Hello;
import com.yahoo.pasc.paxos.messages.Reply;
-import com.yahoo.pasc.paxos.messages.serialization.ManualDecoder;
-import com.yahoo.pasc.paxos.messages.serialization.ManualEncoder;
public class PaxosClient {
public static void main(String[] args) throws Exception {
@@ -85,24 +74,21 @@ public static void main(String[] args) throws Exception {
line = parser.parse(options, args);
String host = line.hasOption('l') ? line.getOptionValue('l') : "localhost:20548,localhost:20748,localhost:20778";
+ String zkConnection = line.hasOption('z') ? line.getOptionValue('z') : "localhost:2181";
int clientIds = line.hasOption('i') ? Integer.parseInt(line.getOptionValue('i')) : 0;
int servers = line.hasOption('s') ? Integer.parseInt(line.getOptionValue('s')) : 3;
int quorum = line.hasOption('q') ? Integer.parseInt(line.getOptionValue('q')) : 1;
int buffer = line.hasOption('b') ? Integer.parseInt(line.getOptionValue('b')) : 1;
- int clientPort = line.hasOption('p') ? Integer.parseInt(line.getOptionValue('p')) : 9000;
int timeout = line.hasOption('t') ? Integer.parseInt(line.getOptionValue('t')) : 5000;
int clients = line.hasOption('c') ? Integer.parseInt(line.getOptionValue('c')) : 1;
int requestSize = line.hasOption('r') ? Integer.parseInt(line.getOptionValue('r')) : 0;
int inlineThreshold = line.hasOption('n') ? Integer.parseInt(line.getOptionValue('n')) : 10;
boolean protection = line.hasOption('a');
- boolean udp = line.hasOption('u');
- String zkConnection = line.getOptionValue('z');
int threads = Runtime.getRuntime().availableProcessors() * 2;
final ExecutionHandler executor = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(threads, 1024 * 1024,
1024 * 1024 * 1024, 30, TimeUnit.SECONDS, Executors.defaultThreadFactory()));
- final ManualEncoder encoder = new ManualEncoder();
String [] serverHosts = host.split(",");
@@ -124,27 +110,14 @@ public static void main(String[] args) throws Exception {
runtime.addHandler(Timeout.class, tout);
final PaxosClientHandler handler = new PaxosClientHandler(runtime, new SimpleClient(requestSize),
- clientId, clients, timeout, zkConnection);
+ serverHosts, clientId, clients, timeout, zkConnection, executor);
if (line.hasOption('w')) handler.setWarmup(Integer.parseInt(line.getOptionValue('w')));
if (line.hasOption('m')) handler.setMeasuringTime(Integer.parseInt(line.getOptionValue('m')));
-// if (line.hasOption('r')) handler.setRequestSize(Integer.parseInt(line.getOptionValue('r')));
if (line.hasOption('f')) handler.setPeriod(Integer.parseInt(line.getOptionValue('f')));
-
- ChannelPipelineFactory channelPipelineFactory = new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() throws Exception {
- // return Channels.pipeline(new KryoEncoder(kryo), new KryoDecoder(kryo), executor, handler);
- return Channels.pipeline(encoder, new ManualDecoder(), executor, handler);
-// return Channels.pipeline(new ObjectEncoder(), new ObjectDecoder(), executor, handler);
- }
- };
-
- if (udp) {
- startUdpClient(channelPipelineFactory, clientPort, serverHosts);
- } else {
- startTcpClient(channelPipelineFactory, clientPort, serverHosts);
- }
-
+
+ handler.start();
+
Thread.sleep(rnd.nextInt(200));
}
} catch (Exception e) {
@@ -157,49 +130,4 @@ public ChannelPipeline getPipeline() throws Exception {
System.exit(-1);
}
}
-
- private static void startUdpClient(ChannelPipelineFactory channelPipelineFactory, int clientPort, String[] hosts) {
-
- // Configure the client.
- ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
- Executors.newCachedThreadPool()));
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(channelPipelineFactory);
-
- // Start the connection attempt.
- bootstrap.bind(new InetSocketAddress(clientPort));
-
- for (String host : hosts) {
- final String s[] = host.split(":");
- final String hostname = s[0];
- final int port = Integer.parseInt(s[1]);
- System.out.println("Connecting to " + hostname + ":" + port);
- DatagramChannel clientChannel = (DatagramChannel) bootstrap.bind(new InetSocketAddress(0));
- clientChannel.connect(new InetSocketAddress(hostname, port)).awaitUninterruptibly();
- }
- }
-
- private static void startTcpClient(ChannelPipelineFactory channelPipelineFactory, int clientPort, String [] hosts) {
-
-// startServer(channelPipelineFactory, clientPort);
-
- // Configure the client.
- ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(channelPipelineFactory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
-
- for (String host : hosts) {
- final String s[] = host.split(":");
- final String hostname = s[0];
- final int port = Integer.parseInt(s[1]);
- System.out.println("Connecting to " + hostname + ":" + port);
- // Start the connection attempt.
- bootstrap.connect(new InetSocketAddress(hostname, port));
- }
- }
}
Oops, something went wrong.

0 comments on commit b630cf2

Please sign in to comment.